NO-JIRA Adds some small fixes to the AMQP test client around SSL

handling that were found in Qpid JMS where some of this came from.
This commit is contained in:
Timothy Bish 2016-02-29 16:13:28 -05:00
parent 19c9404916
commit f6d25842cc
2 changed files with 25 additions and 24 deletions

View File

@ -126,9 +126,9 @@ public class NettyTransport {
if (future.isSuccess()) {
handleConnected(future.channel());
} else if (future.isCancelled()) {
connectionFailed(new IOException("Connection attempt was cancelled"));
connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
} else {
connectionFailed(IOExceptionSupport.create(future.cause()));
connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
}
}
});
@ -144,7 +144,7 @@ public class NettyTransport {
if (failureCause != null) {
// Close out any Netty resources now as they are no longer needed.
if (channel != null) {
channel.close();
channel.close().syncUninterruptibly();
channel = null;
}
if (group != null) {
@ -167,14 +167,14 @@ public class NettyTransport {
}
}
public boolean isSSL() {
return secure;
}
public boolean isConnected() {
return connected.get();
}
public boolean isSSL() {
return secure;
}
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
@ -279,20 +279,10 @@ public class NettyTransport {
}
}
protected void configureChannel(Channel channel) throws Exception {
protected void configureChannel(final Channel channel) throws Exception {
if (isSSL()) {
channel.pipeline().addLast(NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()));
}
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (isSSL()) {
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
Future<Channel> channelFuture = sslHandler.handshakeFuture();
channelFuture.addListener(new GenericFutureListener<Future<Channel>>() {
SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
@ -300,11 +290,19 @@ public class NettyTransport {
connectionEstablished(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
connectionFailed(IOExceptionSupport.create(future.cause()));
connectionFailed(channel, IOExceptionSupport.create(future.cause()));
}
}
});
} else {
channel.pipeline().addLast(sslHandler);
}
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (!isSSL()) {
connectionEstablished(channel);
}
}
@ -323,11 +321,14 @@ public class NettyTransport {
/**
* Called when the transport connection failed and an error should be returned.
*
* @param failedChannel
* The Channel instance that failed.
* @param cause
* An IOException that describes the cause of the failed connection.
*/
protected void connectionFailed(IOException cause) {
protected void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = IOExceptionSupport.create(cause);
channel = failedChannel;
connected.set(false);
connectLatch.countDown();
}

View File

@ -19,7 +19,7 @@ package org.apache.activemq.transport.amqp.client.transport;
import java.net.URI;
import java.util.Map;
import org.apache.qpid.jms.util.PropertyUtil;
import org.apache.activemq.transport.amqp.client.util.PropertyUtil;
/**
* Factory for creating the Netty based TCP Transport.