mirror of https://github.com/apache/activemq.git
Fixing the auto+nio+ssl transport so that the protocol detection task will properly terminate on timeout and not continue to run. Also lowered the default detection timeout to 15 seconds instead of 30 seconds to match the InactivityMonitor default.
This commit is contained in:
parent
29b4db5c34
commit
27238b2dd7
|
@ -27,8 +27,6 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -77,8 +75,10 @@ public class AutoTcpTransportServer extends TcpTransportServer {
|
|||
|
||||
protected BrokerService brokerService;
|
||||
|
||||
protected final ThreadPoolExecutor newConnectionExecutor;
|
||||
protected final ThreadPoolExecutor protocolDetectionExecutor;
|
||||
protected int maxConnectionThreadPoolSize = Integer.MAX_VALUE;
|
||||
protected int protocolDetectionTimeOut = 30000;
|
||||
protected int protocolDetectionTimeOut = 15000;
|
||||
|
||||
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
|
||||
private final ConcurrentMap<String, TransportFactory> transportFactories = new ConcurrentHashMap<String, TransportFactory>();
|
||||
|
@ -157,12 +157,21 @@ public class AutoTcpTransportServer extends TcpTransportServer {
|
|||
|
||||
//Use an executor service here to handle new connections. Setting the max number
|
||||
//of threads to the maximum number of connections the thread count isn't unbounded
|
||||
service = new ThreadPoolExecutor(maxConnectionThreadPoolSize,
|
||||
newConnectionExecutor = new ThreadPoolExecutor(maxConnectionThreadPoolSize,
|
||||
maxConnectionThreadPoolSize,
|
||||
30L, TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<Runnable>());
|
||||
//allow the thread pool to shrink if the max number of threads isn't needed
|
||||
service.allowCoreThreadTimeOut(true);
|
||||
//and the pool can grow and shrink as needed if contention is high
|
||||
newConnectionExecutor.allowCoreThreadTimeOut(true);
|
||||
|
||||
//Executor for waiting for bytes to detection of protocol
|
||||
protocolDetectionExecutor = new ThreadPoolExecutor(maxConnectionThreadPoolSize,
|
||||
maxConnectionThreadPoolSize,
|
||||
30L, TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<Runnable>());
|
||||
//allow the thread pool to shrink if the max number of threads isn't needed
|
||||
protocolDetectionExecutor.allowCoreThreadTimeOut(true);
|
||||
|
||||
this.brokerService = brokerService;
|
||||
this.enabledProtocols = enabledProtocols;
|
||||
|
@ -173,10 +182,32 @@ public class AutoTcpTransportServer extends TcpTransportServer {
|
|||
return maxConnectionThreadPoolSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the number of threads to be used for processing connections. Defaults
|
||||
* to Integer.MAX_SIZE. Set this value to be lower to reduce the
|
||||
* number of simultaneous connection attempts. If not set then the maximum number of
|
||||
* threads will generally be controlled by the transport maxConnections setting:
|
||||
* {@link TcpTransportServer#setMaximumConnections(int)}.
|
||||
*<p>
|
||||
* Note that this setter controls two thread pools because connection attempts
|
||||
* require 1 thread to start processing the connection and another thread to read from the
|
||||
* socket and to detect the protocol. Two threads are needed because some transports
|
||||
* block on socket read so the first thread needs to be able to abort the second thread on timeout.
|
||||
* Therefore this setting will set each thread pool to the size passed in essentially giving
|
||||
* 2 times as many potential threads as the value set.
|
||||
*<p>
|
||||
* Both thread pools will close idle threads after a period of time
|
||||
* essentially allowing the thread pools to grow and shrink dynamically based on load.
|
||||
*
|
||||
* @see {@link TcpTransportServer#setMaximumConnections(int)}.
|
||||
* @param maxConnectionThreadPoolSize
|
||||
*/
|
||||
public void setMaxConnectionThreadPoolSize(int maxConnectionThreadPoolSize) {
|
||||
this.maxConnectionThreadPoolSize = maxConnectionThreadPoolSize;
|
||||
service.setCorePoolSize(maxConnectionThreadPoolSize);
|
||||
service.setMaximumPoolSize(maxConnectionThreadPoolSize);
|
||||
newConnectionExecutor.setCorePoolSize(maxConnectionThreadPoolSize);
|
||||
newConnectionExecutor.setMaximumPoolSize(maxConnectionThreadPoolSize);
|
||||
protocolDetectionExecutor.setCorePoolSize(maxConnectionThreadPoolSize);
|
||||
protocolDetectionExecutor.setMaximumPoolSize(maxConnectionThreadPoolSize);
|
||||
}
|
||||
|
||||
public void setProtocolDetectionTimeOut(int protocolDetectionTimeOut) {
|
||||
|
@ -219,16 +250,13 @@ public class AutoTcpTransportServer extends TcpTransportServer {
|
|||
return enabledProtocols == null || enabledProtocols.isEmpty();
|
||||
}
|
||||
|
||||
|
||||
protected final ThreadPoolExecutor service;
|
||||
|
||||
@Override
|
||||
protected void handleSocket(final Socket socket) {
|
||||
final AutoTcpTransportServer server = this;
|
||||
//This needs to be done in a new thread because
|
||||
//the socket might be waiting on the client to send bytes
|
||||
//doHandleSocket can't complete until the protocol can be detected
|
||||
service.submit(new Runnable() {
|
||||
newConnectionExecutor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
server.doHandleSocket(socket);
|
||||
|
@ -239,30 +267,37 @@ public class AutoTcpTransportServer extends TcpTransportServer {
|
|||
@Override
|
||||
protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
|
||||
final InputStream is = socket.getInputStream();
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
|
||||
final AtomicInteger readBytes = new AtomicInteger(0);
|
||||
final ByteBuffer data = ByteBuffer.allocate(8);
|
||||
|
||||
// We need to peak at the first 8 bytes of the buffer to detect the protocol
|
||||
Future<?> future = executor.submit(new Runnable() {
|
||||
Future<?> future = protocolDetectionExecutor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
do {
|
||||
//will block until enough bytes or read or a timeout
|
||||
//and the socket is closed
|
||||
int read = is.read();
|
||||
if (read == -1) {
|
||||
throw new IOException("Connection failed, stream is closed.");
|
||||
}
|
||||
data.put((byte) read);
|
||||
readBytes.incrementAndGet();
|
||||
} while (readBytes.get() < 8);
|
||||
} while (readBytes.get() < 8 && !Thread.interrupted());
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
waitForProtocolDetectionFinish(future, readBytes);
|
||||
try {
|
||||
//If this fails and throws an exception and the socket will be closed
|
||||
waitForProtocolDetectionFinish(future, readBytes);
|
||||
} finally {
|
||||
//call cancel in case task didn't complete
|
||||
future.cancel(true);
|
||||
}
|
||||
data.flip();
|
||||
ProtocolInfo protocolInfo = detectProtocol(data.array());
|
||||
|
||||
|
@ -320,8 +355,23 @@ public class AutoTcpTransportServer extends TcpTransportServer {
|
|||
}
|
||||
@Override
|
||||
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||
if (service != null) {
|
||||
service.shutdown();
|
||||
if (newConnectionExecutor != null) {
|
||||
newConnectionExecutor.shutdownNow();
|
||||
try {
|
||||
if (!newConnectionExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
|
||||
LOG.warn("Auto Transport newConnectionExecutor didn't shutdown cleanly");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
if (protocolDetectionExecutor != null) {
|
||||
protocolDetectionExecutor.shutdownNow();
|
||||
try {
|
||||
if (!protocolDetectionExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
|
||||
LOG.warn("Auto Transport protocolDetectionExecutor didn't shutdown cleanly");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
super.doStop(stopper);
|
||||
}
|
||||
|
|
|
@ -7,8 +7,6 @@ import java.net.URISyntaxException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import javax.net.ServerSocketFactory;
|
||||
|
@ -101,8 +99,6 @@ public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
|
|||
|
||||
@Override
|
||||
protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
|
||||
//The SSLEngine needs to be initialized and handshake done to get the first command and detect the format
|
||||
//The wireformat doesn't need properties set here because we aren't using this format during the SSL handshake
|
||||
final AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket);
|
||||
|
@ -117,17 +113,38 @@ public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
|
|||
in.start();
|
||||
SSLEngine engine = in.getSslSession();
|
||||
|
||||
Future<?> future = executor.submit(new Runnable() {
|
||||
//Attempt to read enough bytes to detect the protocol until the timeout period
|
||||
//is reached
|
||||
Future<?> future = protocolDetectionExecutor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
//Wait for handshake to finish initializing
|
||||
int attempts = 0;
|
||||
do {
|
||||
if(attempts > 0) {
|
||||
try {
|
||||
//increase sleep period each attempt to prevent high cpu usage
|
||||
//if the client is hung and not sending bytes
|
||||
int sleep = attempts >= 1024 ? 1024 : 4 * attempts;
|
||||
Thread.sleep(sleep);
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
//In the future it might be better to register a nonblocking selector
|
||||
//to be told when bytes are ready
|
||||
in.serviceRead();
|
||||
} while(in.getReadSize().get() < 8);
|
||||
attempts++;
|
||||
} while(in.getReadSize().get() < 8 && !Thread.interrupted());
|
||||
}
|
||||
});
|
||||
|
||||
waitForProtocolDetectionFinish(future, in.getReadSize());
|
||||
try {
|
||||
//If this fails and throws an exception and the socket will be closed
|
||||
waitForProtocolDetectionFinish(future, in.getReadSize());
|
||||
} finally {
|
||||
//call cancel in case task didn't complete which will interrupt the task
|
||||
future.cancel(true);
|
||||
}
|
||||
in.stop();
|
||||
|
||||
InitBuffer initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length));
|
||||
|
|
|
@ -135,21 +135,6 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
|
|||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void finishHandshake() throws Exception {
|
||||
if (handshakeInProgress) {
|
||||
handshakeInProgress = false;
|
||||
nextFrameSize = -1;
|
||||
|
||||
// Once handshake completes we need to ask for the now real sslSession
|
||||
// otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the
|
||||
// cipher suite.
|
||||
sslSession = sslEngine.getSession();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public SSLEngine getSslSession() {
|
||||
return this.sslEngine;
|
||||
}
|
||||
|
@ -180,6 +165,10 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
|
|||
if (!plain.hasRemaining()) {
|
||||
int readCount = secureRead(plain);
|
||||
|
||||
if (readCount == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
// channel is closed, cleanup
|
||||
if (readCount == -1) {
|
||||
onException(new EOFException());
|
||||
|
|
|
@ -156,7 +156,6 @@ public class NIOSSLTransport extends NIOTransport {
|
|||
doHandshake();
|
||||
}
|
||||
|
||||
// if (hasSslEngine) {
|
||||
selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
|
||||
@Override
|
||||
public void onSelect(SelectorSelection selection) {
|
||||
|
@ -233,23 +232,6 @@ public class NIOSSLTransport extends NIOTransport {
|
|||
// otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the
|
||||
// cipher suite.
|
||||
sslSession = sslEngine.getSession();
|
||||
|
||||
// listen for events telling us when the socket is readable.
|
||||
selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
|
||||
@Override
|
||||
public void onSelect(SelectorSelection selection) {
|
||||
serviceRead();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(SelectorSelection selection, Throwable error) {
|
||||
if (error instanceof IOException) {
|
||||
onException((IOException) error);
|
||||
} else {
|
||||
onException(IOExceptionSupport.create(error));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue