Use a selector based accept when the transport socket has a selectable
channel which is the case for all the NIO transport.  Can reduce the
time it takes to close down the transport speeding up tests.
This commit is contained in:
Timothy Bish 2014-07-10 16:32:44 -04:00
parent a498bffdd1
commit e957937f08
3 changed files with 112 additions and 55 deletions

View File

@ -17,7 +17,7 @@
package org.apache.activemq.transport.nio; package org.apache.activemq.transport.nio;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.SocketChannel; import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -43,17 +43,18 @@ public final class SelectorManager {
private int maxChannelsPerWorker = 1024; private int maxChannelsPerWorker = 1024;
protected ExecutorService createDefaultExecutor() { protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactory() {
private long i = 0; private long i = 0;
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
this.i++; this.i++;
final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i); final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i);
return t; return t;
} }
}); });
return rc; return rc;
} }
@ -68,27 +69,26 @@ public final class SelectorManager {
public interface Listener { public interface Listener {
void onSelect(SelectorSelection selector); void onSelect(SelectorSelection selector);
void onError(SelectorSelection selection, Throwable error); void onError(SelectorSelection selection, Throwable error);
} }
public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) public synchronized SelectorSelection register(AbstractSelectableChannel selectableChannel, Listener listener) throws IOException {
throws IOException {
SelectorSelection selection = null; SelectorSelection selection = null;
while( selection == null ) { while (selection == null) {
if (freeWorkers.size() > 0) { if (freeWorkers.size() > 0) {
SelectorWorker worker = freeWorkers.getFirst(); SelectorWorker worker = freeWorkers.getFirst();
if( worker.isReleased() ) { if (worker.isReleased()) {
freeWorkers.remove(worker); freeWorkers.remove(worker);
} else { } else {
worker.retain(); worker.retain();
selection = new SelectorSelection(worker, socketChannel, listener); selection = new SelectorSelection(worker, selectableChannel, listener);
} }
} else { } else {
// Worker starts /w retain count of 1 // Worker starts /w retain count of 1
SelectorWorker worker = new SelectorWorker(this); SelectorWorker worker = new SelectorWorker(this);
freeWorkers.addFirst(worker); freeWorkers.addFirst(worker);
selection = new SelectorSelection(worker, socketChannel, listener); selection = new SelectorSelection(worker, selectableChannel, listener);
} }
} }

View File

@ -19,13 +19,13 @@ package org.apache.activemq.transport.nio;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.nio.SelectorManager.Listener; import org.apache.activemq.transport.nio.SelectorManager.Listener;
/** /**
* @author chirino *
*/ */
public final class SelectorSelection { public final class SelectorSelection {
@ -33,15 +33,16 @@ public final class SelectorSelection {
private final Listener listener; private final Listener listener;
private int interest; private int interest;
private SelectionKey key; private SelectionKey key;
private AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel, Listener listener) throws ClosedChannelException { public SelectorSelection(final SelectorWorker worker, final AbstractSelectableChannel selectable, Listener listener) throws ClosedChannelException {
this.worker = worker; this.worker = worker;
this.listener = listener; this.listener = listener;
worker.addIoTask(new Runnable() { worker.addIoTask(new Runnable() {
@Override
public void run() { public void run() {
try { try {
SelectorSelection.this.key = socketChannel.register(worker.selector, 0, SelectorSelection.this); SelectorSelection.this.key = selectable.register(worker.selector, 0, SelectorSelection.this);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -55,6 +56,7 @@ public final class SelectorSelection {
public void enable() { public void enable() {
worker.addIoTask(new Runnable() { worker.addIoTask(new Runnable() {
@Override
public void run() { public void run() {
try { try {
key.interestOps(interest); key.interestOps(interest);
@ -66,6 +68,7 @@ public final class SelectorSelection {
public void disable() { public void disable() {
worker.addIoTask(new Runnable() { worker.addIoTask(new Runnable() {
@Override
public void run() { public void run() {
try { try {
key.interestOps(0); key.interestOps(0);
@ -76,9 +79,9 @@ public final class SelectorSelection {
} }
public void close() { public void close() {
// guard against multiple closes. if (closed.compareAndSet(false, true)) {
if( closed.compareAndSet(false, true) ) {
worker.addIoTask(new Runnable() { worker.addIoTask(new Runnable() {
@Override
public void run() { public void run() {
try { try {
key.cancel(); key.cancel();
@ -97,5 +100,4 @@ public final class SelectorSelection {
public void onError(Throwable e) { public void onError(Throwable e) {
listener.onError(this, e); listener.onError(this, e);
} }
} }

View File

@ -26,6 +26,9 @@ import java.net.SocketTimeoutException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -43,6 +46,8 @@ import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport; import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
@ -56,15 +61,12 @@ import org.slf4j.LoggerFactory;
/** /**
* A TCP based implementation of {@link TransportServer} * A TCP based implementation of {@link TransportServer}
*
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
*
*/ */
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
protected ServerSocket serverSocket; protected ServerSocket serverSocket;
protected SelectorSelection selector;
protected int backlog = 5000; protected int backlog = 5000;
protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
protected final TcpTransportFactory transportFactory; protected final TcpTransportFactory transportFactory;
@ -74,7 +76,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
protected boolean useQueueForAccept = true; protected boolean useQueueForAccept = true;
protected boolean allowLinkStealing; protected boolean allowLinkStealing;
/** /**
* trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
* trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer, * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
@ -93,11 +94,13 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
* set in Connection or TransportConnector URIs. * set in Connection or TransportConnector URIs.
*/ */
protected String logWriterName = TransportLoggerSupport.defaultLogWriterName; protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
/** /**
* Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1 * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
* TransportLogger which is manageable, a TransportLoggerControl MBean will me created. * TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
*/ */
protected boolean dynamicManagement = false; protected boolean dynamicManagement = false;
/** /**
* startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log. * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
* startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
@ -108,6 +111,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
protected final ServerSocketFactory serverSocketFactory; protected final ServerSocketFactory serverSocketFactory;
protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
protected Thread socketHandlerThread; protected Thread socketHandlerThread;
/** /**
* The maximum number of sockets allowed for this server * The maximum number of sockets allowed for this server
*/ */
@ -140,8 +144,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
// it could be that the host name contains invalid characters such // it could be that the host name contains invalid characters such
// as _ on unix platforms // as _ on unix platforms so lets try use the IP address instead
// so lets try use the IP address instead
try { try {
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
bind.getQuery(), bind.getFragment())); bind.getQuery(), bind.getFragment()));
@ -295,29 +298,76 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
*/ */
@Override @Override
public void run() { public void run() {
while (!isStopped()) { final ServerSocketChannel chan = serverSocket.getChannel();
Socket socket = null; if (chan != null) {
try { try {
socket = serverSocket.accept(); chan.configureBlocking(false);
if (socket != null) { selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() {
if (isStopped() || getAcceptListener() == null) { @Override
socket.close(); public void onSelect(SelectorSelection sel) {
} else { try {
if (useQueueForAccept) { SocketChannel sc = chan.accept();
socketQueue.put(socket); if (sc != null) {
} else { if (isStopped() || getAcceptListener() == null) {
handleSocket(socket); sc.close();
} else {
if (useQueueForAccept) {
socketQueue.put(sc.socket());
} else {
handleSocket(sc.socket());
}
}
}
} catch (Exception e) {
onError(sel, e);
} }
} }
} @Override
} catch (SocketTimeoutException ste) { public void onError(SelectorSelection sel, Throwable error) {
// expect this to happen Exception e = null;
} catch (Exception e) { if (error instanceof Exception) {
if (!isStopping()) { e = (Exception)error;
onAcceptError(e); } else {
} else if (!isStopped()) { e = new Exception(error);
LOG.warn("run()", e); }
onAcceptError(e); if (!isStopping()) {
onAcceptError(e);
} else if (!isStopped()) {
LOG.warn("run()", e);
onAcceptError(e);
}
}
});
selector.setInterestOps(SelectionKey.OP_ACCEPT);
selector.enable();
} catch (IOException ex) {
selector = null;
}
} else {
while (!isStopped()) {
Socket socket = null;
try {
socket = serverSocket.accept();
if (socket != null) {
if (isStopped() || getAcceptListener() == null) {
socket.close();
} else {
if (useQueueForAccept) {
socketQueue.put(socket);
} else {
handleSocket(socket);
}
}
}
} catch (SocketTimeoutException ste) {
// expect this to happen
} catch (Exception e) {
if (!isStopping()) {
onAcceptError(e);
} else if (!isStopped()) {
LOG.warn("run()", e);
onAcceptError(e);
}
} }
} }
} }
@ -405,6 +455,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
@Override @Override
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
if (selector != null) {
selector.disable();
selector.close();
selector = null;
}
if (serverSocket != null) { if (serverSocket != null) {
serverSocket.close(); serverSocket.close();
serverSocket = null; serverSocket = null;