Coverage Report - diy.middleware.sockets.SocketTransport
 
Classes in this File Line Coverage Branch Coverage Complexity
SocketTransport
100%
66/66
100%
17/17
0
SocketTransport$1
100%
18/18
N/A
0
SocketTransport$2
100%
1/1
N/A
0
SocketTransport$InternalSession
100%
1/1
N/A
0
SocketTransport$SocketQueueScanner
100%
23/23
N/A
0
SocketTransport$SocketQueueScanner$1
100%
11/11
N/A
0
 
 1  
 /*
 2  
  Copyright 2007 Alexey Akhunov
 3  
 
 4  
  Licensed under the Apache License, Version 2.0 (the "License");
 5  
  you may not use this file except in compliance with the License.
 6  
  You may obtain a copy of the License at
 7  
 
 8  
  http://www.apache.org/licenses/LICENSE-2.0
 9  
 
 10  
  Unless required by applicable law or agreed to in writing, software
 11  
  distributed under the License is distributed on an "AS IS" BASIS,
 12  
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  See the License for the specific language governing permissions and
 14  
  limitations under the License.
 15  
  */
 16  
 package diy.middleware.sockets;
 17  
 
 18  
 import java.io.BufferedInputStream;
 19  
 import java.io.BufferedOutputStream;
 20  
 import java.io.DataInputStream;
 21  
 import java.io.DataOutputStream;
 22  
 import java.io.EOFException;
 23  
 import java.io.OutputStream;
 24  
 import java.net.Socket;
 25  
 import java.util.HashMap;
 26  
 import java.util.LinkedList;
 27  
 import java.util.Map;
 28  
 import java.util.concurrent.BlockingQueue;
 29  
 import java.util.concurrent.LinkedBlockingQueue;
 30  
 import java.util.concurrent.locks.ReadWriteLock;
 31  
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 32  
 
 33  
 import diy.middleware.Frame;
 34  
 import diy.middleware.Monitor;
 35  
 import diy.middleware.SessionCommand;
 36  
 
 37  132
 public class SocketTransport implements Runnable {
 38  
 
 39  28
         private class SocketQueueScanner implements Runnable {
 40  
                 public void run() {
 41  
                         while (true) {
 42  
                                 try {
 43  30
                                         final SocketConnection incomingConnection = socketQueue
 44  
                                                         .take();
 45  16
                                         final String sessionId = incomingConnection.getSessionId();
 46  16
                                         OutputStream output = incomingConnection.getSocket()
 47  
                                                         .getOutputStream();
 48  15
                                         final DataOutputStream dataOutput = new DataOutputStream(
 49  
                                                         new BufferedOutputStream(output));
 50  15
                                         final BlockingQueue<Frame> queue = new LinkedBlockingQueue<Frame>();
 51  15
                                         final InternalSession internalSession = new InternalSession();
 52  15
                                         internalSession.socket = incomingConnection.getSocket();
 53  15
                                         Thread outputThread = new Thread(new Runnable() {
 54  
                                                 public void run() {
 55  
                                                         try {
 56  
                                                                 while (true) {
 57  21
                                                                         Frame outputFrame = queue.take();
 58  9
                                                                         monitor.count(-1, "Output|queue");
 59  9
                                                                         dataOutput.writeUTF(outputFrame
 60  
                                                                                         .getContent());
 61  9
                                                                         dataOutput.flush();
 62  6
                                                                 }
 63  12
                                                         } catch (InterruptedException ie) {
 64  3
                                                         } catch (Exception exception) {
 65  3
                                                                 destroySession(sessionId, exception, true);
 66  12
                                                         }
 67  15
                                                 }
 68  
                                         });
 69  15
                                         internalSession.outputThread = outputThread;
 70  
                                         try {
 71  15
                                                 internalSessionsLock.writeLock().lock();
 72  15
                                                 internalSessions.put(sessionId,internalSession);
 73  
                                         } finally {
 74  15
                                                 internalSessionsLock.writeLock().unlock();
 75  15
                                         }
 76  15
                                         outputThread.start();
 77  15
                                         SessionCommand command = new SessionCommand(
 78  
                                                         incomingConnection.getSessionId(),
 79  
                                                         SessionCommand.Command.QUEUE);
 80  15
                                         command.setQueue(queue);
 81  15
                                         outQueue.put(command);
 82  14
                                 } catch (InterruptedException ie) {
 83  14
                                         return;
 84  1
                                 } catch (Exception e) {
 85  1
                                         e.printStackTrace();
 86  16
                                 }
 87  
                         }
 88  
                 }
 89  
         }
 90  
 
 91  
         private BlockingQueue<SocketConnection> socketQueue;
 92  
 
 93  161
         private static class InternalSession {
 94  
                 private Thread inputThread;
 95  
 
 96  
                 private Thread outputThread;
 97  
 
 98  
                 private Socket socket;
 99  
         }
 100  
 
 101  14
         private Map<String, InternalSession> internalSessions = new HashMap<String, InternalSession>();
 102  
 
 103  14
         private ReadWriteLock internalSessionsLock = new ReentrantReadWriteLock();
 104  
 
 105  
         private BlockingQueue<SessionCommand> inQueue;
 106  
 
 107  
         private BlockingQueue<SessionCommand> outQueue;
 108  
 
 109  
         private Thread transportThread;
 110  
 
 111  
         private Thread scannerThread;
 112  
 
 113  
         private Monitor monitor;
 114  
 
 115  
         public void init() {
 116  14
                 transportThread = new Thread(this);
 117  14
                 transportThread.start();
 118  14
                 scannerThread = new Thread(new SocketQueueScanner());
 119  14
                 scannerThread.start();
 120  14
         }
 121  
 
 122  
         public void run() {
 123  
                 while (true) {
 124  
                         try {
 125  29
                                 final SessionCommand sessionCommand = inQueue.take();
 126  15
                                 switch (sessionCommand.getCommand()) {
 127  
                                 case QUEUE: {
 128  12
                                         final String sessionId = sessionCommand.getSessionId();
 129  
                                         final InternalSession internalSession;
 130  
                                         try {
 131  12
                                                 internalSessionsLock.readLock().lock();
 132  12
                                                 internalSession = internalSessions.get(sessionId);
 133  
                                         } finally {
 134  12
                                                 internalSessionsLock.readLock().unlock();
 135  12
                                         }
 136  12
                                         if (internalSession == null) {
 137  
                                                 // TODO: throw exception
 138  1
                                                 break;
 139  
                                         }
 140  11
                                         final BlockingQueue<Frame> queue = sessionCommand
 141  
                                                         .getQueue();
 142  11
                                         final DataInputStream dataInput = new DataInputStream(
 143  
                                                         new BufferedInputStream(internalSession.socket
 144  
                                                                         .getInputStream()));
 145  10
                                         Thread inputThread = new Thread(new Runnable() {
 146  
                                                 public void run() {
 147  
                                                         try {
 148  
                                                                 while (true) {
 149  
                                                                         try {
 150  17
                                                                                 String incomingMessage = dataInput
 151  
                                                                                                 .readUTF();
 152  8
                                                                                 queue.put(new Frame(sessionId,
 153  
                                                                                                 incomingMessage));
 154  7
                                                                                 monitor.count(1, "Distribution|queue");
 155  2
                                                                         } catch (EOFException eof) {
 156  
                                                                                 // Socket closed by peer
 157  2
                                                                                 internalSession.outputThread
 158  
                                                                                                 .interrupt();
 159  
                                                                                 try {
 160  2
                                                                                         internalSession.socket.close();
 161  1
                                                                                 } catch (Exception se) {
 162  1
                                                                                 }
 163  2
                                                                                 SessionCommand command = new SessionCommand(
 164  
                                                                                                 sessionId,
 165  
                                                                                                 SessionCommand.Command.CLOSE);
 166  2
                                                                                 outQueue.put(command);
 167  2
                                                                                 break;
 168  7
                                                                         }
 169  
                                                                 }
 170  1
                                                         } catch (InterruptedException ie) {
 171  7
                                                         } catch (Exception exception) {
 172  7
                                                                 destroySession(sessionId, exception, true);
 173  3
                                                         }
 174  10
                                                 }
 175  
                                         });
 176  10
                                         internalSession.inputThread = inputThread;
 177  10
                                         inputThread.start();
 178  10
                                         break;
 179  
                                 }
 180  
                                 case CLOSE: {
 181  2
                                         final String sessionId = sessionCommand.getSessionId();
 182  2
                                         destroySession(sessionId, null, false);
 183  2
                                         break;
 184  
                                 }
 185  
                                 }
 186  14
                         } catch (InterruptedException ie) {
 187  
                                 // Gracefull exit
 188  14
                                 break;
 189  2
                         } catch (Exception e) {
 190  2
                                 e.printStackTrace();
 191  15
                         }
 192  
                 }
 193  14
         }
 194  
 
 195  
         public void setInQueue(BlockingQueue<SessionCommand> inQueue) {
 196  14
                 this.inQueue = inQueue;
 197  14
         }
 198  
 
 199  
         public void setOutQueue(BlockingQueue<SessionCommand> outQueue) {
 200  14
                 this.outQueue = outQueue;
 201  14
         }
 202  
 
 203  
         public void setMonitor(Monitor monitor) {
 204  14
                 this.monitor = monitor;
 205  14
         }
 206  
 
 207  
         public void setSocketQueue(BlockingQueue<SocketConnection> socketQueue) {
 208  14
                 this.socketQueue = socketQueue;
 209  14
         }
 210  
 
 211  
         public void destroy() {
 212  14
                 scannerThread.interrupt();
 213  14
                 transportThread.interrupt();
 214  
                 try {
 215  14
                         internalSessionsLock.writeLock().lock();
 216  14
                         LinkedList<String> ids = new LinkedList<String>(internalSessions.keySet());
 217  14
                         for (String sessionId: ids) {
 218  10
                                 destroySession(sessionId, null, false);
 219  
                         }
 220  
                 } finally {
 221  14
                         internalSessionsLock.writeLock().unlock();
 222  14
                 }
 223  14
         }
 224  
         
 225  
         private void destroySession(String sessionId, Exception exception, boolean send) {
 226  
                 InternalSession session;
 227  
                 try {
 228  22
                         internalSessionsLock.writeLock().lock();
 229  22
                         session = internalSessions.remove(sessionId);
 230  
                 } finally {
 231  22
                         internalSessionsLock.writeLock().unlock();
 232  22
                 }
 233  22
                 if (session == null) {
 234  
                         // Already removed
 235  7
                         return;
 236  
                 }
 237  
                 try {
 238  15
                         session.socket.close();
 239  13
                 } catch (Exception e) {};
 240  15
                 if (session.inputThread != null && Thread.currentThread() != session.inputThread) {
 241  9
                         session.inputThread.interrupt();
 242  
                 }
 243  15
                 if (Thread.currentThread() != session.outputThread) {
 244  12
                         session.outputThread.interrupt();
 245  
                 }
 246  15
                 if (send) {
 247  4
                         SessionCommand command = new SessionCommand(
 248  
                                 sessionId, SessionCommand.Command.CLOSE);
 249  4
                         command.setException(exception);
 250  
                         // If we cannot quickly send close command, then nevermind
 251  4
                         outQueue.offer(command);
 252  
                 }
 253  15
         }
 254  
 }