Coverage Report - diy.middleware.client.Client
 
Classes in this File Line Coverage Branch Coverage Complexity
Client
0%
0/61
0%
0/9
0
Client$1
0%
0/13
0%
0/6
0
Client$2
0%
0/1
N/A
0
 
 1  
 /*
 2  
         Copyright 2007 Alexey Akhunov
 3  
 
 4  
    Licensed under the Apache License, Version 2.0 (the "License");
 5  
    you may not use this file except in compliance with the License.
 6  
    You may obtain a copy of the License at
 7  
 
 8  
        http://www.apache.org/licenses/LICENSE-2.0
 9  
 
 10  
    Unless required by applicable law or agreed to in writing, software
 11  
    distributed under the License is distributed on an "AS IS" BASIS,
 12  
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
    See the License for the specific language governing permissions and
 14  
    limitations under the License.
 15  
 */
 16  
 package diy.middleware.client;
 17  
 
 18  
 import java.io.IOException;
 19  
 import java.net.UnknownHostException;
 20  
 import java.rmi.server.UID;
 21  
 import java.util.HashMap;
 22  
 import java.util.Map;
 23  
 import java.util.concurrent.BlockingQueue;
 24  
 import java.util.concurrent.CountDownLatch;
 25  
 import java.util.concurrent.LinkedBlockingQueue;
 26  
 import java.util.concurrent.TimeUnit;
 27  
 import java.util.concurrent.locks.ReadWriteLock;
 28  
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 29  
 
 30  
 import diy.middleware.Connector;
 31  
 import diy.middleware.Frame;
 32  
 import diy.middleware.SessionCommand;
 33  
 
 34  0
 public class Client implements Runnable {
 35  
 
 36  
         private BlockingQueue<SessionCommand> inQueue;
 37  
         
 38  
         private BlockingQueue<SessionCommand> outQueue;
 39  
         
 40  
         private Connector connector;
 41  
         
 42  0
         private Map<String,Session> sessionMap = new HashMap<String,Session>();
 43  0
         private ReadWriteLock sessionMapLock = new ReentrantReadWriteLock();
 44  
         
 45  
         public void setInQueue(BlockingQueue<SessionCommand> inQueue) {
 46  0
                 this.inQueue = inQueue;
 47  0
         }
 48  
 
 49  
         public void setOutQueue(BlockingQueue<SessionCommand> outQueue) {
 50  0
                 this.outQueue = outQueue;
 51  0
         }
 52  
         
 53  
         public Session createSession(String host, int port, BlockingQueue<Frame> inMessageQueue, long timeout, TimeUnit unit) throws UnknownHostException, IOException, InterruptedException {
 54  0
                 String sessionId = new UID().toString();
 55  
                 // Create a latch
 56  0
                 CountDownLatch latch = new CountDownLatch(1);
 57  0
                 Session session = new Session(sessionId, inMessageQueue, latch);
 58  
                 try {
 59  0
                         sessionMapLock.writeLock().lock();
 60  0
                         sessionMap.put(sessionId, session);
 61  
                 } finally {
 62  0
                         sessionMapLock.writeLock().unlock();
 63  0
                 }
 64  0
                 connector.connect(host, port, sessionId);
 65  0
                 if (!latch.await(timeout, unit)) {
 66  0
                         return null;
 67  
                 }
 68  
                 // Send QUEUE command to the transport
 69  0
                 SessionCommand command = new SessionCommand(sessionId, SessionCommand.Command.QUEUE);
 70  0
                 command.setQueue(inFrameQueue);
 71  0
                 outQueue.put(command);
 72  0
                 return session;
 73  
         }
 74  
 
 75  
         public void setConnector(Connector connector) {
 76  0
                 this.connector = connector;
 77  0
         }
 78  
 
 79  
         private Thread thread;
 80  
         private int threads;
 81  
         private Thread[] dispatchThreads;
 82  0
         private LinkedBlockingQueue<Frame> inFrameQueue = new LinkedBlockingQueue<Frame>();
 83  
         
 84  
         public void init() {
 85  0
                 dispatchThreads = new Thread[threads];
 86  0
                 for (int i=0;i<threads;i++) {
 87  0
                         dispatchThreads[i] = new Thread(new Runnable() {
 88  
                                 public void run() {
 89  
                                         try {
 90  
                                         while (true) {
 91  0
                                                 Frame frame = inFrameQueue.take();
 92  
                                                 Session session;
 93  
                                                 try {
 94  0
                                                         sessionMapLock.readLock().lock();
 95  0
                                                         session = sessionMap.get(frame.getSessionId());
 96  
                                                 } finally {
 97  0
                                                         sessionMapLock.readLock().unlock();
 98  0
                                                 }
 99  0
                                                 if (frame.getContent().startsWith("REC") || frame.getContent().equals("STOP")) {
 100  
                                                         // subscription acknowledgement arrived
 101  0
                                                         session.getLatch().countDown();
 102  0
                                                 } else if (frame.getContent().startsWith("M")){
 103  0
                                                         session.getInQueue().put(new Frame(frame.getSessionId(),frame.getContent().substring(1)));
 104  
                                                 }
 105  0
                                         }
 106  0
                                         } catch (InterruptedException e) {
 107  
                                                 
 108  
                                         }
 109  0
                                 }
 110  
                         });
 111  0
                         dispatchThreads[i].start();
 112  
                 }
 113  0
                 thread = new Thread(this);
 114  0
                 thread.start();
 115  0
         }
 116  
         
 117  
         public void run() {
 118  
                 try {
 119  
                         while (true) {
 120  0
                                 SessionCommand command = inQueue.take();
 121  
                                 Session session;
 122  
                                 try {
 123  0
                                         sessionMapLock.readLock().lock();
 124  0
                                         session = sessionMap.get(command.getSessionId());
 125  
                                 } finally {
 126  0
                                         sessionMapLock.readLock().unlock();
 127  0
                                 }
 128  0
                                 switch (command.getCommand()) {
 129  
                                 case QUEUE:
 130  0
                                         session.setOutQueue(command.getQueue());
 131  0
                                         session.getLatch().countDown();
 132  0
                                         break;
 133  
                                 case CLOSE:
 134  0
                                         command.getException().printStackTrace();
 135  
                                         try {
 136  0
                                                 sessionMapLock.writeLock().lock();
 137  0
                                                 sessionMap.remove(session.getSessionId());
 138  
                                         } finally {
 139  0
                                                 sessionMapLock.writeLock().unlock();
 140  0
                                         }
 141  
                                 }
 142  0
                         }
 143  0
                 } catch (InterruptedException e) {
 144  
                         
 145  
                 }
 146  0
         }
 147  
 
 148  
         public void setThreads(int threads) {
 149  0
                 this.threads = threads;
 150  0
         }
 151  
         
 152  
         public void closeSession(Session session) throws InterruptedException {
 153  
                 try {
 154  0
                         sessionMapLock.writeLock().lock();
 155  0
                         sessionMap.remove(session.getSessionId());
 156  
                 } finally {
 157  0
                         sessionMapLock.writeLock().unlock();
 158  0
                 }
 159  0
                 SessionCommand command = new SessionCommand(session.getSessionId(), SessionCommand.Command.CLOSE);
 160  0
                 outQueue.put(command);
 161  0
         }
 162  
         
 163  
         public void destroy() {
 164  0
                 thread.interrupt();
 165  0
                 for (int i=0;i<threads;i++) {
 166  0
                         dispatchThreads[i].interrupt();
 167  
                 }
 168  0
         }
 169  
         
 170  
 }