Coverage Report - diy.middleware.QueueSizeMonitor
 
Classes in this File Line Coverage Branch Coverage Complexity
QueueSizeMonitor
0%
0/108
0%
0/38
0
QueueSizeMonitor$1
0%
0/2
N/A
0
 
 1  
 /*
 2  
         Copyright 2007 Alexey Akhunov
 3  
 
 4  
    Licensed under the Apache License, Version 2.0 (the "License");
 5  
    you may not use this file except in compliance with the License.
 6  
    You may obtain a copy of the License at
 7  
 
 8  
        http://www.apache.org/licenses/LICENSE-2.0
 9  
 
 10  
    Unless required by applicable law or agreed to in writing, software
 11  
    distributed under the License is distributed on an "AS IS" BASIS,
 12  
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
    See the License for the specific language governing permissions and
 14  
    limitations under the License.
 15  
 */
 16  
 
 17  
 package diy.middleware;
 18  
 
 19  
 import java.io.File;
 20  
 import java.io.FileOutputStream;
 21  
 import java.io.IOException;
 22  
 import java.lang.ref.PhantomReference;
 23  
 import java.lang.ref.Reference;
 24  
 import java.lang.ref.ReferenceQueue;
 25  
 import java.nio.ByteBuffer;
 26  
 import java.nio.CharBuffer;
 27  
 import java.nio.channels.FileChannel;
 28  
 import java.nio.charset.Charset;
 29  
 import java.nio.charset.CharsetEncoder;
 30  
 import java.util.HashMap;
 31  
 import java.util.Map;
 32  
 import java.util.Timer;
 33  
 import java.util.TimerTask;
 34  
 import java.util.concurrent.ConcurrentHashMap;
 35  
 import java.util.concurrent.ConcurrentMap;
 36  
 import java.util.concurrent.atomic.AtomicLong;
 37  
 
 38  0
 public class QueueSizeMonitor extends TimerTask implements Monitor {
 39  
         
 40  0
         private ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
 41  
         
 42  0
         private ThreadLocal<Object> threadKey = new ThreadLocal<Object>() {
 43  
                 @Override
 44  
                 protected Object initialValue() {
 45  0
                         return new Object();
 46  
                 }
 47  
         };
 48  
         
 49  0
         private ThreadLocal<ConcurrentMap<String,AtomicLong>> tCounters = new ThreadLocal<ConcurrentMap<String,AtomicLong>>();
 50  
         
 51  0
         private Map<Reference<Object>,Map<String,AtomicLong>> refs = new HashMap<Reference<Object>,Map<String,AtomicLong>>();
 52  
         
 53  
         private FileChannel fileChannel;
 54  
         
 55  0
         private CharBuffer charBuffer = CharBuffer.allocate(65536);
 56  0
         private ByteBuffer byteBuffer = ByteBuffer.allocate(131072);
 57  0
         private CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
 58  
         
 59  
         private final static String CLOSING_TAG = "</series>\n";
 60  
         
 61  
         private String filename;
 62  
         private long interval;
 63  
         
 64  
         public void init() throws Exception {
 65  0
                 fileChannel = new FileOutputStream(new File(filename)).getChannel();
 66  0
                 charBuffer.clear();
 67  0
                 charBuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
 68  0
                 charBuffer.append("<series>\n");
 69  0
                 charBuffer.append("</series>\n");
 70  0
                 charBuffer.flip();
 71  0
                 byteBuffer.clear();
 72  0
                 encoder.encode(charBuffer, byteBuffer, true);
 73  0
                 byteBuffer.flip();
 74  0
                 while (byteBuffer.hasRemaining()) {
 75  0
                         fileChannel.write(byteBuffer);
 76  
                 }
 77  0
                 Timer timer = new Timer();
 78  0
                 timer.scheduleAtFixedRate(this, 0, interval);
 79  0
         }
 80  
         
 81  
         /**
 82  
          * Increments counter for a key.
 83  
          * @param delta
 84  
          * @param key
 85  
          */
 86  
         public void count(long delta, String key) {
 87  0
                 ConcurrentMap<String,AtomicLong> counterMap = tCounters.get();
 88  0
                 if (counterMap == null) {
 89  0
                         counterMap = new ConcurrentHashMap<String, AtomicLong>(10,0.75f,1);
 90  0
                         synchronized (refs) {
 91  0
                                 refs.put(new PhantomReference<Object>(threadKey.get(),refQueue), counterMap);
 92  0
                         }
 93  0
                         tCounters.set(counterMap);
 94  
                 }
 95  0
                 AtomicLong counter = counterMap.get(key);
 96  0
                 if (counter == null) {
 97  0
                         counterMap.put(key, new AtomicLong(delta));
 98  
                 } else {
 99  0
                         counter.addAndGet(delta);
 100  
                 }
 101  0
         }
 102  
 
 103  0
         private Map<String,AtomicLong> aggregated = new HashMap<String,AtomicLong>();
 104  0
         private Map<String,AtomicLong> deads = new HashMap<String,AtomicLong>();
 105  
 
 106  
         @Override
 107  
         public void run() {
 108  
                 Reference ref;
 109  0
                 while ((ref = refQueue.poll()) != null) {
 110  
                         Map<String,AtomicLong> dead;
 111  0
                         synchronized (refs) {
 112  0
                                 dead = refs.remove(ref);
 113  0
                         }
 114  0
                         for (Map.Entry<String, AtomicLong> entry: dead.entrySet()) {
 115  0
                                 long value = entry.getValue().get();
 116  0
                                 if (value == 0) {
 117  0
                                         continue;
 118  
                                 }
 119  0
                                 AtomicLong aCounter = deads.get(entry.getKey());
 120  0
                                 if (aCounter == null) {
 121  0
                                         deads.put(entry.getKey(), new AtomicLong(value));
 122  
                                 } else {
 123  0
                                         aCounter.addAndGet(value);
 124  
                                 }
 125  0
                         }
 126  0
                 }
 127  0
                 for (Map.Entry<String, AtomicLong> entry: deads.entrySet()) {
 128  0
                         long value = entry.getValue().get();
 129  0
                         if (value == 0) {
 130  0
                                 continue;
 131  
                         }
 132  0
                         AtomicLong aCounter = aggregated.get(entry.getKey());
 133  0
                         if (aCounter == null) {
 134  0
                                 aggregated.put(entry.getKey(), new AtomicLong(value));
 135  
                         } else {
 136  0
                                 aCounter.addAndGet(value);
 137  
                         }
 138  0
                 }
 139  0
                 synchronized (refs) {
 140  0
                         for (Map<String,AtomicLong> counterMap: refs.values()) {
 141  0
                                 for (Map.Entry<String, AtomicLong> entry: counterMap.entrySet()) {
 142  0
                                         long value = entry.getValue().get();
 143  0
                                         if (value == 0) {
 144  0
                                                 continue;
 145  
                                         }
 146  0
                                         AtomicLong aCounter = aggregated.get(entry.getKey());
 147  0
                                         if (aCounter == null) {
 148  0
                                                 aggregated.put(entry.getKey(), new AtomicLong(value));
 149  
                                         } else {
 150  0
                                                 aCounter.addAndGet(value);
 151  
                                         }
 152  0
                                 }
 153  
                         }
 154  0
                 }
 155  0
                 boolean flush = false;
 156  0
                 for (AtomicLong val: aggregated.values()) {
 157  0
                         if (val.get() != 0) {
 158  0
                                 flush = true;
 159  
                         }
 160  
                 }
 161  0
                 if (!flush) {
 162  0
                         return;
 163  
                 }
 164  0
                 charBuffer.clear();
 165  0
                 charBuffer.append("<serie timestamp=\"");
 166  0
                 charBuffer.append(Long.toString(System.currentTimeMillis()));
 167  0
                 charBuffer.append("\">\n");
 168  0
                 for (Map.Entry<String, AtomicLong> entry: aggregated.entrySet()) {
 169  0
                         charBuffer.append("  <item key=\"");
 170  0
                         charBuffer.append(entry.getKey());
 171  0
                         charBuffer.append("\" value=\"");
 172  0
                         long value = entry.getValue().get();
 173  0
                         charBuffer.append(Long.toString(value));
 174  0
                         entry.getValue().addAndGet(-value);
 175  0
                         charBuffer.append("\"/>\n");
 176  0
                 }
 177  0
                 charBuffer.append("</serie>\n");
 178  0
                 charBuffer.append(CLOSING_TAG);
 179  0
                 charBuffer.flip();
 180  0
                 byteBuffer.clear();
 181  0
                 encoder.encode(charBuffer, byteBuffer, true);
 182  0
                 byteBuffer.flip();
 183  
                 try {
 184  0
                         fileChannel.position(fileChannel.position() - CLOSING_TAG.length());
 185  0
                         while (byteBuffer.hasRemaining()) {
 186  0
                                 fileChannel.write(byteBuffer);
 187  
                         }
 188  0
                         fileChannel.force(true);
 189  0
                 } catch (IOException e) {
 190  0
                         e.printStackTrace();
 191  0
                 }
 192  0
         }
 193  
 
 194  
         public void setFilename(String filename) {
 195  0
                 this.filename = filename;
 196  0
         }
 197  
 
 198  
         public void setInterval(long interval) {
 199  0
                 this.interval = interval;
 200  0
         }
 201  
         
 202  
 }