Better management of shared resources between the background run thread
and the main start / stop thread.  Makes sure to cleanup all resources
before finally throwing on stop to prevent leaking and resources.
This commit is contained in:
Timothy Bish 2016-05-13 11:13:11 -04:00
parent d7b5a62bb0
commit ff99872263
1 changed files with 131 additions and 81 deletions

View File

@ -26,6 +26,7 @@ import java.net.SocketTimeoutException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
@ -50,8 +51,6 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport; import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
@ -69,8 +68,9 @@ import org.slf4j.LoggerFactory;
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
protected ServerSocket serverSocket;
protected Selector selector; protected volatile ServerSocket serverSocket;
protected volatile Selector selector;
protected int backlog = 5000; protected int backlog = 5000;
protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
protected final TcpTransportFactory transportFactory; protected final TcpTransportFactory transportFactory;
@ -113,14 +113,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
*/ */
protected boolean startLogging = true; protected boolean startLogging = true;
protected final ServerSocketFactory serverSocketFactory; protected final ServerSocketFactory serverSocketFactory;
protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
protected Thread socketHandlerThread; protected Thread socketHandlerThread;
/** /**
* The maximum number of sockets allowed for this server * The maximum number of sockets allowed for this server
*/ */
protected int maximumConnections = Integer.MAX_VALUE; protected int maximumConnections = Integer.MAX_VALUE;
protected AtomicInteger currentTransportCount = new AtomicInteger(); protected final AtomicInteger currentTransportCount = new AtomicInteger();
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
URISyntaxException { URISyntaxException {
@ -137,8 +137,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
InetAddress addr = InetAddress.getByName(host); InetAddress addr = InetAddress.getByName(host);
try { try {
this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
configureServerSocket(this.serverSocket); configureServerSocket(serverSocket);
} catch (IOException e) { } catch (IOException e) {
throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
} }
@ -146,7 +146,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
bind.getQuery(), bind.getFragment())); bind.getQuery(), bind.getFragment()));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
// it could be that the host name contains invalid characters such // it could be that the host name contains invalid characters such
// as _ on unix platforms so lets try use the IP address instead // as _ on unix platforms so lets try use the IP address instead
try { try {
@ -302,87 +301,114 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
*/ */
@Override @Override
public void run() { public void run() {
final ServerSocketChannel chan = serverSocket.getChannel(); if (!isStopped() && !isStopping()) {
if (chan != null) { final ServerSocket serverSocket = this.serverSocket;
if (serverSocket == null) {
onAcceptError(new IOException("Server started without a valid ServerSocket"));
}
final ServerSocketChannel channel = serverSocket.getChannel();
if (channel != null) {
doRunWithServerSocketChannel(channel);
} else {
doRunWithServerSocket(serverSocket);
}
}
}
private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
try {
channel.configureBlocking(false);
final Selector selector = Selector.open();
try { try {
chan.configureBlocking(false); channel.register(selector, SelectionKey.OP_ACCEPT);
selector = Selector.open(); } catch (ClosedChannelException ex) {
chan.register(selector, SelectionKey.OP_ACCEPT); try {
while (!isStopped()) { selector.close();
int count = selector.select(10); } catch (IOException ignore) {}
if (count == 0) { throw ex;
continue; }
}
Set<SelectionKey> keys = selector.selectedKeys(); // Update object instance for later cleanup.
this.selector = selector;
for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) { while (!isStopped()) {
final SelectionKey key = i.next(); int count = selector.select(10);
if (key.isAcceptable()) {
try { if (count == 0) {
SocketChannel sc = chan.accept(); continue;
if (sc != null) { }
if (isStopped() || getAcceptListener() == null) {
sc.close(); Set<SelectionKey> keys = selector.selectedKeys();
for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
final SelectionKey key = i.next();
if (key.isAcceptable()) {
try {
SocketChannel sc = channel.accept();
if (sc != null) {
if (isStopped() || getAcceptListener() == null) {
sc.close();
} else {
if (useQueueForAccept) {
socketQueue.put(sc.socket());
} else { } else {
if (useQueueForAccept) { handleSocket(sc.socket());
socketQueue.put(sc.socket());
} else {
handleSocket(sc.socket());
}
} }
} }
}
} catch (SocketTimeoutException ste) { } catch (SocketTimeoutException ste) {
// expect this to happen // expect this to happen
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
if (!isStopping()) { if (!isStopping()) {
onAcceptError(e); onAcceptError(e);
} else if (!isStopped()) { } else if (!isStopped()) {
LOG.warn("run()", e); LOG.warn("run()", e);
onAcceptError(e); onAcceptError(e);
}
} }
} }
i.remove();
} }
i.remove();
}
} catch (IOException ex) {
if (selector != null) {
try {
selector.close();
} catch (IOException ioe) {}
selector = null;
} }
} }
} else { } catch (IOException ex) {
while (!isStopped()) { if (!isStopping()) {
Socket socket = null; onAcceptError(ex);
try { } else if (!isStopped()) {
socket = serverSocket.accept(); LOG.warn("run()", ex);
if (socket != null) { onAcceptError(ex);
if (isStopped() || getAcceptListener() == null) { }
socket.close(); }
}
private void doRunWithServerSocket(final ServerSocket serverSocket) {
while (!isStopped()) {
Socket socket = null;
try {
socket = serverSocket.accept();
if (socket != null) {
if (isStopped() || getAcceptListener() == null) {
socket.close();
} else {
if (useQueueForAccept) {
socketQueue.put(socket);
} else { } else {
if (useQueueForAccept) { handleSocket(socket);
socketQueue.put(socket);
} else {
handleSocket(socket);
}
} }
} }
} catch (SocketTimeoutException ste) { }
// expect this to happen } catch (SocketTimeoutException ste) {
} catch (Exception e) { // expect this to happen
if (!isStopping()) { } catch (Exception e) {
onAcceptError(e); if (!isStopping()) {
} else if (!isStopped()) { onAcceptError(e);
LOG.warn("run()", e); } else if (!isStopped()) {
onAcceptError(e); LOG.warn("run()", e);
} onAcceptError(e);
} }
} }
} }
@ -472,19 +498,43 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
@Override @Override
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
if (selector != null) { Exception firstFailure = null;
selector.close();
selector = null; try {
final Selector selector = this.selector;
if (selector != null) {
this.selector = null;
selector.close();
}
} catch (Exception error) {
} }
if (serverSocket != null) {
serverSocket.close(); try {
serverSocket = null; final ServerSocket serverSocket = this.serverSocket;
if (serverSocket != null) {
this.serverSocket = null;
serverSocket.close();
}
} catch (Exception error) {
firstFailure = error;
} }
if (socketHandlerThread != null) { if (socketHandlerThread != null) {
socketHandlerThread.interrupt(); socketHandlerThread.interrupt();
socketHandlerThread = null; socketHandlerThread = null;
} }
super.doStop(stopper);
try {
super.doStop(stopper);
} catch (Exception error) {
if (firstFailure != null) {
firstFailure = error;
}
}
if (firstFailure != null) {
throw firstFailure;
}
} }
@Override @Override