From f6d25842ccb276ffe90f0c8ffdf3ee0fd0aca748 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 29 Feb 2016 16:13:28 -0500 Subject: [PATCH] NO-JIRA Adds some small fixes to the AMQP test client around SSL handling that were found in Qpid JMS where some of this came from. --- .../amqp/client/transport/NettyTransport.java | 47 ++++++++++--------- .../transport/NettyTransportFactory.java | 2 +- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java index 81e3a1dd45..40847800ce 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java @@ -126,9 +126,9 @@ public class NettyTransport { if (future.isSuccess()) { handleConnected(future.channel()); } else if (future.isCancelled()) { - connectionFailed(new IOException("Connection attempt was cancelled")); + connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); } else { - connectionFailed(IOExceptionSupport.create(future.cause())); + connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); } } }); @@ -144,7 +144,7 @@ public class NettyTransport { if (failureCause != null) { // Close out any Netty resources now as they are no longer needed. if (channel != null) { - channel.close(); + channel.close().syncUninterruptibly(); channel = null; } if (group != null) { @@ -167,14 +167,14 @@ public class NettyTransport { } } - public boolean isSSL() { - return secure; - } - public boolean isConnected() { return connected.get(); } + public boolean isSSL() { + return secure; + } + public void close() throws IOException { if (closed.compareAndSet(false, true)) { connected.set(false); @@ -279,20 +279,10 @@ public class NettyTransport { } } - protected void configureChannel(Channel channel) throws Exception { + protected void configureChannel(final Channel channel) throws Exception { if (isSSL()) { - channel.pipeline().addLast(NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions())); - } - - channel.pipeline().addLast(new NettyTcpTransportHandler()); - } - - protected void handleConnected(final Channel channel) throws Exception { - if (isSSL()) { - SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - - Future channelFuture = sslHandler.handshakeFuture(); - channelFuture.addListener(new GenericFutureListener>() { + SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); + sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { @@ -300,11 +290,19 @@ public class NettyTransport { connectionEstablished(channel); } else { LOG.trace("SSL Handshake has failed: {}", channel); - connectionFailed(IOExceptionSupport.create(future.cause())); + connectionFailed(channel, IOExceptionSupport.create(future.cause())); } } }); - } else { + + channel.pipeline().addLast(sslHandler); + } + + channel.pipeline().addLast(new NettyTcpTransportHandler()); + } + + protected void handleConnected(final Channel channel) throws Exception { + if (!isSSL()) { connectionEstablished(channel); } } @@ -323,11 +321,14 @@ public class NettyTransport { /** * Called when the transport connection failed and an error should be returned. * + * @param failedChannel + * The Channel instance that failed. * @param cause * An IOException that describes the cause of the failed connection. */ - protected void connectionFailed(IOException cause) { + protected void connectionFailed(Channel failedChannel, IOException cause) { failureCause = IOExceptionSupport.create(cause); + channel = failedChannel; connected.set(false); connectLatch.countDown(); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java index a002eae147..fd50890b17 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java @@ -19,7 +19,7 @@ package org.apache.activemq.transport.amqp.client.transport; import java.net.URI; import java.util.Map; -import org.apache.qpid.jms.util.PropertyUtil; +import org.apache.activemq.transport.amqp.client.util.PropertyUtil; /** * Factory for creating the Netty based TCP Transport.