Fixing the auto transport protocol detection so that the byte buffer
that captures the initial bytes for detection is not shared across
threads. This was causing failed connections under high load and high cpu
usage under NIO
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-11-12 10:00:32 -05:00
parent 980162233f
commit 7e648d512d
7 changed files with 86 additions and 49 deletions

View File

@ -32,6 +32,7 @@ import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IOExceptionSupport;
@ -96,18 +97,12 @@ public class AutoSslTransportFactory extends SslTransportFactory implements Brok
protected AutoSslTransportServer createAutoSslTransportServer(final URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
AutoSslTransportServer server = new AutoSslTransportServer(this, location, serverSocketFactory,
this.brokerService, enabledProtocols) {
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format)
throws IOException {
setDefaultLinkStealing(format, this);
return super.createTransport(socket, format);
}
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format,
TcpTransportFactory detectedTransportFactory) throws IOException {
TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
setDefaultLinkStealing(format, this);
return super.createTransport(socket, format, detectedTransportFactory);
return super.createTransport(socket, format, detectedTransportFactory, initBuffer);
}
};
return server;

View File

@ -29,6 +29,7 @@ import javax.net.ssl.SSLServerSocketFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
@ -120,9 +121,9 @@ public class AutoSslTransportServer extends AutoTcpTransportServer {
*/
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format,
TcpTransportFactory detectedTransportFactory) throws IOException {
TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
return detectedTransportFactory.createTransport(format, socket, this.initBuffer);
return detectedTransportFactory.createTransport(format, socket, initBuffer);
}
@Override

View File

@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IOExceptionSupport;
@ -87,18 +88,11 @@ public class AutoTcpTransportFactory extends TcpTransportFactory implements Brok
protected AutoTcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
AutoTcpTransportServer server = new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) {
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format)
throws IOException {
setDefaultLinkStealing(format, this);
return super.createTransport(socket, format);
}
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format,
TcpTransportFactory detectedTransportFactory) throws IOException {
TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
setDefaultLinkStealing(format, this);
return super.createTransport(socket, format, detectedTransportFactory);
return super.createTransport(socket, format, detectedTransportFactory, initBuffer);
}
};

View File

@ -222,12 +222,6 @@ public class AutoTcpTransportServer extends TcpTransportServer {
protected final ThreadPoolExecutor service;
/**
* This holds the initial buffer that has been read to detect the protocol.
*/
public InitBuffer initBuffer;
@Override
protected void handleSocket(final Socket socket) {
final AutoTcpTransportServer server = this;
@ -272,7 +266,7 @@ public class AutoTcpTransportServer extends TcpTransportServer {
data.flip();
ProtocolInfo protocolInfo = detectProtocol(data.array());
initBuffer = new InitBuffer(readBytes.get(), ByteBuffer.allocate(readBytes.get()));
InitBuffer initBuffer = new InitBuffer(readBytes.get(), ByteBuffer.allocate(readBytes.get()));
initBuffer.buffer.put(data.array());
if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) {
@ -280,7 +274,7 @@ public class AutoTcpTransportServer extends TcpTransportServer {
}
WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format, protocolInfo.detectedTransportFactory);
Transport transport = createTransport(socket, format, protocolInfo.detectedTransportFactory, initBuffer);
return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory);
}
@ -299,11 +293,6 @@ public class AutoTcpTransportServer extends TcpTransportServer {
}
}
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException {
return new TcpTransport(format, socket, this.initBuffer);
}
/**
* @param socket
* @param format
@ -311,8 +300,8 @@ public class AutoTcpTransportServer extends TcpTransportServer {
* @return
*/
protected TcpTransport createTransport(Socket socket, WireFormat format,
TcpTransportFactory detectedTransportFactory) throws IOException {
return createTransport(socket, format);
TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
return new TcpTransport(format, socket, initBuffer);
}
public void setWireFormatOptions(Map<String, Map<String, Object>> wireFormatOptions) {

View File

@ -130,7 +130,7 @@ public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
waitForProtocolDetectionFinish(future, in.getReadSize());
in.stop();
initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length));
InitBuffer initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length));
initBuffer.buffer.put(in.getReadData());
ProtocolInfo protocolInfo = detectProtocol(in.getReadData());

View File

@ -34,6 +34,7 @@ import org.apache.activemq.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.transport.auto.AutoTransportUtils;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
@ -58,13 +59,13 @@ public class AutoNioTransportFactory extends NIOTransportFactory implements Brok
protected AutoTcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) {
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory) throws IOException {
protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
TcpTransport nioTransport = null;
if (detectedTransportFactory.getClass().equals(NIOTransportFactory.class)) {
nioTransport = new AutoNIOTransport(format, socket,this.initBuffer);
nioTransport = new AutoNIOTransport(format, socket, initBuffer);
} else {
nioTransport = detectedTransportFactory.createTransport(
format, socket, this.initBuffer);
format, socket, initBuffer);
}
if (format.getClass().toString().contains("MQTT")) {

View File

@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -33,15 +35,21 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class AutoTransportMaxConnectionsTest {
public class AutoTransportConnectionsTest {
@Rule
public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
@ -55,7 +63,7 @@ public class AutoTransportMaxConnectionsTest {
private TransportConnector connector;
private final String transportType;
@Parameters
@Parameters(name="transport={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"auto"},
@ -66,7 +74,7 @@ public class AutoTransportMaxConnectionsTest {
}
public AutoTransportMaxConnectionsTest(String transportType) {
public AutoTransportConnectionsTest(String transportType) {
super();
this.transportType = transportType;
}
@ -84,7 +92,18 @@ public class AutoTransportMaxConnectionsTest {
service = new BrokerService();
service.setPersistent(false);
service.setUseJmx(false);
connector = service.addConnector(transportType + "://0.0.0.0:0?maxConnectionThreadPoolSize=10&maximumConnections="+maxConnections);
}
@After
public void tearDown() throws Exception {
executor.shutdown();
service.stop();
service.waitUntilStopped();
}
public void configureConnectorAndStart(String bindAddress) throws Exception {
connector = service.addConnector(bindAddress);
connectionUri = connector.getPublishableConnectString();
service.start();
service.waitUntilStarted();
@ -94,8 +113,10 @@ public class AutoTransportMaxConnectionsTest {
return new ActiveMQConnectionFactory(connectionUri);
}
@Test(timeout=60000)
@Test
public void testMaxConnectionControl() throws Exception {
configureConnectorAndStart(transportType + "://0.0.0.0:0?maxConnectionThreadPoolSize=10&maximumConnections="+maxConnections);
final ConnectionFactory cf = createConnectionFactory();
final CountDownLatch startupLatch = new CountDownLatch(1);
@ -113,7 +134,6 @@ public class AutoTransportMaxConnectionsTest {
conn = cf.createConnection();
conn.start();
} catch (Exception e) {
//JmsUtils.closeConnection(conn);
}
}
});
@ -141,11 +161,48 @@ public class AutoTransportMaxConnectionsTest {
}
@After
public void tearDown() throws Exception {
executor.shutdown();
@Test
public void testConcurrentConnections() throws Exception {
configureConnectorAndStart(transportType + "://0.0.0.0:0");
service.stop();
service.waitUntilStopped();
int connectionAttempts = 50;
ConnectionFactory factory = createConnectionFactory();
final AtomicInteger connectedCount = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(1);
try {
for (int i = 0; i < connectionAttempts; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
latch.await();
Connection con = factory.createConnection();
con.start();
connectedCount.incrementAndGet();
} catch (Exception e) {
//print for debugging but don't fail it might just be the transport stopping
e.printStackTrace();
}
}
});
}
latch.countDown();
//Make sure all attempts connected without error
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return connectedCount.get() == connectionAttempts;
}
}));
} catch (Exception e) {
//print for debugging but don't fail it might just be the transport stopping
e.printStackTrace();
}
}
}