Changing the nio+ssl transports to trigger a serviceRead after start up
to prevent blocking. The prevents the channels from not reading in
certain cases, most notably with the auto+nio+ssl transport when used
for a network bridge.  Also added a couple tests and changed a network
bridge test to test out auto+nio+ssl.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-08-31 09:46:50 -04:00
parent e57de54410
commit ed0e786b60
8 changed files with 165 additions and 17 deletions

View File

@ -69,12 +69,11 @@ public class AmqpNioSslTransport extends NIOSSLTransport {
*/
@Override
protected void doInit() {
protected void doInit() throws Exception {
if (initBuffer != null) {
nextFrameSize = -1;
serviceRead();
}
super.doInit();
}
@Override

View File

@ -118,7 +118,7 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
sslEngine.beginHandshake();
handshakeStatus = sslEngine.getHandshakeStatus();
doHandshake();
// detectReadyState();
} catch (Exception e) {
try {
if(outputStream != null) {

View File

@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
@ -159,6 +160,11 @@ public class NIOSSLTransport extends NIOTransport {
selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
@Override
public void onSelect(SelectorSelection selection) {
try {
initialized.await();
} catch (InterruptedException error) {
onException(IOExceptionSupport.create(error));
}
serviceRead();
}
@ -184,13 +190,28 @@ public class NIOSSLTransport extends NIOTransport {
}
}
protected void doInit() throws Exception {
final protected CountDownLatch initialized = new CountDownLatch(1);
protected void doInit() throws Exception {
taskRunnerFactory.execute(new Runnable() {
@Override
public void run() {
//Need to start in new thread to let startup finish first
//We can trigger a read because we know the channel is ready since the SSL handshake
//already happened
serviceRead();
initialized.countDown();
}
});
}
//Only used for the auto transport to abort the openwire init method early if already initialized
boolean openWireInititialized = false;
protected void doOpenWireInit() throws Exception {
//Do this later to let wire format negotiation happen
if (initBuffer != null && this.wireFormat instanceof OpenWireFormat) {
if (initBuffer != null && !openWireInititialized && this.wireFormat instanceof OpenWireFormat) {
initBuffer.buffer.flip();
if (initBuffer.buffer.hasRemaining()) {
nextFrameSize = -1;
@ -198,6 +219,7 @@ public class NIOSSLTransport extends NIOTransport {
processCommand(initBuffer.buffer);
processCommand(initBuffer.buffer);
initBuffer.buffer.clear();
openWireInititialized = true;
}
}
}
@ -460,6 +482,8 @@ public class NIOSSLTransport extends NIOTransport {
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
initialized.countDown();
if (taskRunnerFactory != null) {
taskRunnerFactory.shutdownNow();
taskRunnerFactory = null;
@ -489,6 +513,7 @@ public class NIOSSLTransport extends NIOTransport {
/**
* @return peer certificate chain associated with the ssl socket
*/
@Override
public X509Certificate[] getPeerCertificates() {
X509Certificate[] clientCertChain = null;

View File

@ -74,6 +74,7 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport {
initBuffer.buffer.flip();
processCommand(initBuffer.buffer);
}
super.doInit();
}

View File

@ -87,10 +87,10 @@ public class StompNIOSSLTransport extends NIOSSLTransport {
protected void doInit() throws Exception {
if (initBuffer != null) {
nextFrameSize = -1;
// System.out.println("length1: " + initBuffer.array().length);
receiveCounter += initBuffer.readSize;
initBuffer.buffer.flip();
processCommand(initBuffer.buffer);
}
super.doInit();
}
}

View File

@ -77,6 +77,20 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
});
}
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
static {
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
}
public DurableSyncNetworkBridgeTest(final FLOW flow) {
this.flow = flow;
@ -492,7 +506,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
brokerService.addNetworkConnector(configureLocalNetworkConnector());
}
brokerService.addConnector("tcp://localhost:0");
//Use auto+nio+ssl to test out the transport works with bridging
brokerService.addConnector("auto+nio+ssl://localhost:0");
return brokerService;
}
@ -531,7 +546,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
//Need a larger cache size in order to handle all of the durables
brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.cacheSize=2048&wireFormat.version=" + remoteBrokerWireFormatVersion);
//Use auto+nio+ssl to test out the transport works with bridging
brokerService.addConnector("auto+nio+ssl://localhost:" + port + "?wireFormat.cacheSize=2048&wireFormat.version=" + remoteBrokerWireFormatVersion);
return brokerService;
}

View File

@ -31,6 +31,7 @@ import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
@ -80,7 +81,11 @@ public class OpenWireConnectionTimeoutTest {
{"tcp"},
{"ssl"},
{"nio"},
{"nio+ssl"}
{"nio+ssl"},
{"auto"},
{"auto+ssl"},
{"auto+nio"},
{"auto+nio+ssl"}
});
}
@ -118,7 +123,7 @@ public class OpenWireConnectionTimeoutTest {
}
public String getAdditionalConfig() {
return "?transport.connectAttemptTimeout=1200";
return "?transport.connectAttemptTimeout=1200&protocolDetectionTimeOut=1200";
}
@Test(timeout = 90000)
@ -144,7 +149,8 @@ public class OpenWireConnectionTimeoutTest {
assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
TcpTransportServer server = (TcpTransportServer) brokerService.getTransportConnectorByScheme(getConnectorScheme()).getServer();
return 1 == server.getCurrentTransportCount().get();
}
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
@ -152,7 +158,8 @@ public class OpenWireConnectionTimeoutTest {
assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
TcpTransportServer server = (TcpTransportServer) brokerService.getTransportConnectorByScheme(getConnectorScheme()).getServer();
return 0 == server.getCurrentTransportCount().get();
}
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500)));
@ -164,10 +171,14 @@ public class OpenWireConnectionTimeoutTest {
switch (connectorScheme) {
case "tcp":
case "auto":
case "nio":
case "auto+nio":
break;
case "ssl":
case "auto+ssl":
case "nio+ssl":
case "auto+nio+ssl":
useSsl = true;;
break;
default:
@ -222,6 +233,18 @@ public class OpenWireConnectionTimeoutTest {
case "nio+ssl":
connector = brokerService.addConnector("nio+ssl://0.0.0.0:0" + getAdditionalConfig());
break;
case "auto":
connector = brokerService.addConnector("auto://0.0.0.0:0" + getAdditionalConfig());
break;
case "auto+nio":
connector = brokerService.addConnector("auto+nio://0.0.0.0:0" + getAdditionalConfig());
break;
case "auto+ssl":
connector = brokerService.addConnector("auto+ssl://0.0.0.0:0" + getAdditionalConfig());
break;
case "auto+nio+ssl":
connector = brokerService.addConnector("auto+nio+ssl://0.0.0.0:0" + getAdditionalConfig());
break;
default:
throw new IOException("Invalid OpenWire connector scheme passed to test.");
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.usecases;
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.DiscoveryNetworkConnector;
@ -30,11 +31,91 @@ import org.slf4j.LoggerFactory;
public class NetworkAsyncStartTest extends JmsMultipleBrokersTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(NetworkAsyncStartTest.class);
private String brokerBUri = "tcp://localhost:61617";
private String brokerCUri = "tcp://localhost:61618";
private String brokerBDomain = "localhost:61617";
private String brokerCDomain = "localhost:61618";
int bridgeCount=0;
public void testAsyncNetworkStartup() throws Exception {
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
static {
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
}
public void testAsyncNetworkStartupTcp() throws Exception {
testAsyncNetworkStartup("tcp");
}
public void testAsyncNetworkStartupWithSlowConnectionCreationTcp() throws Exception {
testAsyncNetworkStartupWithSlowConnectionCreation("tcp");
}
public void testAsyncNetworkStartupNio() throws Exception {
testAsyncNetworkStartup("nio");
}
public void testAsyncNetworkStartupWithSlowConnectionCreationNio() throws Exception {
testAsyncNetworkStartupWithSlowConnectionCreation("nio");
}
public void testAsyncNetworkStartupAuto() throws Exception {
testAsyncNetworkStartup("auto");
}
public void testAsyncNetworkStartupWithSlowConnectionCreationAuto() throws Exception {
testAsyncNetworkStartupWithSlowConnectionCreation("auto");
}
public void testAsyncNetworkStartupAutoNio() throws Exception {
testAsyncNetworkStartup("auto+nio");
}
public void testAsyncNetworkStartupWithSlowConnectionCreationAutoNio() throws Exception {
testAsyncNetworkStartupWithSlowConnectionCreation("auto+nio");
}
public void testAsyncNetworkStartupSsl() throws Exception {
testAsyncNetworkStartup("ssl");
}
public void testAsyncNetworkStartupWithSlowConnectionCreationSsl() throws Exception {
testAsyncNetworkStartupWithSlowConnectionCreation("ssl");
}
public void testAsyncNetworkStartupAutoSsl() throws Exception {
testAsyncNetworkStartup("auto+ssl");
}
public void testAsyncNetworkStartupWithSlowConnectionCreationAutoSsl() throws Exception {
testAsyncNetworkStartupWithSlowConnectionCreation("auto+ssl");
}
public void testAsyncNetworkStartupNioSsl() throws Exception {
testAsyncNetworkStartup("nio+ssl");
}
public void testAsyncNetworkStartupWithSlowConnectionCreationNioSsl() throws Exception {
testAsyncNetworkStartupWithSlowConnectionCreation("nio+ssl");
}
public void testAsyncNetworkStartupAutoNioSsl() throws Exception {
testAsyncNetworkStartup("auto+nio+ssl");
}
public void testAsyncNetworkStartupWithSlowConnectionCreationAutoNioSsl() throws Exception {
testAsyncNetworkStartupWithSlowConnectionCreation("auto+nio+ssl");
}
protected void testAsyncNetworkStartup(String transport) throws Exception {
String brokerBUri = transport + "://" + brokerBDomain;
String brokerCUri = transport + "://" + brokerCDomain;
BrokerService brokerA = brokers.get("BrokerA").broker;
bridgeBroker(brokerA, brokerBUri);
@ -60,7 +141,9 @@ public class NetworkAsyncStartTest extends JmsMultipleBrokersTestSupport {
assertTrue("got bridge to B&C", waitForBridgeFormation(brokerA, 1, 1));
}
public void testAsyncNetworkStartupWithSlowConnectionCreation() throws Exception {
protected void testAsyncNetworkStartupWithSlowConnectionCreation(String transport) throws Exception {
String brokerBUri = transport + "://" + brokerBDomain;
String brokerCUri = transport + "://" + brokerCDomain;
final BrokerService brokerA = brokers.get("BrokerA").broker;
@ -81,6 +164,7 @@ public class NetworkAsyncStartTest extends JmsMultipleBrokersTestSupport {
Executor e = Executors.newCachedThreadPool();
e.execute(new Runnable() {
@Override
public void run() {
LOG.info("starting A");
try {