diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java index 9be523120b..7442b15848 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java @@ -17,7 +17,7 @@ package org.apache.activemq.transport.nio; import java.io.IOException; -import java.nio.channels.SocketChannel; +import java.nio.channels.spi.AbstractSelectableChannel; import java.util.LinkedList; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -43,17 +43,18 @@ public final class SelectorManager { private int maxChannelsPerWorker = 1024; protected ExecutorService createDefaultExecutor() { - ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadFactory() { - private long i = 0; + private long i = 0; - @Override - public Thread newThread(Runnable runnable) { - this.i++; - final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i); - return t; - } - }); + @Override + public Thread newThread(Runnable runnable) { + this.i++; + final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i); + return t; + } + }); return rc; } @@ -68,27 +69,26 @@ public final class SelectorManager { public interface Listener { void onSelect(SelectorSelection selector); + void onError(SelectorSelection selection, Throwable error); } - public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) - throws IOException { - + public synchronized SelectorSelection register(AbstractSelectableChannel selectableChannel, Listener listener) throws IOException { SelectorSelection selection = null; - while( selection == null ) { + while (selection == null) { if (freeWorkers.size() > 0) { SelectorWorker worker = freeWorkers.getFirst(); - if( worker.isReleased() ) { + if (worker.isReleased()) { freeWorkers.remove(worker); } else { worker.retain(); - selection = new SelectorSelection(worker, socketChannel, listener); + selection = new SelectorSelection(worker, selectableChannel, listener); } } else { // Worker starts /w retain count of 1 SelectorWorker worker = new SelectorWorker(this); freeWorkers.addFirst(worker); - selection = new SelectorSelection(worker, socketChannel, listener); + selection = new SelectorSelection(worker, selectableChannel, listener); } } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java index e96d50d78d..01480a03ce 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java @@ -19,13 +19,13 @@ package org.apache.activemq.transport.nio; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; +import java.nio.channels.spi.AbstractSelectableChannel; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.transport.nio.SelectorManager.Listener; /** - * @author chirino + * */ public final class SelectorSelection { @@ -33,15 +33,16 @@ public final class SelectorSelection { private final Listener listener; private int interest; private SelectionKey key; - private AtomicBoolean closed = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); - public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel, Listener listener) throws ClosedChannelException { + public SelectorSelection(final SelectorWorker worker, final AbstractSelectableChannel selectable, Listener listener) throws ClosedChannelException { this.worker = worker; this.listener = listener; worker.addIoTask(new Runnable() { + @Override public void run() { try { - SelectorSelection.this.key = socketChannel.register(worker.selector, 0, SelectorSelection.this); + SelectorSelection.this.key = selectable.register(worker.selector, 0, SelectorSelection.this); } catch (Exception e) { e.printStackTrace(); } @@ -55,30 +56,32 @@ public final class SelectorSelection { public void enable() { worker.addIoTask(new Runnable() { + @Override public void run() { try { key.interestOps(interest); } catch (CancelledKeyException e) { } } - }); + }); } public void disable() { worker.addIoTask(new Runnable() { + @Override public void run() { try { key.interestOps(0); } catch (CancelledKeyException e) { } } - }); + }); } public void close() { - // guard against multiple closes. - if( closed.compareAndSet(false, true) ) { + if (closed.compareAndSet(false, true)) { worker.addIoTask(new Runnable() { + @Override public void run() { try { key.cancel(); @@ -86,7 +89,7 @@ public final class SelectorSelection { } worker.release(); } - }); + }); } } @@ -97,5 +100,4 @@ public final class SelectorSelection { public void onError(Throwable e) { listener.onError(this, e); } - } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 2c97e25867..b44a462d67 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -26,6 +26,9 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -43,6 +46,8 @@ import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServerThreadSupport; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.transport.nio.SelectorSelection; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.IntrospectionSupport; @@ -56,15 +61,12 @@ import org.slf4j.LoggerFactory; /** * A TCP based implementation of {@link TransportServer} - * - * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) - * */ - public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); protected ServerSocket serverSocket; + protected SelectorSelection selector; protected int backlog = 5000; protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); protected final TcpTransportFactory transportFactory; @@ -74,7 +76,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements protected boolean useQueueForAccept = true; protected boolean allowLinkStealing; - /** * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer, @@ -93,11 +94,13 @@ public class TcpTransportServer extends TransportServerThreadSupport implements * set in Connection or TransportConnector URIs. */ protected String logWriterName = TransportLoggerSupport.defaultLogWriterName; + /** * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1 * TransportLogger which is manageable, a TransportLoggerControl MBean will me created. */ protected boolean dynamicManagement = false; + /** * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log. * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the @@ -108,6 +111,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements protected final ServerSocketFactory serverSocketFactory; protected BlockingQueue socketQueue = new LinkedBlockingQueue(); protected Thread socketHandlerThread; + /** * The maximum number of sockets allowed for this server */ @@ -140,8 +144,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements } catch (URISyntaxException e) { // it could be that the host name contains invalid characters such - // as _ on unix platforms - // so lets try use the IP address instead + // as _ on unix platforms so lets try use the IP address instead try { setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment())); @@ -295,29 +298,76 @@ public class TcpTransportServer extends TransportServerThreadSupport implements */ @Override public void run() { - while (!isStopped()) { - Socket socket = null; + final ServerSocketChannel chan = serverSocket.getChannel(); + if (chan != null) { try { - socket = serverSocket.accept(); - if (socket != null) { - if (isStopped() || getAcceptListener() == null) { - socket.close(); - } else { - if (useQueueForAccept) { - socketQueue.put(socket); - } else { - handleSocket(socket); + chan.configureBlocking(false); + selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() { + @Override + public void onSelect(SelectorSelection sel) { + try { + SocketChannel sc = chan.accept(); + if (sc != null) { + if (isStopped() || getAcceptListener() == null) { + sc.close(); + } else { + if (useQueueForAccept) { + socketQueue.put(sc.socket()); + } else { + handleSocket(sc.socket()); + } + } + } + } catch (Exception e) { + onError(sel, e); } } - } - } catch (SocketTimeoutException ste) { - // expect this to happen - } catch (Exception e) { - if (!isStopping()) { - onAcceptError(e); - } else if (!isStopped()) { - LOG.warn("run()", e); - onAcceptError(e); + @Override + public void onError(SelectorSelection sel, Throwable error) { + Exception e = null; + if (error instanceof Exception) { + e = (Exception)error; + } else { + e = new Exception(error); + } + if (!isStopping()) { + onAcceptError(e); + } else if (!isStopped()) { + LOG.warn("run()", e); + onAcceptError(e); + } + } + }); + selector.setInterestOps(SelectionKey.OP_ACCEPT); + selector.enable(); + } catch (IOException ex) { + selector = null; + } + } else { + while (!isStopped()) { + Socket socket = null; + try { + socket = serverSocket.accept(); + if (socket != null) { + if (isStopped() || getAcceptListener() == null) { + socket.close(); + } else { + if (useQueueForAccept) { + socketQueue.put(socket); + } else { + handleSocket(socket); + } + } + } + } catch (SocketTimeoutException ste) { + // expect this to happen + } catch (Exception e) { + if (!isStopping()) { + onAcceptError(e); + } else if (!isStopped()) { + LOG.warn("run()", e); + onAcceptError(e); + } } } } @@ -405,6 +455,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements @Override protected void doStop(ServiceStopper stopper) throws Exception { + if (selector != null) { + selector.disable(); + selector.close(); + selector = null; + } if (serverSocket != null) { serverSocket.close(); serverSocket = null;