|  1 |     | 
     | 
  |  2 |     | 
     | 
  |  3 |     | 
     | 
  |  4 |     | 
     | 
  |  5 |     | 
     | 
  |  6 |     | 
     | 
  |  7 |     | 
     | 
  |  8 |     | 
     | 
  |  9 |     | 
     | 
  |  10 |     | 
     | 
  |  11 |     | 
     | 
  |  12 |     | 
     | 
  |  13 |     | 
     | 
  |  14 |     | 
     | 
  |  15 |     | 
     | 
  |  16 |     | 
   package diy.middleware.channels;  | 
  |  17 |     | 
     | 
  |  18 |     | 
   import java.io.IOException;  | 
  |  19 |     | 
   import java.nio.ByteBuffer;  | 
  |  20 |     | 
   import java.nio.CharBuffer;  | 
  |  21 |     | 
   import java.nio.channels.SelectionKey;  | 
  |  22 |     | 
   import java.nio.channels.Selector;  | 
  |  23 |     | 
   import java.nio.channels.SocketChannel;  | 
  |  24 |     | 
   import java.nio.charset.Charset;  | 
  |  25 |     | 
   import java.nio.charset.CharsetDecoder;  | 
  |  26 |     | 
   import java.nio.charset.CharsetEncoder;  | 
  |  27 |     | 
   import java.util.HashMap;  | 
  |  28 |     | 
   import java.util.Iterator;  | 
  |  29 |     | 
   import java.util.LinkedList;  | 
  |  30 |     | 
   import java.util.Map;  | 
  |  31 |     | 
   import java.util.Set;  | 
  |  32 |     | 
   import java.util.concurrent.BlockingQueue;  | 
  |  33 |     | 
   import java.util.concurrent.LinkedBlockingQueue;  | 
  |  34 |     | 
   import java.util.concurrent.locks.ReadWriteLock;  | 
  |  35 |     | 
   import java.util.concurrent.locks.ReentrantReadWriteLock;  | 
  |  36 |     | 
     | 
  |  37 |     | 
   import diy.middleware.Frame;  | 
  |  38 |     | 
   import diy.middleware.SessionCommand;  | 
  |  39 |     | 
     | 
  |  40 |    535 |    public class ChannelTransport implements Runnable { | 
  |  41 |     | 
     | 
  |  42 |    12 |            private BlockingQueue<Frame> gatherQueue = new LinkedBlockingQueue<Frame>();  | 
  |  43 |     | 
     | 
  |  44 |    24 |            private class ChannelQueueScanner implements Runnable { | 
  |  45 |     | 
                   public void run() { | 
  |  46 |     | 
                           while (true) { | 
  |  47 |     | 
                                   try { | 
  |  48 |    26 |                                            ChannelConnection incomingConnection = channelQueue.take();  | 
  |  49 |    14 |                                            SocketChannel channel = incomingConnection.getChannel();  | 
  |  50 |    14 |                                            channel.finishConnect();  | 
  |  51 |    13 |                                            channel.configureBlocking(false);  | 
  |  52 |    13 |                                            String sessionId = incomingConnection.getSessionId();  | 
  |  53 |    13 |                                            InternalSession internalSession = new InternalSession(  | 
  |  54 |     | 
                                                           sessionId, channel);  | 
  |  55 |    13 |                                            internalSession.setQueue(new LinkedBlockingQueue<Frame>());  | 
  |  56 |     | 
                                           try { | 
  |  57 |    13 |                                                    sessionsLock.writeLock().lock();  | 
  |  58 |    13 |                                                    sessionsById.put(sessionId, internalSession);  | 
  |  59 |    13 |                                                    sessionsByChannel.put(channel, internalSession);  | 
  |  60 |     | 
                                           } finally { | 
  |  61 |    13 |                                                    sessionsLock.writeLock().unlock();  | 
  |  62 |    13 |                                            }  | 
  |  63 |    13 |                                            initSessionChannel(internalSession, channel);  | 
  |  64 |    13 |                                            channelsToRegister.put(channel);  | 
  |  65 |    13 |                                            selector.wakeup();  | 
  |  66 |    12 |                                    } catch (InterruptedException ie) { | 
  |  67 |    12 |                                            return;  | 
  |  68 |    1 |                                    } catch (Exception e) { | 
  |  69 |    1 |                                            e.printStackTrace();  | 
  |  70 |    14 |                                    }  | 
  |  71 |     | 
                           }  | 
  |  72 |     | 
                   }  | 
  |  73 |     | 
           }  | 
  |  74 |     | 
     | 
  |  75 |     | 
           private static class InternalSession { | 
  |  76 |     | 
                   private String sessionId;  | 
  |  77 |     | 
     | 
  |  78 |     | 
                   private SocketChannel channel;  | 
  |  79 |     | 
     | 
  |  80 |     | 
                   private BlockingQueue<Frame> queue;  | 
  |  81 |     | 
     | 
  |  82 |    13 |                    public InternalSession(String sessionId, SocketChannel channel) { | 
  |  83 |    13 |                            this.sessionId = sessionId;  | 
  |  84 |    13 |                            this.channel = channel;  | 
  |  85 |    13 |                    }  | 
  |  86 |     | 
     | 
  |  87 |     | 
                   public SocketChannel getChannel() { | 
  |  88 |    28 |                            return channel;  | 
  |  89 |     | 
                   }  | 
  |  90 |     | 
     | 
  |  91 |     | 
                   public String getSessionId() { | 
  |  92 |    37 |                            return sessionId;  | 
  |  93 |     | 
                   }  | 
  |  94 |     | 
     | 
  |  95 |     | 
                   public BlockingQueue<Frame> getQueue() { | 
  |  96 |    23 |                            return queue;  | 
  |  97 |     | 
                   }  | 
  |  98 |     | 
     | 
  |  99 |     | 
                   public void setQueue(BlockingQueue<Frame> queue) { | 
  |  100 |    21 |                            this.queue = queue;  | 
  |  101 |    21 |                    }  | 
  |  102 |     | 
           }  | 
  |  103 |     | 
     | 
  |  104 |     | 
           private Selector selector;  | 
  |  105 |     | 
     | 
  |  106 |    12 |            private LinkedBlockingQueue<SocketChannel> channelsToRegister = new LinkedBlockingQueue<SocketChannel>();  | 
  |  107 |     | 
     | 
  |  108 |    12 |            private Map<String, InternalSession> sessionsById = new HashMap<String, InternalSession>();  | 
  |  109 |     | 
     | 
  |  110 |    12 |            private Map<SocketChannel, InternalSession> sessionsByChannel = new HashMap<SocketChannel, InternalSession>();  | 
  |  111 |     | 
     | 
  |  112 |    12 |            private ReadWriteLock sessionsLock = new ReentrantReadWriteLock();  | 
  |  113 |     | 
     | 
  |  114 |    12 |            private Map<SocketChannel, ByteBuffer> readSizeBuffers = new HashMap<SocketChannel, ByteBuffer>();  | 
  |  115 |     | 
     | 
  |  116 |    12 |            private Map<SocketChannel, LinkedBlockingQueue<ByteBuffer>> readBuffers = new HashMap<SocketChannel, LinkedBlockingQueue<ByteBuffer>>();  | 
  |  117 |     | 
     | 
  |  118 |    12 |            private ReadWriteLock readBufferLock = new ReentrantReadWriteLock();  | 
  |  119 |     | 
     | 
  |  120 |    12 |            private Map<SocketChannel, ByteBuffer> writeSizeBuffers = new HashMap<SocketChannel, ByteBuffer>();  | 
  |  121 |     | 
     | 
  |  122 |    12 |            private Map<SocketChannel, LinkedBlockingQueue<ByteBuffer>> writeBuffers = new HashMap<SocketChannel, LinkedBlockingQueue<ByteBuffer>>();  | 
  |  123 |     | 
     | 
  |  124 |    12 |            private ReadWriteLock writeBufferLock = new ReentrantReadWriteLock();  | 
  |  125 |     | 
     | 
  |  126 |    24 |            private class BufferEncoder implements Runnable { | 
  |  127 |     | 
     | 
  |  128 |    12 |                    private CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder(); | 
  |  129 |     | 
     | 
  |  130 |     | 
                   public void run() { | 
  |  131 |     | 
                           while (true) { | 
  |  132 |     | 
                                   try { | 
  |  133 |    22 |                                            Frame frame = gatherQueue.take();  | 
  |  134 |     | 
                                           InternalSession internalSession;  | 
  |  135 |     | 
                                           try { | 
  |  136 |    10 |                                                    sessionsLock.readLock().lock();  | 
  |  137 |    10 |                                                    internalSession = sessionsById  | 
  |  138 |     | 
                                                                   .get(frame.getSessionId());  | 
  |  139 |     | 
                                           } finally { | 
  |  140 |    10 |                                                    sessionsLock.readLock().unlock();  | 
  |  141 |    10 |                                            }  | 
  |  142 |    10 |                                            if (internalSession != null) { | 
  |  143 |     | 
                                                   ByteBuffer encoded;  | 
  |  144 |     | 
                                                   try { | 
  |  145 |    8 |                                                            encoded = encoder.encode(CharBuffer.wrap(frame  | 
  |  146 |     | 
                                                                           .getContent()));  | 
  |  147 |    0 |                                                    } catch (Exception exception) { | 
  |  148 |    0 |                                                            destroySession(frame.getSessionId(), exception,  | 
  |  149 |     | 
                                                                           true);  | 
  |  150 |    0 |                                                            continue;  | 
  |  151 |    8 |                                                    }  | 
  |  152 |     | 
                                                   LinkedBlockingQueue<ByteBuffer> queue;  | 
  |  153 |     | 
                                                   try { | 
  |  154 |    8 |                                                            writeBufferLock.readLock().lock();  | 
  |  155 |    8 |                                                            queue = writeBuffers.get(internalSession  | 
  |  156 |     | 
                                                                           .getChannel());  | 
  |  157 |     | 
                                                   } finally { | 
  |  158 |    8 |                                                            writeBufferLock.readLock().unlock();  | 
  |  159 |    8 |                                                    }  | 
  |  160 |    8 |                                                    SocketChannel channel = internalSession.getChannel();  | 
  |  161 |    8 |                                                    SelectionKey key = channel.keyFor(selector);  | 
  |  162 |     | 
                                                   try { | 
  |  163 |    8 |                                                            queue.put(encoded);  | 
  |  164 |    8 |                                                            if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) { | 
  |  165 |    6 |                                                                    key.interestOps(SelectionKey.OP_READ  | 
  |  166 |     | 
                                                                                   + SelectionKey.OP_WRITE);  | 
  |  167 |    6 |                                                                    selector.wakeup();  | 
  |  168 |     | 
                                                           }  | 
  |  169 |    2 |                                                    } catch (Exception exception) { | 
  |  170 |    2 |                                                            destroySession(internalSession.getSessionId(),  | 
  |  171 |     | 
                                                                           exception, true);  | 
  |  172 |    2 |                                                            continue;  | 
  |  173 |    6 |                                                    }  | 
  |  174 |     | 
                                           }  | 
  |  175 |    12 |                                    } catch (InterruptedException ie) { | 
  |  176 |    12 |                                            return;  | 
  |  177 |    0 |                                    } catch (Exception e) { | 
  |  178 |    0 |                                            e.printStackTrace();  | 
  |  179 |    8 |                                    }  | 
  |  180 |     | 
                           }  | 
  |  181 |     | 
                   }  | 
  |  182 |     | 
     | 
  |  183 |     | 
           }  | 
  |  184 |     | 
     | 
  |  185 |     | 
           private Thread transportThread;  | 
  |  186 |     | 
     | 
  |  187 |     | 
           private BlockingQueue<SessionCommand> inQueue;  | 
  |  188 |     | 
     | 
  |  189 |     | 
           private BlockingQueue<SessionCommand> outQueue;  | 
  |  190 |     | 
     | 
  |  191 |     | 
           public void setInQueue(BlockingQueue<SessionCommand> inQueue) { | 
  |  192 |    12 |                    this.inQueue = inQueue;  | 
  |  193 |    12 |            }  | 
  |  194 |     | 
     | 
  |  195 |     | 
           public void setOutQueue(BlockingQueue<SessionCommand> outQueue) { | 
  |  196 |    12 |                    this.outQueue = outQueue;  | 
  |  197 |    12 |            }  | 
  |  198 |     | 
     | 
  |  199 |     | 
           private BlockingQueue<ChannelConnection> channelQueue;  | 
  |  200 |     | 
     | 
  |  201 |     | 
           public void setChannelQueue(BlockingQueue<ChannelConnection> channelQueue) { | 
  |  202 |    12 |                    this.channelQueue = channelQueue;  | 
  |  203 |    12 |            }  | 
  |  204 |     | 
     | 
  |  205 |     | 
           public void run() { | 
  |  206 |     | 
                   while (true) { | 
  |  207 |     | 
                           try { | 
  |  208 |    24 |                                    final SessionCommand sessionCommand = inQueue.take();  | 
  |  209 |    1 |                                    switch (sessionCommand.getCommand()) { | 
  |  210 |     | 
                                   case QUEUE: { | 
  |  211 |    9 |                                            final String sessionId = sessionCommand.getSessionId();  | 
  |  212 |     | 
                                           final InternalSession internalSession;  | 
  |  213 |     | 
                                           try { | 
  |  214 |    9 |                                                    sessionsLock.readLock().lock();  | 
  |  215 |    9 |                                                    internalSession = sessionsById.get(sessionId);  | 
  |  216 |     | 
                                           } finally { | 
  |  217 |    9 |                                                    sessionsLock.readLock().unlock();  | 
  |  218 |    9 |                                            }  | 
  |  219 |    9 |                                            if (internalSession == null) { | 
  |  220 |     | 
                                                     | 
  |  221 |    1 |                                                    break;  | 
  |  222 |     | 
                                           }  | 
  |  223 |    8 |                                            BlockingQueue<Frame> oldQueue = internalSession.getQueue();  | 
  |  224 |    8 |                                            internalSession.setQueue(sessionCommand.getQueue());  | 
  |  225 |    8 |                                            oldQueue.drainTo(internalSession.getQueue());  | 
  |  226 |    8 |                                            break;  | 
  |  227 |     | 
                                   }  | 
  |  228 |     | 
                                   case CLOSE: { | 
  |  229 |    2 |                                            final String sessionId = sessionCommand.getSessionId();  | 
  |  230 |     | 
                                           final InternalSession internalSession;  | 
  |  231 |     | 
                                           try { | 
  |  232 |    2 |                                                    sessionsLock.readLock().lock();  | 
  |  233 |    2 |                                                    internalSession = sessionsById.get(sessionId);  | 
  |  234 |     | 
                                           } finally { | 
  |  235 |    2 |                                                    sessionsLock.readLock().unlock();  | 
  |  236 |    2 |                                            }  | 
  |  237 |    2 |                                            if (internalSession == null) { | 
  |  238 |     | 
                                                     | 
  |  239 |    1 |                                                    break;  | 
  |  240 |     | 
                                           }  | 
  |  241 |    1 |                                            destroySession(sessionId, null, false);  | 
  |  242 |    1 |                                            break;  | 
  |  243 |     | 
                                   }  | 
  |  244 |     | 
                                   }  | 
  |  245 |    12 |                            } catch (InterruptedException ie) { | 
  |  246 |    12 |                                    return;  | 
  |  247 |    1 |                            } catch (Exception e) { | 
  |  248 |    1 |                                    e.printStackTrace();  | 
  |  249 |    12 |                            }  | 
  |  250 |     | 
                   }  | 
  |  251 |     | 
           }  | 
  |  252 |     | 
     | 
  |  253 |     | 
           private Thread scannerThread;  | 
  |  254 |     | 
     | 
  |  255 |     | 
           private Thread[] encoderThreads;  | 
  |  256 |     | 
     | 
  |  257 |     | 
           private Thread[] decoderThreads;  | 
  |  258 |     | 
     | 
  |  259 |     | 
           private int encoders;  | 
  |  260 |     | 
     | 
  |  261 |     | 
           private int decoders;  | 
  |  262 |     | 
     | 
  |  263 |     | 
           private Thread selectionThread;  | 
  |  264 |     | 
     | 
  |  265 |     | 
           public void init() throws IOException { | 
  |  266 |    12 |                    transportThread = new Thread(this);  | 
  |  267 |    12 |                    transportThread.start();  | 
  |  268 |    12 |                    selector = Selector.open();  | 
  |  269 |    12 |                    selectionThread = new Thread(new SelectionRunner());  | 
  |  270 |    12 |                    selectionThread.start();  | 
  |  271 |    12 |                    encoderThreads = new Thread[encoders];  | 
  |  272 |    24 |                    for (int i = 0; i < encoders; i++) { | 
  |  273 |    12 |                            encoderThreads[i] = new Thread(new BufferEncoder());  | 
  |  274 |    12 |                            encoderThreads[i].start();  | 
  |  275 |     | 
                   }  | 
  |  276 |    12 |                    decoderThreads = new Thread[decoders];  | 
  |  277 |    24 |                    for (int i = 0; i < decoders; i++) { | 
  |  278 |    12 |                            decoderThreads[i] = new Thread(new BufferDecoder());  | 
  |  279 |    12 |                            decoderThreads[i].start();  | 
  |  280 |     | 
                   }  | 
  |  281 |    12 |                    scannerThread = new Thread(new ChannelQueueScanner());  | 
  |  282 |    12 |                    scannerThread.start();  | 
  |  283 |    12 |            }  | 
  |  284 |     | 
     | 
  |  285 |     | 
           public void setEncoders(int encoders) { | 
  |  286 |    12 |                    this.encoders = encoders;  | 
  |  287 |    12 |            }  | 
  |  288 |     | 
     | 
  |  289 |     | 
           private void destroySession(String sessionId, Exception exception,  | 
  |  290 |     | 
                           boolean send) { | 
  |  291 |     | 
                   try { | 
  |  292 |    12 |                            sessionsLock.writeLock().lock();  | 
  |  293 |    12 |                            readBufferLock.writeLock().lock();  | 
  |  294 |    12 |                            writeBufferLock.writeLock().lock();  | 
  |  295 |    12 |                            InternalSession internalSession = sessionsById.remove(sessionId);  | 
  |  296 |    12 |                            if (internalSession != null) { | 
  |  297 |    12 |                                    SocketChannel channel = internalSession.getChannel();  | 
  |  298 |    12 |                                    sessionsByChannel.remove(channel);  | 
  |  299 |    12 |                                    readSizeBuffers.remove(channel);  | 
  |  300 |    12 |                                    readBuffers.remove(channel);  | 
  |  301 |    12 |                                    writeSizeBuffers.remove(channel);  | 
  |  302 |    12 |                                    writeBuffers.remove(channel);  | 
  |  303 |     | 
                                   try { | 
  |  304 |    12 |                                            channel.close();  | 
  |  305 |    0 |                                    } catch (Exception e) { | 
  |  306 |    0 |                                            e.printStackTrace();  | 
  |  307 |    12 |                                    }  | 
  |  308 |     | 
                           }  | 
  |  309 |     | 
                   } finally { | 
  |  310 |    12 |                            writeBufferLock.writeLock().unlock();  | 
  |  311 |    12 |                            readBufferLock.writeLock().unlock();  | 
  |  312 |    12 |                            sessionsLock.writeLock().unlock();  | 
  |  313 |    12 |                    }  | 
  |  314 |    12 |                    if (send) { | 
  |  315 |    2 |                            SessionCommand command = new SessionCommand(sessionId,  | 
  |  316 |     | 
                                           SessionCommand.Command.CLOSE);  | 
  |  317 |    2 |                            command.setException(exception);  | 
  |  318 |    2 |                            outQueue.offer(command);  | 
  |  319 |     | 
                   }  | 
  |  320 |    12 |            }  | 
  |  321 |     | 
     | 
  |  322 |    24 |            private class SelectionRunner implements Runnable { | 
  |  323 |     | 
     | 
  |  324 |     | 
                   public void run() { | 
  |  325 |    12 |                            Map<SocketChannel, ByteBuffer> currentReadBuffers = new HashMap<SocketChannel, ByteBuffer>();  | 
  |  326 |     | 
                           try { | 
  |  327 |     | 
                                   while (true) { | 
  |  328 |     | 
                                           SocketChannel toRegister;  | 
  |  329 |    57 |                                            while ((toRegister = channelsToRegister.poll()) != null) { | 
  |  330 |     | 
                                                   try { | 
  |  331 |    13 |                                                            toRegister.register(selector, SelectionKey.OP_READ);  | 
  |  332 |     | 
                                                           InternalSession internalSession;  | 
  |  333 |     | 
                                                           try { | 
  |  334 |    13 |                                                                    sessionsLock.readLock().lock();  | 
  |  335 |    13 |                                                                    internalSession = sessionsByChannel  | 
  |  336 |     | 
                                                                                   .get(toRegister);  | 
  |  337 |     | 
                                                           } finally { | 
  |  338 |    13 |                                                                    sessionsLock.readLock().unlock();  | 
  |  339 |    13 |                                                            }  | 
  |  340 |    13 |                                                            SessionCommand command = new SessionCommand(  | 
  |  341 |     | 
                                                                           internalSession.getSessionId(),  | 
  |  342 |     | 
                                                                           SessionCommand.Command.QUEUE);  | 
  |  343 |    13 |                                                            command.setQueue(gatherQueue);  | 
  |  344 |    13 |                                                            outQueue.put(command);  | 
  |  345 |    0 |                                                    } catch (Exception exception) { | 
  |  346 |    0 |                                                            destroyChannel(toRegister, exception, true);  | 
  |  347 |    13 |                                                    }  | 
  |  348 |     | 
                                           }  | 
  |  349 |    44 |                                            int keyCount = selector.select();  | 
  |  350 |    42 |                                            if (keyCount == 0) { | 
  |  351 |    29 |                                                    if (Thread.interrupted()) { | 
  |  352 |    10 |                                                            return;  | 
  |  353 |     | 
                                                   }  | 
  |  354 |     | 
                                           } else { | 
  |  355 |    13 |                                                    Set<SelectionKey> keys = selector.selectedKeys();  | 
  |  356 |    13 |                                                    Iterator<SelectionKey> it = keys.iterator();  | 
  |  357 |    26 |                                                    while (it.hasNext()) { | 
  |  358 |    13 |                                                            SelectionKey key = it.next();  | 
  |  359 |    13 |                                                            it.remove();  | 
  |  360 |    13 |                                                            SocketChannel channel = (SocketChannel) key  | 
  |  361 |     | 
                                                                           .channel();  | 
  |  362 |     | 
                                                           try { | 
  |  363 |    13 |                                                                    if (key.isReadable()) { | 
  |  364 |     | 
                                                                             | 
  |  365 |     | 
                                                                           ByteBuffer sizeBuffer;  | 
  |  366 |     | 
                                                                           LinkedBlockingQueue<ByteBuffer> bufferQueue;  | 
  |  367 |     | 
                                                                           try { | 
  |  368 |    7 |                                                                                    readBufferLock.readLock().lock();  | 
  |  369 |    7 |                                                                                    sizeBuffer = readSizeBuffers  | 
  |  370 |     | 
                                                                                                   .get(channel);  | 
  |  371 |    7 |                                                                                    bufferQueue = readBuffers.get(channel);  | 
  |  372 |     | 
                                                                           } finally { | 
  |  373 |    7 |                                                                                    readBufferLock.readLock().unlock();  | 
  |  374 |    7 |                                                                            }  | 
  |  375 |     | 
                                                                           boolean tryAgain;  | 
  |  376 |     | 
                                                                           do { | 
  |  377 |    14 |                                                                                    tryAgain = false;  | 
  |  378 |    14 |                                                                                    if (sizeBuffer.hasRemaining()) { | 
  |  379 |    14 |                                                                                            if (channel.read(sizeBuffer) == -1) { | 
  |  380 |    0 |                                                                                                    destroyChannel(channel, null, true);  | 
  |  381 |    0 |                                                                                                    break;  | 
  |  382 |     | 
                                                                                           }  | 
  |  383 |    14 |                                                                                            if (!sizeBuffer.hasRemaining()) { | 
  |  384 |    7 |                                                                                                    sizeBuffer.flip();  | 
  |  385 |    7 |                                                                                                    byte b1 = sizeBuffer.get();  | 
  |  386 |    7 |                                                                                                    byte b2 = sizeBuffer.get();  | 
  |  387 |    7 |                                                                                                    int size = ((b1 & 0xFF) << 8)  | 
  |  388 |     | 
                                                                                                                   | (b2 & 0xFF);  | 
  |  389 |    7 |                                                                                                    ByteBuffer currentBuffer = ByteBuffer  | 
  |  390 |     | 
                                                                                                                   .allocate(size);  | 
  |  391 |    7 |                                                                                                    currentReadBuffers.put(channel,  | 
  |  392 |     | 
                                                                                                                   currentBuffer);  | 
  |  393 |     | 
                                                                                           }  | 
  |  394 |     | 
                                                                                   }  | 
  |  395 |    14 |                                                                                    if (!sizeBuffer.hasRemaining()) { | 
  |  396 |    7 |                                                                                            ByteBuffer currentBuffer = currentReadBuffers  | 
  |  397 |     | 
                                                                                                           .get(channel);  | 
  |  398 |    7 |                                                                                            if (channel.read(currentBuffer) == -1) { | 
  |  399 |    0 |                                                                                                    destroyChannel(channel, null, true);  | 
  |  400 |    0 |                                                                                                    break;  | 
  |  401 |     | 
                                                                                           }  | 
  |  402 |    7 |                                                                                            if (!currentBuffer.hasRemaining()) { | 
  |  403 |    7 |                                                                                                    currentBuffer.flip();  | 
  |  404 |    7 |                                                                                                    currentReadBuffers  | 
  |  405 |     | 
                                                                                                                   .remove(currentBuffer);  | 
  |  406 |    7 |                                                                                                    if (bufferQueue != null) { | 
  |  407 |    7 |                                                                                                            bufferQueue  | 
  |  408 |     | 
                                                                                                                           .put(currentBuffer);  | 
  |  409 |     | 
                                                                                                   }  | 
  |  410 |    7 |                                                                                                    sizeBuffer.clear();  | 
  |  411 |    7 |                                                                                                    tryAgain = true;  | 
  |  412 |    7 |                                                                                                    decodeQueue.put(channel);  | 
  |  413 |     | 
                                                                                           }  | 
  |  414 |     | 
                                                                                   }  | 
  |  415 |    14 |                                                                            } while (tryAgain);  | 
  |  416 |    7 |                                                                    } else if (key.isWritable()) { | 
  |  417 |     | 
                                                                             | 
  |  418 |     | 
                                                                           ByteBuffer sizeBuffer;  | 
  |  419 |     | 
                                                                           LinkedBlockingQueue<ByteBuffer> bufferQueue;  | 
  |  420 |     | 
                                                                           try { | 
  |  421 |    6 |                                                                                    writeBufferLock.readLock().lock();  | 
  |  422 |    6 |                                                                                    sizeBuffer = writeSizeBuffers  | 
  |  423 |     | 
                                                                                                   .get(channel);  | 
  |  424 |    6 |                                                                                    bufferQueue = writeBuffers.get(channel);  | 
  |  425 |     | 
                                                                           } finally { | 
  |  426 |    6 |                                                                                    writeBufferLock.readLock().unlock();  | 
  |  427 |    6 |                                                                            }  | 
  |  428 |     | 
                                                                           boolean tryAgain;  | 
  |  429 |     | 
                                                                           do { | 
  |  430 |    12 |                                                                                    tryAgain = false;  | 
  |  431 |    12 |                                                                                    ByteBuffer writeBuffer = null;  | 
  |  432 |    12 |                                                                                    if (bufferQueue != null) { | 
  |  433 |    12 |                                                                                            writeBuffer = bufferQueue.peek();  | 
  |  434 |     | 
                                                                                   }  | 
  |  435 |    12 |                                                                                    if (writeBuffer == null) { | 
  |  436 |    6 |                                                                                            break;  | 
  |  437 |     | 
                                                                                   }  | 
  |  438 |    6 |                                                                                    if (sizeBuffer.position() == 0) { | 
  |  439 |    6 |                                                                                            sizeBuffer.put((byte) ((writeBuffer  | 
  |  440 |     | 
                                                                                                           .limit() >>> 8) & 0xFF));  | 
  |  441 |    6 |                                                                                            sizeBuffer.put((byte) ((writeBuffer  | 
  |  442 |     | 
                                                                                                           .limit() >>> 0) & 0xFF));  | 
  |  443 |    6 |                                                                                            sizeBuffer.flip();  | 
  |  444 |     | 
                                                                                   }  | 
  |  445 |    6 |                                                                                    if (sizeBuffer.hasRemaining()) { | 
  |  446 |    6 |                                                                                            channel.write(sizeBuffer);  | 
  |  447 |     | 
                                                                                   }  | 
  |  448 |    6 |                                                                                    if (!sizeBuffer.hasRemaining()) { | 
  |  449 |    6 |                                                                                            channel.write(writeBuffer);  | 
  |  450 |    6 |                                                                                            if (!writeBuffer.hasRemaining()) { | 
  |  451 |    6 |                                                                                                    bufferQueue.remove();  | 
  |  452 |    6 |                                                                                                    sizeBuffer.clear();  | 
  |  453 |    6 |                                                                                                    tryAgain = true;  | 
  |  454 |     | 
                                                                                           }  | 
  |  455 |     | 
                                                                                   }  | 
  |  456 |    6 |                                                                            } while (tryAgain);  | 
  |  457 |    6 |                                                                            if (bufferQueue != null  | 
  |  458 |     | 
                                                                                           && bufferQueue.isEmpty()) { | 
  |  459 |    6 |                                                                                    if ((key.interestOps() & SelectionKey.OP_WRITE) !=0) { | 
  |  460 |    3 |                                                                                            key.interestOps(SelectionKey.OP_READ);  | 
  |  461 |     | 
                                                                                   }  | 
  |  462 |     | 
                                                                           }  | 
  |  463 |     | 
                                                                   }  | 
  |  464 |    3 |                                                            } catch (Exception exception) { | 
  |  465 |    3 |                                                                    destroyChannel(channel, exception, true);  | 
  |  466 |    10 |                                                            }  | 
  |  467 |    13 |                                                    }  | 
  |  468 |     | 
                                           }  | 
  |  469 |    32 |                                    }  | 
  |  470 |    2 |                            } catch (Exception e) { | 
  |  471 |    2 |                                    e.printStackTrace();  | 
  |  472 |     | 
                           }  | 
  |  473 |    2 |                    }  | 
  |  474 |     | 
     | 
  |  475 |     | 
           }  | 
  |  476 |     | 
     | 
  |  477 |    12 |            private LinkedBlockingQueue<SocketChannel> decodeQueue = new LinkedBlockingQueue<SocketChannel>();  | 
  |  478 |     | 
     | 
  |  479 |     | 
           public void setDecoders(int decoders) { | 
  |  480 |    12 |                    this.decoders = decoders;  | 
  |  481 |    12 |            }  | 
  |  482 |     | 
     | 
  |  483 |    24 |            private class BufferDecoder implements Runnable { | 
  |  484 |    12 |                    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); | 
  |  485 |     | 
     | 
  |  486 |     | 
                   public void run() { | 
  |  487 |     | 
                           while (true) { | 
  |  488 |     | 
                                   try { | 
  |  489 |    18 |                                            SocketChannel channel = decodeQueue.take();  | 
  |  490 |     | 
                                           LinkedBlockingQueue<ByteBuffer> queue;  | 
  |  491 |     | 
                                           try { | 
  |  492 |    7 |                                                    readBufferLock.readLock().lock();  | 
  |  493 |    7 |                                                    queue = readBuffers.get(channel);  | 
  |  494 |     | 
                                           } finally { | 
  |  495 |    7 |                                                    readBufferLock.readLock().unlock();  | 
  |  496 |    7 |                                            }  | 
  |  497 |    7 |                                            if (queue == null) { | 
  |  498 |    0 |                                                    continue;  | 
  |  499 |     | 
                                           }  | 
  |  500 |    7 |                                            ByteBuffer buffer = queue.poll();  | 
  |  501 |    7 |                                            if (buffer == null) { | 
  |  502 |    0 |                                                    continue;  | 
  |  503 |     | 
                                           }  | 
  |  504 |     | 
                                           InternalSession internalSession;  | 
  |  505 |     | 
                                           try { | 
  |  506 |    7 |                                                    sessionsLock.readLock().lock();  | 
  |  507 |    7 |                                                    internalSession = sessionsByChannel.get(channel);  | 
  |  508 |     | 
                                           } finally { | 
  |  509 |    7 |                                                    sessionsLock.readLock().unlock();  | 
  |  510 |    7 |                                            }  | 
  |  511 |    7 |                                            if (internalSession != null) { | 
  |  512 |    7 |                                                    String message = decoder.decode(buffer).toString();  | 
  |  513 |    7 |                                                    internalSession.getQueue().put(  | 
  |  514 |     | 
                                                                   new Frame(internalSession.getSessionId(),  | 
  |  515 |     | 
                                                                                   message));  | 
  |  516 |     | 
                                           }  | 
  |  517 |    12 |                                    } catch (InterruptedException e) { | 
  |  518 |    12 |                                            return;  | 
  |  519 |    0 |                                    } catch (Exception e) { | 
  |  520 |    0 |                                            e.printStackTrace();  | 
  |  521 |    6 |                                    }  | 
  |  522 |     | 
                           }  | 
  |  523 |     | 
                   }  | 
  |  524 |     | 
           }  | 
  |  525 |     | 
     | 
  |  526 |     | 
           private void destroyChannel(SocketChannel channel, Exception exception,  | 
  |  527 |     | 
                           boolean send) { | 
  |  528 |     | 
                   InternalSession session;  | 
  |  529 |     | 
                   try { | 
  |  530 |    3 |                            sessionsLock.writeLock().lock();  | 
  |  531 |    3 |                            session = sessionsByChannel.remove(channel);  | 
  |  532 |    3 |                            if (session != null) { | 
  |  533 |    1 |                                    sessionsById.remove(session.getSessionId());  | 
  |  534 |     | 
                           }  | 
  |  535 |     | 
                   } finally { | 
  |  536 |    3 |                            sessionsLock.writeLock().unlock();  | 
  |  537 |    3 |                    }  | 
  |  538 |     | 
                   try { | 
  |  539 |    3 |                            readBufferLock.writeLock().lock();  | 
  |  540 |    3 |                            writeBufferLock.writeLock().lock();  | 
  |  541 |    3 |                            readSizeBuffers.remove(channel);  | 
  |  542 |    3 |                            readBuffers.remove(channel);  | 
  |  543 |    3 |                            writeSizeBuffers.remove(channel);  | 
  |  544 |    3 |                            writeBuffers.remove(channel);  | 
  |  545 |     | 
                           try { | 
  |  546 |    3 |                                    channel.close();  | 
  |  547 |    0 |                            } catch (Exception e) { | 
  |  548 |    0 |                                    e.printStackTrace();  | 
  |  549 |    3 |                            }  | 
  |  550 |     | 
                   } finally { | 
  |  551 |    3 |                            writeBufferLock.writeLock().unlock();  | 
  |  552 |    3 |                            readBufferLock.writeLock().unlock();  | 
  |  553 |    3 |                    }  | 
  |  554 |    3 |                    if (session != null && send) { | 
  |  555 |    1 |                            SessionCommand command = new SessionCommand(session.getSessionId(),  | 
  |  556 |     | 
                                           SessionCommand.Command.CLOSE);  | 
  |  557 |    1 |                            command.setException(exception);  | 
  |  558 |    1 |                            outQueue.offer(command);  | 
  |  559 |     | 
                   }  | 
  |  560 |    3 |            }  | 
  |  561 |     | 
     | 
  |  562 |     | 
           private void initSessionChannel(InternalSession session,  | 
  |  563 |     | 
                           SocketChannel channel) { | 
  |  564 |     | 
                   try { | 
  |  565 |    13 |                            sessionsLock.writeLock().lock();  | 
  |  566 |    13 |                            sessionsByChannel.put(channel, session);  | 
  |  567 |    13 |                            sessionsById.put(session.getSessionId(), session);  | 
  |  568 |    13 |                            readBufferLock.writeLock().lock();  | 
  |  569 |    13 |                            writeBufferLock.writeLock().lock();  | 
  |  570 |    13 |                            readSizeBuffers.put(channel, ByteBuffer.allocate(2));  | 
  |  571 |    13 |                            readBuffers.put(channel, new LinkedBlockingQueue<ByteBuffer>());  | 
  |  572 |    13 |                            writeSizeBuffers.put(channel, ByteBuffer.allocate(2));  | 
  |  573 |    13 |                            writeBuffers.put(channel, new LinkedBlockingQueue<ByteBuffer>());  | 
  |  574 |     | 
                   } finally { | 
  |  575 |    13 |                            writeBufferLock.writeLock().unlock();  | 
  |  576 |    13 |                            readBufferLock.writeLock().unlock();  | 
  |  577 |    13 |                            sessionsLock.writeLock().unlock();  | 
  |  578 |    13 |                    }  | 
  |  579 |    13 |            }  | 
  |  580 |     | 
     | 
  |  581 |     | 
           public void destroy() throws IOException { | 
  |  582 |    12 |                    scannerThread.interrupt();  | 
  |  583 |    12 |                    transportThread.interrupt();  | 
  |  584 |    24 |                    for (Thread t : encoderThreads) { | 
  |  585 |    12 |                            t.interrupt();  | 
  |  586 |     | 
                   }  | 
  |  587 |    24 |                    for (Thread t : decoderThreads) { | 
  |  588 |    12 |                            t.interrupt();  | 
  |  589 |     | 
                   }  | 
  |  590 |    12 |                    selectionThread.interrupt();  | 
  |  591 |     | 
                   try { | 
  |  592 |    12 |                            sessionsLock.writeLock().lock();  | 
  |  593 |    12 |                            LinkedList<String> ids = new LinkedList<String>(sessionsById  | 
  |  594 |     | 
                                           .keySet());  | 
  |  595 |    12 |                            for (String sessionId : ids) { | 
  |  596 |    9 |                                    destroySession(sessionId, null, false);  | 
  |  597 |     | 
                           }  | 
  |  598 |     | 
                   } finally { | 
  |  599 |    12 |                            sessionsLock.writeLock().unlock();  | 
  |  600 |    12 |                    }  | 
  |  601 |    12 |                    selector.close();  | 
  |  602 |    12 |            }  | 
  |  603 |     | 
   }  |