Coverage Report - diy.middleware.Server
 
Classes in this File Line Coverage Branch Coverage Complexity
Server
100%
52/52
100%
9/9
0
Server$1
100%
44/44
100%
14/14
0
Server$2
100%
1/1
N/A
0
Server$Routing
100%
5/5
N/A
0
 
 1  
 /*
 2  
  Copyright 2007 Alexey Akhunov
 3  
 
 4  
  Licensed under the Apache License, Version 2.0 (the "License");
 5  
  you may not use this file except in compliance with the License.
 6  
  You may obtain a copy of the License at
 7  
 
 8  
  http://www.apache.org/licenses/LICENSE-2.0
 9  
 
 10  
  Unless required by applicable law or agreed to in writing, software
 11  
  distributed under the License is distributed on an "AS IS" BASIS,
 12  
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  See the License for the specific language governing permissions and
 14  
  limitations under the License.
 15  
  */
 16  
 
 17  
 package diy.middleware;
 18  
 
 19  
 import java.util.HashMap;
 20  
 import java.util.LinkedHashMap;
 21  
 import java.util.Map;
 22  
 import java.util.concurrent.BlockingQueue;
 23  
 import java.util.concurrent.LinkedBlockingQueue;
 24  
 import java.util.concurrent.locks.ReadWriteLock;
 25  
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 26  
 import java.util.regex.Matcher;
 27  
 import java.util.regex.Pattern;
 28  
 
 29  289
 public class Server {
 30  
 
 31  
         private LinkedHashMap[] distributionMap;
 32  
 
 33  
         private ReadWriteLock[] distributionLocks;
 34  
 
 35  
         private Thread[] distributionThreads;
 36  
 
 37  10
         private LinkedBlockingQueue<Frame> distributionQueue = new LinkedBlockingQueue<Frame>();
 38  
 
 39  
         private int threads;
 40  
 
 41  
         private Monitor monitor;
 42  
 
 43  
         private BlockingQueue<SessionCommand> inQueue;
 44  
 
 45  
         private BlockingQueue<SessionCommand> outQueue;
 46  
 
 47  10
         private Map<String, BlockingQueue<Frame>> outputQueues = new HashMap<String, BlockingQueue<Frame>>();
 48  
 
 49  10
         private ReadWriteLock outputQueuesLock = new ReentrantReadWriteLock();
 50  
 
 51  
         /**
 52  
          * @param args
 53  
          */
 54  
         public void init() {
 55  10
                 distributionMap = new LinkedHashMap[threads];
 56  10
                 distributionLocks = new ReadWriteLock[threads];
 57  10
                 distributionThreads = new Thread[threads];
 58  30
                 for (int i = 0; i < threads; i++) {
 59  20
                         distributionMap[i] = new LinkedHashMap<String, Routing>();
 60  20
                         distributionLocks[i] = new ReentrantReadWriteLock();
 61  20
                         final int threadNumber = i;
 62  20
                         distributionThreads[i] = new Thread(new Runnable() {
 63  
                                 public void run() {
 64  
                                         try {
 65  
                                                 while (true) {
 66  49
                                                         Frame frame = distributionQueue.take();
 67  29
                                                         monitor.count(-1, "Distribution|queue");
 68  29
                                                         String content = frame.getContent();
 69  29
                                                         if (content.startsWith("M")) {
 70  18
                                                                 String toDistribute = content.substring("M"
 71  
                                                                                 .length());
 72  
 
 73  
                                                                 try {
 74  18
                                                                         distributionLocks[threadNumber].readLock()
 75  
                                                                                         .lock();
 76  18
                                                                         for (Map.Entry<String, Routing> entry : ((Map<String, Routing>) distributionMap[threadNumber])
 77  
                                                                                         .entrySet()) {
 78  20
                                                                                 if (entry.getValue().matcher.reset(
 79  
                                                                                                 toDistribute).find()) {
 80  17
                                                                                         entry.getValue().queue
 81  
                                                                                                         .put(new Frame(entry
 82  
                                                                                                                         .getKey(), content));
 83  17
                                                                                         monitor.count(1, "Output|queue");
 84  
                                                                                 }
 85  
                                                                         }
 86  
                                                                 } finally {
 87  18
                                                                         distributionLocks[threadNumber].readLock()
 88  
                                                                                         .unlock();
 89  18
                                                                 }
 90  18
                                                         } else if (content.startsWith("REC")) {
 91  9
                                                                 final String sessionId = frame.getSessionId();
 92  
                                                                 BlockingQueue<Frame> queue;
 93  
                                                                 try {
 94  9
                                                                         outputQueuesLock.readLock().lock();
 95  9
                                                                         queue = outputQueues.get(sessionId);
 96  
                                                                 } finally {
 97  9
                                                                         outputQueuesLock.readLock().unlock();
 98  9
                                                                 }
 99  9
                                                                 Pattern pattern = Pattern.compile(content
 100  
                                                                                 .substring("REC".length()));
 101  
                                                                 // Add subscription
 102  27
                                                                 for (int i = 0; i < threads; i++) {
 103  18
                                                                         Routing routing = new Routing(pattern
 104  
                                                                                         .matcher(""), queue);
 105  
                                                                         try {
 106  18
                                                                                 distributionLocks[i].writeLock().lock();
 107  18
                                                                                 distributionMap[i].put(frame
 108  
                                                                                                 .getSessionId(), routing);
 109  
                                                                         } finally {
 110  18
                                                                                 distributionLocks[i].writeLock()
 111  
                                                                                                 .unlock();
 112  18
                                                                         }
 113  
                                                                 }
 114  
                                                                 // Send acknowledgement
 115  9
                                                                 queue.put(new Frame(sessionId, content));
 116  9
                                                                 monitor.count(1, "Output|queue");
 117  9
                                                         } else if (content.startsWith("STOP")) {
 118  
                                                                 // Remove subscription
 119  3
                                                                 for (int i = 0; i < threads; i++) {
 120  
                                                                         try {
 121  2
                                                                                 distributionLocks[i].writeLock().lock();
 122  2
                                                                                 distributionMap[i].remove(frame
 123  
                                                                                                 .getSessionId());
 124  
                                                                         } finally {
 125  2
                                                                                 distributionLocks[i].writeLock()
 126  
                                                                                                 .unlock();
 127  2
                                                                         }
 128  
                                                                 }
 129  
                                                                 // Send acknowledgement
 130  1
                                                                 final String sessionId = frame.getSessionId();
 131  
                                                                 BlockingQueue<Frame> queue;
 132  
                                                                 try {
 133  1
                                                                         outputQueuesLock.readLock().lock();
 134  1
                                                                         queue = outputQueues.get(sessionId);
 135  
                                                                 } finally {
 136  1
                                                                         outputQueuesLock.readLock().unlock();
 137  1
                                                                 }
 138  1
                                                                 queue.put(new Frame(sessionId, content));
 139  1
                                                                 monitor.count(1, "Output|queue");
 140  
                                                         }
 141  29
                                                 }
 142  20
                                         } catch (InterruptedException e) {
 143  
                                         }
 144  20
                                 }
 145  
                         });
 146  20
                         distributionThreads[i].start();
 147  
                 }
 148  10
         }
 149  
 
 150  37
         public static class Routing {
 151  
                 private Matcher matcher;
 152  
 
 153  
                 private BlockingQueue<Frame> queue;
 154  
 
 155  18
                 public Routing(Matcher matcher, BlockingQueue<Frame> queue) {
 156  18
                         this.matcher = matcher;
 157  18
                         this.queue = queue;
 158  18
                 }
 159  
         }
 160  
 
 161  
         public void runLoop() {
 162  
                 while (true) {
 163  
                         try {
 164  25
                                 SessionCommand command = inQueue.take();
 165  1
                                 switch (command.getCommand()) {
 166  
                                 case QUEUE:
 167  
                                         try {
 168  14
                                                 outputQueuesLock.writeLock().lock();
 169  14
                                                 outputQueues.put(command.getSessionId(), command
 170  
                                                                 .getQueue());
 171  
                                         } finally {
 172  14
                                                 outputQueuesLock.writeLock().unlock();
 173  14
                                         }
 174  14
                                         SessionCommand reply = new SessionCommand(command
 175  
                                                         .getSessionId(), SessionCommand.Command.QUEUE);
 176  14
                                         reply.setQueue(distributionQueue);
 177  14
                                         outQueue.put(reply);
 178  13
                                         break;
 179  
                                 case CLOSE:
 180  3
                                         for (int i = 0; i < threads; i++) {
 181  
                                                 try {
 182  2
                                                         distributionLocks[i].writeLock().lock();
 183  2
                                                         distributionMap[i].remove(command.getSessionId());
 184  
                                                 } finally {
 185  2
                                                         distributionLocks[i].writeLock().unlock();
 186  2
                                                 }
 187  
                                         }
 188  
                                         try {
 189  1
                                                 outputQueuesLock.writeLock().lock();
 190  1
                                                 outputQueues.remove(command.getSessionId());
 191  
                                         } finally {
 192  1
                                                 outputQueuesLock.writeLock().unlock();
 193  1
                                         }
 194  1
                                         SessionCommand replyClose = new SessionCommand(command
 195  
                                                         .getSessionId(), SessionCommand.Command.CLOSE);
 196  1
                                         outQueue.put(replyClose);
 197  
                                         break;
 198  
                                 }
 199  10
                         } catch (InterruptedException e) {
 200  
                                 // Graceful
 201  10
                                 break;
 202  1
                         } catch (Exception e) {
 203  1
                                 e.printStackTrace();
 204  15
                         }
 205  
                 }
 206  
                 // Interrupt distribution threads
 207  30
                 for (Thread thread : distributionThreads) {
 208  20
                         thread.interrupt();
 209  
                 }
 210  10
         }
 211  
 
 212  
         public void setInQueue(BlockingQueue<SessionCommand> inQueue) {
 213  10
                 this.inQueue = inQueue;
 214  10
         }
 215  
 
 216  
         public void setOutQueue(BlockingQueue<SessionCommand> outQueue) {
 217  12
                 this.outQueue = outQueue;
 218  12
         }
 219  
 
 220  
         public void setMonitor(Monitor monitor) {
 221  10
                 this.monitor = monitor;
 222  10
         }
 223  
 
 224  
         public void setThreads(int threads) {
 225  10
                 this.threads = threads;
 226  10
         }
 227  
 
 228  
 }