Coverage Report - diy.middleware.channels.ChannelTransport
 
Classes in this File Line Coverage Branch Coverage Complexity
ChannelTransport
97%
140/144
93%
25/27
0
ChannelTransport$1
100%
1/1
N/A
0
ChannelTransport$BufferDecoder
83%
20/24
50%
3/6
0
ChannelTransport$BufferEncoder
84%
27/32
75%
3/4
0
ChannelTransport$ChannelQueueScanner
100%
21/21
N/A
0
ChannelTransport$InternalSession
100%
9/9
N/A
0
ChannelTransport$SelectionRunner
93%
86/92
69%
33/48
0
 
 1  
 /*
 2  
  Copyright 2007 Alexey Akhunov
 3  
 
 4  
  Licensed under the Apache License, Version 2.0 (the "License");
 5  
  you may not use this file except in compliance with the License.
 6  
  You may obtain a copy of the License at
 7  
 
 8  
  http://www.apache.org/licenses/LICENSE-2.0
 9  
 
 10  
  Unless required by applicable law or agreed to in writing, software
 11  
  distributed under the License is distributed on an "AS IS" BASIS,
 12  
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  See the License for the specific language governing permissions and
 14  
  limitations under the License.
 15  
  */
 16  
 package diy.middleware.channels;
 17  
 
 18  
 import java.io.IOException;
 19  
 import java.nio.ByteBuffer;
 20  
 import java.nio.CharBuffer;
 21  
 import java.nio.channels.SelectionKey;
 22  
 import java.nio.channels.Selector;
 23  
 import java.nio.channels.SocketChannel;
 24  
 import java.nio.charset.Charset;
 25  
 import java.nio.charset.CharsetDecoder;
 26  
 import java.nio.charset.CharsetEncoder;
 27  
 import java.util.HashMap;
 28  
 import java.util.Iterator;
 29  
 import java.util.LinkedList;
 30  
 import java.util.Map;
 31  
 import java.util.Set;
 32  
 import java.util.concurrent.BlockingQueue;
 33  
 import java.util.concurrent.LinkedBlockingQueue;
 34  
 import java.util.concurrent.locks.ReadWriteLock;
 35  
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 36  
 
 37  
 import diy.middleware.Frame;
 38  
 import diy.middleware.SessionCommand;
 39  
 
 40  535
 public class ChannelTransport implements Runnable {
 41  
 
 42  12
         private BlockingQueue<Frame> gatherQueue = new LinkedBlockingQueue<Frame>();
 43  
 
 44  24
         private class ChannelQueueScanner implements Runnable {
 45  
                 public void run() {
 46  
                         while (true) {
 47  
                                 try {
 48  26
                                         ChannelConnection incomingConnection = channelQueue.take();
 49  14
                                         SocketChannel channel = incomingConnection.getChannel();
 50  14
                                         channel.finishConnect();
 51  13
                                         channel.configureBlocking(false);
 52  13
                                         String sessionId = incomingConnection.getSessionId();
 53  13
                                         InternalSession internalSession = new InternalSession(
 54  
                                                         sessionId, channel);
 55  13
                                         internalSession.setQueue(new LinkedBlockingQueue<Frame>());
 56  
                                         try {
 57  13
                                                 sessionsLock.writeLock().lock();
 58  13
                                                 sessionsById.put(sessionId, internalSession);
 59  13
                                                 sessionsByChannel.put(channel, internalSession);
 60  
                                         } finally {
 61  13
                                                 sessionsLock.writeLock().unlock();
 62  13
                                         }
 63  13
                                         initSessionChannel(internalSession, channel);
 64  13
                                         channelsToRegister.put(channel);
 65  13
                                         selector.wakeup();
 66  12
                                 } catch (InterruptedException ie) {
 67  12
                                         return;
 68  1
                                 } catch (Exception e) {
 69  1
                                         e.printStackTrace();
 70  14
                                 }
 71  
                         }
 72  
                 }
 73  
         }
 74  
 
 75  
         private static class InternalSession {
 76  
                 private String sessionId;
 77  
 
 78  
                 private SocketChannel channel;
 79  
 
 80  
                 private BlockingQueue<Frame> queue;
 81  
 
 82  13
                 public InternalSession(String sessionId, SocketChannel channel) {
 83  13
                         this.sessionId = sessionId;
 84  13
                         this.channel = channel;
 85  13
                 }
 86  
 
 87  
                 public SocketChannel getChannel() {
 88  28
                         return channel;
 89  
                 }
 90  
 
 91  
                 public String getSessionId() {
 92  37
                         return sessionId;
 93  
                 }
 94  
 
 95  
                 public BlockingQueue<Frame> getQueue() {
 96  23
                         return queue;
 97  
                 }
 98  
 
 99  
                 public void setQueue(BlockingQueue<Frame> queue) {
 100  21
                         this.queue = queue;
 101  21
                 }
 102  
         }
 103  
 
 104  
         private Selector selector;
 105  
 
 106  12
         private LinkedBlockingQueue<SocketChannel> channelsToRegister = new LinkedBlockingQueue<SocketChannel>();
 107  
 
 108  12
         private Map<String, InternalSession> sessionsById = new HashMap<String, InternalSession>();
 109  
 
 110  12
         private Map<SocketChannel, InternalSession> sessionsByChannel = new HashMap<SocketChannel, InternalSession>();
 111  
 
 112  12
         private ReadWriteLock sessionsLock = new ReentrantReadWriteLock();
 113  
 
 114  12
         private Map<SocketChannel, ByteBuffer> readSizeBuffers = new HashMap<SocketChannel, ByteBuffer>();
 115  
 
 116  12
         private Map<SocketChannel, LinkedBlockingQueue<ByteBuffer>> readBuffers = new HashMap<SocketChannel, LinkedBlockingQueue<ByteBuffer>>();
 117  
 
 118  12
         private ReadWriteLock readBufferLock = new ReentrantReadWriteLock();
 119  
 
 120  12
         private Map<SocketChannel, ByteBuffer> writeSizeBuffers = new HashMap<SocketChannel, ByteBuffer>();
 121  
 
 122  12
         private Map<SocketChannel, LinkedBlockingQueue<ByteBuffer>> writeBuffers = new HashMap<SocketChannel, LinkedBlockingQueue<ByteBuffer>>();
 123  
 
 124  12
         private ReadWriteLock writeBufferLock = new ReentrantReadWriteLock();
 125  
 
 126  24
         private class BufferEncoder implements Runnable {
 127  
 
 128  12
                 private CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
 129  
 
 130  
                 public void run() {
 131  
                         while (true) {
 132  
                                 try {
 133  22
                                         Frame frame = gatherQueue.take();
 134  
                                         InternalSession internalSession;
 135  
                                         try {
 136  10
                                                 sessionsLock.readLock().lock();
 137  10
                                                 internalSession = sessionsById
 138  
                                                                 .get(frame.getSessionId());
 139  
                                         } finally {
 140  10
                                                 sessionsLock.readLock().unlock();
 141  10
                                         }
 142  10
                                         if (internalSession != null) {
 143  
                                                 ByteBuffer encoded;
 144  
                                                 try {
 145  8
                                                         encoded = encoder.encode(CharBuffer.wrap(frame
 146  
                                                                         .getContent()));
 147  0
                                                 } catch (Exception exception) {
 148  0
                                                         destroySession(frame.getSessionId(), exception,
 149  
                                                                         true);
 150  0
                                                         continue;
 151  8
                                                 }
 152  
                                                 LinkedBlockingQueue<ByteBuffer> queue;
 153  
                                                 try {
 154  8
                                                         writeBufferLock.readLock().lock();
 155  8
                                                         queue = writeBuffers.get(internalSession
 156  
                                                                         .getChannel());
 157  
                                                 } finally {
 158  8
                                                         writeBufferLock.readLock().unlock();
 159  8
                                                 }
 160  8
                                                 SocketChannel channel = internalSession.getChannel();
 161  8
                                                 SelectionKey key = channel.keyFor(selector);
 162  
                                                 try {
 163  8
                                                         queue.put(encoded);
 164  8
                                                         if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) {
 165  6
                                                                 key.interestOps(SelectionKey.OP_READ
 166  
                                                                                 + SelectionKey.OP_WRITE);
 167  6
                                                                 selector.wakeup();
 168  
                                                         }
 169  2
                                                 } catch (Exception exception) {
 170  2
                                                         destroySession(internalSession.getSessionId(),
 171  
                                                                         exception, true);
 172  2
                                                         continue;
 173  6
                                                 }
 174  
                                         }
 175  12
                                 } catch (InterruptedException ie) {
 176  12
                                         return;
 177  0
                                 } catch (Exception e) {
 178  0
                                         e.printStackTrace();
 179  8
                                 }
 180  
                         }
 181  
                 }
 182  
 
 183  
         }
 184  
 
 185  
         private Thread transportThread;
 186  
 
 187  
         private BlockingQueue<SessionCommand> inQueue;
 188  
 
 189  
         private BlockingQueue<SessionCommand> outQueue;
 190  
 
 191  
         public void setInQueue(BlockingQueue<SessionCommand> inQueue) {
 192  12
                 this.inQueue = inQueue;
 193  12
         }
 194  
 
 195  
         public void setOutQueue(BlockingQueue<SessionCommand> outQueue) {
 196  12
                 this.outQueue = outQueue;
 197  12
         }
 198  
 
 199  
         private BlockingQueue<ChannelConnection> channelQueue;
 200  
 
 201  
         public void setChannelQueue(BlockingQueue<ChannelConnection> channelQueue) {
 202  12
                 this.channelQueue = channelQueue;
 203  12
         }
 204  
 
 205  
         public void run() {
 206  
                 while (true) {
 207  
                         try {
 208  24
                                 final SessionCommand sessionCommand = inQueue.take();
 209  1
                                 switch (sessionCommand.getCommand()) {
 210  
                                 case QUEUE: {
 211  9
                                         final String sessionId = sessionCommand.getSessionId();
 212  
                                         final InternalSession internalSession;
 213  
                                         try {
 214  9
                                                 sessionsLock.readLock().lock();
 215  9
                                                 internalSession = sessionsById.get(sessionId);
 216  
                                         } finally {
 217  9
                                                 sessionsLock.readLock().unlock();
 218  9
                                         }
 219  9
                                         if (internalSession == null) {
 220  
                                                 // TODO: throw exception
 221  1
                                                 break;
 222  
                                         }
 223  8
                                         BlockingQueue<Frame> oldQueue = internalSession.getQueue();
 224  8
                                         internalSession.setQueue(sessionCommand.getQueue());
 225  8
                                         oldQueue.drainTo(internalSession.getQueue());
 226  8
                                         break;
 227  
                                 }
 228  
                                 case CLOSE: {
 229  2
                                         final String sessionId = sessionCommand.getSessionId();
 230  
                                         final InternalSession internalSession;
 231  
                                         try {
 232  2
                                                 sessionsLock.readLock().lock();
 233  2
                                                 internalSession = sessionsById.get(sessionId);
 234  
                                         } finally {
 235  2
                                                 sessionsLock.readLock().unlock();
 236  2
                                         }
 237  2
                                         if (internalSession == null) {
 238  
                                                 // TODO: throw exception
 239  1
                                                 break;
 240  
                                         }
 241  1
                                         destroySession(sessionId, null, false);
 242  1
                                         break;
 243  
                                 }
 244  
                                 }
 245  12
                         } catch (InterruptedException ie) {
 246  12
                                 return;
 247  1
                         } catch (Exception e) {
 248  1
                                 e.printStackTrace();
 249  12
                         }
 250  
                 }
 251  
         }
 252  
 
 253  
         private Thread scannerThread;
 254  
 
 255  
         private Thread[] encoderThreads;
 256  
 
 257  
         private Thread[] decoderThreads;
 258  
 
 259  
         private int encoders;
 260  
 
 261  
         private int decoders;
 262  
 
 263  
         private Thread selectionThread;
 264  
 
 265  
         public void init() throws IOException {
 266  12
                 transportThread = new Thread(this);
 267  12
                 transportThread.start();
 268  12
                 selector = Selector.open();
 269  12
                 selectionThread = new Thread(new SelectionRunner());
 270  12
                 selectionThread.start();
 271  12
                 encoderThreads = new Thread[encoders];
 272  24
                 for (int i = 0; i < encoders; i++) {
 273  12
                         encoderThreads[i] = new Thread(new BufferEncoder());
 274  12
                         encoderThreads[i].start();
 275  
                 }
 276  12
                 decoderThreads = new Thread[decoders];
 277  24
                 for (int i = 0; i < decoders; i++) {
 278  12
                         decoderThreads[i] = new Thread(new BufferDecoder());
 279  12
                         decoderThreads[i].start();
 280  
                 }
 281  12
                 scannerThread = new Thread(new ChannelQueueScanner());
 282  12
                 scannerThread.start();
 283  12
         }
 284  
 
 285  
         public void setEncoders(int encoders) {
 286  12
                 this.encoders = encoders;
 287  12
         }
 288  
 
 289  
         private void destroySession(String sessionId, Exception exception,
 290  
                         boolean send) {
 291  
                 try {
 292  12
                         sessionsLock.writeLock().lock();
 293  12
                         readBufferLock.writeLock().lock();
 294  12
                         writeBufferLock.writeLock().lock();
 295  12
                         InternalSession internalSession = sessionsById.remove(sessionId);
 296  12
                         if (internalSession != null) {
 297  12
                                 SocketChannel channel = internalSession.getChannel();
 298  12
                                 sessionsByChannel.remove(channel);
 299  12
                                 readSizeBuffers.remove(channel);
 300  12
                                 readBuffers.remove(channel);
 301  12
                                 writeSizeBuffers.remove(channel);
 302  12
                                 writeBuffers.remove(channel);
 303  
                                 try {
 304  12
                                         channel.close();
 305  0
                                 } catch (Exception e) {
 306  0
                                         e.printStackTrace();
 307  12
                                 }
 308  
                         }
 309  
                 } finally {
 310  12
                         writeBufferLock.writeLock().unlock();
 311  12
                         readBufferLock.writeLock().unlock();
 312  12
                         sessionsLock.writeLock().unlock();
 313  12
                 }
 314  12
                 if (send) {
 315  2
                         SessionCommand command = new SessionCommand(sessionId,
 316  
                                         SessionCommand.Command.CLOSE);
 317  2
                         command.setException(exception);
 318  2
                         outQueue.offer(command);
 319  
                 }
 320  12
         }
 321  
 
 322  24
         private class SelectionRunner implements Runnable {
 323  
 
 324  
                 public void run() {
 325  12
                         Map<SocketChannel, ByteBuffer> currentReadBuffers = new HashMap<SocketChannel, ByteBuffer>();
 326  
                         try {
 327  
                                 while (true) {
 328  
                                         SocketChannel toRegister;
 329  57
                                         while ((toRegister = channelsToRegister.poll()) != null) {
 330  
                                                 try {
 331  13
                                                         toRegister.register(selector, SelectionKey.OP_READ);
 332  
                                                         InternalSession internalSession;
 333  
                                                         try {
 334  13
                                                                 sessionsLock.readLock().lock();
 335  13
                                                                 internalSession = sessionsByChannel
 336  
                                                                                 .get(toRegister);
 337  
                                                         } finally {
 338  13
                                                                 sessionsLock.readLock().unlock();
 339  13
                                                         }
 340  13
                                                         SessionCommand command = new SessionCommand(
 341  
                                                                         internalSession.getSessionId(),
 342  
                                                                         SessionCommand.Command.QUEUE);
 343  13
                                                         command.setQueue(gatherQueue);
 344  13
                                                         outQueue.put(command);
 345  0
                                                 } catch (Exception exception) {
 346  0
                                                         destroyChannel(toRegister, exception, true);
 347  13
                                                 }
 348  
                                         }
 349  44
                                         int keyCount = selector.select();
 350  42
                                         if (keyCount == 0) {
 351  29
                                                 if (Thread.interrupted()) {
 352  10
                                                         return;
 353  
                                                 }
 354  
                                         } else {
 355  13
                                                 Set<SelectionKey> keys = selector.selectedKeys();
 356  13
                                                 Iterator<SelectionKey> it = keys.iterator();
 357  26
                                                 while (it.hasNext()) {
 358  13
                                                         SelectionKey key = it.next();
 359  13
                                                         it.remove();
 360  13
                                                         SocketChannel channel = (SocketChannel) key
 361  
                                                                         .channel();
 362  
                                                         try {
 363  13
                                                                 if (key.isReadable()) {
 364  
                                                                         // Get buffers
 365  
                                                                         ByteBuffer sizeBuffer;
 366  
                                                                         LinkedBlockingQueue<ByteBuffer> bufferQueue;
 367  
                                                                         try {
 368  7
                                                                                 readBufferLock.readLock().lock();
 369  7
                                                                                 sizeBuffer = readSizeBuffers
 370  
                                                                                                 .get(channel);
 371  7
                                                                                 bufferQueue = readBuffers.get(channel);
 372  
                                                                         } finally {
 373  7
                                                                                 readBufferLock.readLock().unlock();
 374  7
                                                                         }
 375  
                                                                         boolean tryAgain;
 376  
                                                                         do {
 377  14
                                                                                 tryAgain = false;
 378  14
                                                                                 if (sizeBuffer.hasRemaining()) {
 379  14
                                                                                         if (channel.read(sizeBuffer) == -1) {
 380  0
                                                                                                 destroyChannel(channel, null, true);
 381  0
                                                                                                 break;
 382  
                                                                                         }
 383  14
                                                                                         if (!sizeBuffer.hasRemaining()) {
 384  7
                                                                                                 sizeBuffer.flip();
 385  7
                                                                                                 byte b1 = sizeBuffer.get();
 386  7
                                                                                                 byte b2 = sizeBuffer.get();
 387  7
                                                                                                 int size = ((b1 & 0xFF) << 8)
 388  
                                                                                                                 | (b2 & 0xFF);
 389  7
                                                                                                 ByteBuffer currentBuffer = ByteBuffer
 390  
                                                                                                                 .allocate(size);
 391  7
                                                                                                 currentReadBuffers.put(channel,
 392  
                                                                                                                 currentBuffer);
 393  
                                                                                         }
 394  
                                                                                 }
 395  14
                                                                                 if (!sizeBuffer.hasRemaining()) {
 396  7
                                                                                         ByteBuffer currentBuffer = currentReadBuffers
 397  
                                                                                                         .get(channel);
 398  7
                                                                                         if (channel.read(currentBuffer) == -1) {
 399  0
                                                                                                 destroyChannel(channel, null, true);
 400  0
                                                                                                 break;
 401  
                                                                                         }
 402  7
                                                                                         if (!currentBuffer.hasRemaining()) {
 403  7
                                                                                                 currentBuffer.flip();
 404  7
                                                                                                 currentReadBuffers
 405  
                                                                                                                 .remove(currentBuffer);
 406  7
                                                                                                 if (bufferQueue != null) {
 407  7
                                                                                                         bufferQueue
 408  
                                                                                                                         .put(currentBuffer);
 409  
                                                                                                 }
 410  7
                                                                                                 sizeBuffer.clear();
 411  7
                                                                                                 tryAgain = true;
 412  7
                                                                                                 decodeQueue.put(channel);
 413  
                                                                                         }
 414  
                                                                                 }
 415  14
                                                                         } while (tryAgain);
 416  7
                                                                 } else if (key.isWritable()) {
 417  
                                                                         // Get buffers
 418  
                                                                         ByteBuffer sizeBuffer;
 419  
                                                                         LinkedBlockingQueue<ByteBuffer> bufferQueue;
 420  
                                                                         try {
 421  6
                                                                                 writeBufferLock.readLock().lock();
 422  6
                                                                                 sizeBuffer = writeSizeBuffers
 423  
                                                                                                 .get(channel);
 424  6
                                                                                 bufferQueue = writeBuffers.get(channel);
 425  
                                                                         } finally {
 426  6
                                                                                 writeBufferLock.readLock().unlock();
 427  6
                                                                         }
 428  
                                                                         boolean tryAgain;
 429  
                                                                         do {
 430  12
                                                                                 tryAgain = false;
 431  12
                                                                                 ByteBuffer writeBuffer = null;
 432  12
                                                                                 if (bufferQueue != null) {
 433  12
                                                                                         writeBuffer = bufferQueue.peek();
 434  
                                                                                 }
 435  12
                                                                                 if (writeBuffer == null) {
 436  6
                                                                                         break;
 437  
                                                                                 }
 438  6
                                                                                 if (sizeBuffer.position() == 0) {
 439  6
                                                                                         sizeBuffer.put((byte) ((writeBuffer
 440  
                                                                                                         .limit() >>> 8) & 0xFF));
 441  6
                                                                                         sizeBuffer.put((byte) ((writeBuffer
 442  
                                                                                                         .limit() >>> 0) & 0xFF));
 443  6
                                                                                         sizeBuffer.flip();
 444  
                                                                                 }
 445  6
                                                                                 if (sizeBuffer.hasRemaining()) {
 446  6
                                                                                         channel.write(sizeBuffer);
 447  
                                                                                 }
 448  6
                                                                                 if (!sizeBuffer.hasRemaining()) {
 449  6
                                                                                         channel.write(writeBuffer);
 450  6
                                                                                         if (!writeBuffer.hasRemaining()) {
 451  6
                                                                                                 bufferQueue.remove();
 452  6
                                                                                                 sizeBuffer.clear();
 453  6
                                                                                                 tryAgain = true;
 454  
                                                                                         }
 455  
                                                                                 }
 456  6
                                                                         } while (tryAgain);
 457  6
                                                                         if (bufferQueue != null
 458  
                                                                                         && bufferQueue.isEmpty()) {
 459  6
                                                                                 if ((key.interestOps() & SelectionKey.OP_WRITE) !=0) {
 460  3
                                                                                         key.interestOps(SelectionKey.OP_READ);
 461  
                                                                                 }
 462  
                                                                         }
 463  
                                                                 }
 464  3
                                                         } catch (Exception exception) {
 465  3
                                                                 destroyChannel(channel, exception, true);
 466  10
                                                         }
 467  13
                                                 }
 468  
                                         }
 469  32
                                 }
 470  2
                         } catch (Exception e) {
 471  2
                                 e.printStackTrace();
 472  
                         }
 473  2
                 }
 474  
 
 475  
         }
 476  
 
 477  12
         private LinkedBlockingQueue<SocketChannel> decodeQueue = new LinkedBlockingQueue<SocketChannel>();
 478  
 
 479  
         public void setDecoders(int decoders) {
 480  12
                 this.decoders = decoders;
 481  12
         }
 482  
 
 483  24
         private class BufferDecoder implements Runnable {
 484  12
                 private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
 485  
 
 486  
                 public void run() {
 487  
                         while (true) {
 488  
                                 try {
 489  18
                                         SocketChannel channel = decodeQueue.take();
 490  
                                         LinkedBlockingQueue<ByteBuffer> queue;
 491  
                                         try {
 492  7
                                                 readBufferLock.readLock().lock();
 493  7
                                                 queue = readBuffers.get(channel);
 494  
                                         } finally {
 495  7
                                                 readBufferLock.readLock().unlock();
 496  7
                                         }
 497  7
                                         if (queue == null) {
 498  0
                                                 continue;
 499  
                                         }
 500  7
                                         ByteBuffer buffer = queue.poll();
 501  7
                                         if (buffer == null) {
 502  0
                                                 continue;
 503  
                                         }
 504  
                                         InternalSession internalSession;
 505  
                                         try {
 506  7
                                                 sessionsLock.readLock().lock();
 507  7
                                                 internalSession = sessionsByChannel.get(channel);
 508  
                                         } finally {
 509  7
                                                 sessionsLock.readLock().unlock();
 510  7
                                         }
 511  7
                                         if (internalSession != null) {
 512  7
                                                 String message = decoder.decode(buffer).toString();
 513  7
                                                 internalSession.getQueue().put(
 514  
                                                                 new Frame(internalSession.getSessionId(),
 515  
                                                                                 message));
 516  
                                         }
 517  12
                                 } catch (InterruptedException e) {
 518  12
                                         return;
 519  0
                                 } catch (Exception e) {
 520  0
                                         e.printStackTrace();
 521  6
                                 }
 522  
                         }
 523  
                 }
 524  
         }
 525  
 
 526  
         private void destroyChannel(SocketChannel channel, Exception exception,
 527  
                         boolean send) {
 528  
                 InternalSession session;
 529  
                 try {
 530  3
                         sessionsLock.writeLock().lock();
 531  3
                         session = sessionsByChannel.remove(channel);
 532  3
                         if (session != null) {
 533  1
                                 sessionsById.remove(session.getSessionId());
 534  
                         }
 535  
                 } finally {
 536  3
                         sessionsLock.writeLock().unlock();
 537  3
                 }
 538  
                 try {
 539  3
                         readBufferLock.writeLock().lock();
 540  3
                         writeBufferLock.writeLock().lock();
 541  3
                         readSizeBuffers.remove(channel);
 542  3
                         readBuffers.remove(channel);
 543  3
                         writeSizeBuffers.remove(channel);
 544  3
                         writeBuffers.remove(channel);
 545  
                         try {
 546  3
                                 channel.close();
 547  0
                         } catch (Exception e) {
 548  0
                                 e.printStackTrace();
 549  3
                         }
 550  
                 } finally {
 551  3
                         writeBufferLock.writeLock().unlock();
 552  3
                         readBufferLock.writeLock().unlock();
 553  3
                 }
 554  3
                 if (session != null && send) {
 555  1
                         SessionCommand command = new SessionCommand(session.getSessionId(),
 556  
                                         SessionCommand.Command.CLOSE);
 557  1
                         command.setException(exception);
 558  1
                         outQueue.offer(command);
 559  
                 }
 560  3
         }
 561  
 
 562  
         private void initSessionChannel(InternalSession session,
 563  
                         SocketChannel channel) {
 564  
                 try {
 565  13
                         sessionsLock.writeLock().lock();
 566  13
                         sessionsByChannel.put(channel, session);
 567  13
                         sessionsById.put(session.getSessionId(), session);
 568  13
                         readBufferLock.writeLock().lock();
 569  13
                         writeBufferLock.writeLock().lock();
 570  13
                         readSizeBuffers.put(channel, ByteBuffer.allocate(2));
 571  13
                         readBuffers.put(channel, new LinkedBlockingQueue<ByteBuffer>());
 572  13
                         writeSizeBuffers.put(channel, ByteBuffer.allocate(2));
 573  13
                         writeBuffers.put(channel, new LinkedBlockingQueue<ByteBuffer>());
 574  
                 } finally {
 575  13
                         writeBufferLock.writeLock().unlock();
 576  13
                         readBufferLock.writeLock().unlock();
 577  13
                         sessionsLock.writeLock().unlock();
 578  13
                 }
 579  13
         }
 580  
 
 581  
         public void destroy() throws IOException {
 582  12
                 scannerThread.interrupt();
 583  12
                 transportThread.interrupt();
 584  24
                 for (Thread t : encoderThreads) {
 585  12
                         t.interrupt();
 586  
                 }
 587  24
                 for (Thread t : decoderThreads) {
 588  12
                         t.interrupt();
 589  
                 }
 590  12
                 selectionThread.interrupt();
 591  
                 try {
 592  12
                         sessionsLock.writeLock().lock();
 593  12
                         LinkedList<String> ids = new LinkedList<String>(sessionsById
 594  
                                         .keySet());
 595  12
                         for (String sessionId : ids) {
 596  9
                                 destroySession(sessionId, null, false);
 597  
                         }
 598  
                 } finally {
 599  12
                         sessionsLock.writeLock().unlock();
 600  12
                 }
 601  12
                 selector.close();
 602  12
         }
 603  
 }