From 4b018b420637db61763f14b8a02f986087608916 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 18 Jul 2016 16:54:37 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6339 Clean up the transport implementation and reduce duplication. --- .../client/transport/NettyTcpTransport.java | 273 ++++++++------ .../amqp/client/transport/NettyTransport.java | 2 +- .../transport/NettyTransportFactory.java | 2 +- .../transport/NettyTransportListener.java | 2 +- .../transport/NettyTransportOptions.java | 6 +- .../transport/NettyTransportSslOptions.java | 7 +- .../transport/NettyTransportSupport.java | 6 +- .../client/transport/NettyWSTransport.java | 356 ++---------------- 8 files changed, 195 insertions(+), 459 deletions(-) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java index 886ed4be65..0980d5ec38 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -33,8 +33,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.SimpleChannelInboundHandler; @@ -58,15 +60,13 @@ public class NettyTcpTransport implements NettyTransport { protected EventLoopGroup group; protected Channel channel; protected NettyTransportListener listener; - protected NettyTransportOptions options; + protected final NettyTransportOptions options; protected final URI remote; - protected boolean secure; private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean(); private final CountDownLatch connectLatch = new CountDownLatch(1); private IOException failureCause; - private Throwable pendingFailure; /** * Create a new transport instance @@ -91,10 +91,17 @@ public class NettyTcpTransport implements NettyTransport { * the transport options used to configure the socket connection. */ public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { + if (options == null) { + throw new IllegalArgumentException("Transport Options cannot be null"); + } + + if (remoteLocation == null) { + throw new IllegalArgumentException("Transport remote location cannot be null"); + } + this.options = options; this.listener = listener; this.remote = remoteLocation; - this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl"); } @Override @@ -104,16 +111,27 @@ public class NettyTcpTransport implements NettyTransport { throw new IllegalStateException("A transport listener must be set before connection attempts."); } + final SslHandler sslHandler; + if (isSSL()) { + try { + sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); + } catch (Exception ex) { + // TODO: can we stop it throwing Exception? + throw IOExceptionSupport.create(ex); + } + } else { + sslHandler = null; + } + group = new NioEventLoopGroup(1); bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer() { - @Override public void initChannel(Channel connectedChannel) throws Exception { - configureChannel(connectedChannel); + configureChannel(connectedChannel, sslHandler); } }); @@ -124,12 +142,8 @@ public class NettyTcpTransport implements NettyTransport { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - handleConnected(future.channel()); - } else if (future.isCancelled()) { - connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); - } else { - connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); + if (!future.isSuccess()) { + handleException(future.channel(), IOExceptionSupport.create(future.cause())); } } }); @@ -160,8 +174,8 @@ public class NettyTcpTransport implements NettyTransport { @Override public void run() { - if (pendingFailure != null) { - channel.pipeline().fireExceptionCaught(pendingFailure); + if (failureCause != null) { + channel.pipeline().fireExceptionCaught(failureCause); } } }); @@ -175,7 +189,7 @@ public class NettyTcpTransport implements NettyTransport { @Override public boolean isSSL() { - return secure; + return options.isSSL(); } @Override @@ -222,14 +236,6 @@ public class NettyTcpTransport implements NettyTransport { @Override public NettyTransportOptions getTransportOptions() { - if (options == null) { - if (isSSL()) { - options = NettyTransportSslOptions.INSTANCE; - } else { - options = NettyTransportOptions.INSTANCE; - } - } - return options; } @@ -240,36 +246,106 @@ public class NettyTcpTransport implements NettyTransport { @Override public Principal getLocalPrincipal() { - if (!isSSL()) { - throw new UnsupportedOperationException("Not connected to a secure channel"); + Principal result = null; + + if (isSSL()) { + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + result = sslHandler.engine().getSession().getLocalPrincipal(); } - SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - - return sslHandler.engine().getSession().getLocalPrincipal(); + return result; } - //----- Internal implementation details, can be overridden as needed --// + //----- Internal implementation details, can be overridden as needed -----// protected String getRemoteHost() { return remote.getHost(); } protected int getRemotePort() { - int port = remote.getPort(); - - if (port <= 0) { - if (isSSL()) { - port = getSslOptions().getDefaultSslPort(); - } else { - port = getTransportOptions().getDefaultTcpPort(); - } + if (remote.getPort() != -1) { + return remote.getPort(); + } else { + return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort(); } - - return port; } - protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { + protected void addAdditionalHandlers(ChannelPipeline pipeline) { + + } + + protected ChannelInboundHandlerAdapter createChannelHandler() { + return new NettyTcpTransportHandler(); + } + + //----- Event Handlers which can be overridden in subclasses -------------// + + protected void handleConnected(Channel channel) throws Exception { + LOG.trace("Channel has become active! Channel is {}", channel); + connectionEstablished(channel); + } + + protected void handleChannelInactive(Channel channel) throws Exception { + LOG.trace("Channel has gone inactive! Channel is {}", channel); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportClosed listener"); + listener.onTransportClosed(); + } + } + + protected void handleException(Channel channel, Throwable cause) throws Exception { + LOG.trace("Exception on channel! Channel is {}", channel); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportError listener"); + if (failureCause != null) { + listener.onTransportError(failureCause); + } else { + listener.onTransportError(cause); + } + } else { + // Hold the first failure for later dispatch if connect succeeds. + // This will then trigger disconnect using the first error reported. + if (failureCause == null) { + LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); + failureCause = IOExceptionSupport.create(cause); + } + + connectionFailed(channel, failureCause); + } + } + + //----- State change handlers and checks ---------------------------------// + + protected final void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + /* + * Called when the transport has successfully connected and is ready for use. + */ + private void connectionEstablished(Channel connectedChannel) { + channel = connectedChannel; + connected.set(true); + connectLatch.countDown(); + } + + /* + * Called when the transport connection failed and an error should be returned. + */ + private void connectionFailed(Channel failedChannel, IOException cause) { + failureCause = cause; + channel = failedChannel; + connected.set(false); + connectLatch.countDown(); + } + + private NettyTransportSslOptions getSslOptions() { + return (NettyTransportSslOptions) getTransportOptions(); + } + + private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); @@ -290,107 +366,62 @@ public class NettyTcpTransport implements NettyTransport { } } - protected void configureChannel(final Channel channel) throws Exception { + private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception { if (isSSL()) { - SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); - sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - LOG.trace("SSL Handshake has completed: {}", channel); - connectionEstablished(channel); - } else { - LOG.trace("SSL Handshake has failed: {}", channel); - connectionFailed(channel, IOExceptionSupport.create(future.cause())); - } - } - }); - channel.pipeline().addLast(sslHandler); } - channel.pipeline().addLast(new NettyTcpTransportHandler()); - } + addAdditionalHandlers(channel.pipeline()); - protected void handleConnected(final Channel channel) throws Exception { - if (!isSSL()) { - connectionEstablished(channel); - } - } - - //----- State change handlers and checks ---------------------------------// - - /** - * Called when the transport has successfully connected and is ready for use. - */ - protected void connectionEstablished(Channel connectedChannel) { - channel = connectedChannel; - connected.set(true); - connectLatch.countDown(); - } - - /** - * Called when the transport connection failed and an error should be returned. - * - * @param failedChannel - * The Channel instance that failed. - * @param cause - * An IOException that describes the cause of the failed connection. - */ - protected void connectionFailed(Channel failedChannel, IOException cause) { - failureCause = IOExceptionSupport.create(cause); - channel = failedChannel; - connected.set(false); - connectLatch.countDown(); - } - - private NettyTransportSslOptions getSslOptions() { - return (NettyTransportSslOptions) getTransportOptions(); - } - - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); - } + channel.pipeline().addLast(createChannelHandler()); } //----- Handle connection events -----------------------------------------// - private class NettyTcpTransportHandler extends SimpleChannelInboundHandler { + protected abstract class NettyDefaultHandler extends SimpleChannelInboundHandler { + + @Override + public void channelRegistered(ChannelHandlerContext context) throws Exception { + channel = context.channel(); + } @Override public void channelActive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has become active! Channel is {}", context.channel()); + // In the Secure case we need to let the handshake complete before we + // trigger the connected event. + if (!isSSL()) { + handleConnected(context.channel()); + } else { + SslHandler sslHandler = context.pipeline().get(SslHandler.class); + sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + LOG.trace("SSL Handshake has completed: {}", channel); + handleConnected(channel); + } else { + LOG.trace("SSL Handshake has failed: {}", channel); + handleException(channel, future.cause()); + } + } + }); + } } @Override public void channelInactive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportClosed listener"); - listener.onTransportClosed(); - } + handleChannelInactive(context.channel()); } @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - LOG.trace("Exception on channel! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportError listener"); - if (pendingFailure != null) { - listener.onTransportError(pendingFailure); - } else { - listener.onTransportError(cause); - } - } else { - // Hold the first failure for later dispatch if connect succeeds. - // This will then trigger disconnect using the first error reported. - if (pendingFailure != null) { - LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); - pendingFailure = cause; - } - } + handleException(context.channel(), cause); } + } + + //----- Handle Binary data from connection -------------------------------// + + protected class NettyTcpTransportHandler extends NettyDefaultHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java index 48be3a2a3e..a5d0214111 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java @@ -23,7 +23,7 @@ import java.security.Principal; import io.netty.buffer.ByteBuf; /** - * + * Base for all Netty based Transports in this client. */ public interface NettyTransport { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java index cc47aa2fdc..a06b4d05f4 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java index 09959a32f8..1b3a91c2f5 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java index 7b3dc9b9d7..add924c6b2 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -163,6 +163,10 @@ public class NettyTransportOptions implements Cloneable { this.defaultTcpPort = defaultTcpPort; } + public boolean isSSL() { + return false; + } + @Override public NettyTransportOptions clone() { return copyOptions(new NettyTransportOptions()); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java index b01f88428b..f81b8b35c5 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -261,6 +261,11 @@ public class NettyTransportSslOptions extends NettyTransportOptions { this.defaultSslPort = defaultSslPort; } + @Override + public boolean isSSL() { + return true; + } + @Override public NettyTransportSslOptions clone() { return copyOptions(new NettyTransportSslOptions()); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java index 18f9630716..653e915fd5 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,8 +16,6 @@ */ package org.apache.activemq.transport.amqp.client.transport; -import io.netty.handler.ssl.SslHandler; - import java.io.File; import java.io.FileInputStream; import java.io.InputStream; @@ -44,6 +42,8 @@ import javax.net.ssl.X509TrustManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.handler.ssl.SslHandler; + /** * Static class that provides various utility methods used by Transport implementations. */ diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java index 1b604fe1ce..c2fcfe7799 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java @@ -18,68 +18,38 @@ package org.apache.activemq.transport.amqp.client.transport; import java.io.IOException; import java.net.URI; -import java.security.Principal; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.nio.charset.StandardCharsets; -import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.FixedRecvByteBufAllocator; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; /** * Transport for communicating over WebSockets */ -public class NettyWSTransport implements NettyTransport { +public class NettyWSTransport extends NettyTcpTransport { private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class); - private static final int QUIET_PERIOD = 20; - private static final int SHUTDOWN_TIMEOUT = 100; - - protected Bootstrap bootstrap; - protected EventLoopGroup group; - protected Channel channel; - protected NettyTransportListener listener; - protected NettyTransportOptions options; - protected final URI remote; - protected boolean secure; - - private final AtomicBoolean connected = new AtomicBoolean(); - private final AtomicBoolean closed = new AtomicBoolean(); - private ChannelPromise handshakeFuture; - private IOException failureCause; - private Throwable pendingFailure; + private static final String AMQP_SUB_PROTOCOL = "amqp"; /** * Create a new transport instance @@ -104,114 +74,7 @@ public class NettyWSTransport implements NettyTransport { * the transport options used to configure the socket connection. */ public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { - this.options = options; - this.listener = listener; - this.remote = remoteLocation; - this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss"); - } - - @Override - public void connect() throws IOException { - - if (listener == null) { - throw new IllegalStateException("A transport listener must be set before connection attempts."); - } - - group = new NioEventLoopGroup(1); - - bootstrap = new Bootstrap(); - bootstrap.group(group); - bootstrap.channel(NioSocketChannel.class); - bootstrap.handler(new ChannelInitializer() { - - @Override - public void initChannel(Channel connectedChannel) throws Exception { - configureChannel(connectedChannel); - } - }); - - configureNetty(bootstrap, getTransportOptions()); - - ChannelFuture future; - try { - future = bootstrap.connect(getRemoteHost(), getRemotePort()); - future.addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - handleConnected(future.channel()); - } else if (future.isCancelled()) { - connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); - } else { - connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); - } - } - }); - - future.sync(); - - // Now wait for WS protocol level handshake completion - handshakeFuture.await(); - } catch (InterruptedException ex) { - LOG.debug("Transport connection attempt was interrupted."); - Thread.interrupted(); - failureCause = IOExceptionSupport.create(ex); - } - - if (failureCause != null) { - // Close out any Netty resources now as they are no longer needed. - if (channel != null) { - channel.close().syncUninterruptibly(); - channel = null; - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - group = null; - } - - throw failureCause; - } else { - // Connected, allow any held async error to fire now and close the transport. - channel.eventLoop().execute(new Runnable() { - - @Override - public void run() { - if (pendingFailure != null) { - channel.pipeline().fireExceptionCaught(pendingFailure); - } - } - }); - } - } - - @Override - public boolean isConnected() { - return connected.get(); - } - - @Override - public boolean isSSL() { - return secure; - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - connected.set(false); - if (channel != null) { - channel.close().syncUninterruptibly(); - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - } - } - } - - @Override - public ByteBuf allocateSendBuffer(int size) throws IOException { - checkConnected(); - return channel.alloc().ioBuffer(size, size); + super(listener, remoteLocation, options); } @Override @@ -228,206 +91,37 @@ public class NettyWSTransport implements NettyTransport { } @Override - public NettyTransportListener getTransportListener() { - return listener; + protected ChannelInboundHandlerAdapter createChannelHandler() { + return new NettyWebSocketTransportHandler(); } @Override - public void setTransportListener(NettyTransportListener listener) { - this.listener = listener; + protected void addAdditionalHandlers(ChannelPipeline pipeline) { + pipeline.addLast(new HttpClientCodec()); + pipeline.addLast(new HttpObjectAggregator(8192)); } @Override - public NettyTransportOptions getTransportOptions() { - if (options == null) { - if (isSSL()) { - options = NettyTransportSslOptions.INSTANCE; - } else { - options = NettyTransportOptions.INSTANCE; - } - } - - return options; - } - - @Override - public URI getRemoteLocation() { - return remote; - } - - @Override - public Principal getLocalPrincipal() { - if (!isSSL()) { - throw new UnsupportedOperationException("Not connected to a secure channel"); - } - - SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - - return sslHandler.engine().getSession().getLocalPrincipal(); - } - - //----- Internal implementation details, can be overridden as needed --// - - protected String getRemoteHost() { - return remote.getHost(); - } - - protected int getRemotePort() { - int port = remote.getPort(); - - if (port <= 0) { - if (isSSL()) { - port = getSslOptions().getDefaultSslPort(); - } else { - port = getTransportOptions().getDefaultTcpPort(); - } - } - - return port; - } - - protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { - bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); - bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); - bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); - - if (options.getSendBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); - } - - if (options.getReceiveBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); - bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); - } - - if (options.getTrafficClass() != -1) { - bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); - } - } - - protected void configureChannel(final Channel channel) throws Exception { - if (isSSL()) { - SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); - sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - LOG.trace("SSL Handshake has completed: {}", channel); - connectionEstablished(channel); - } else { - LOG.trace("SSL Handshake has failed: {}", channel); - connectionFailed(channel, IOExceptionSupport.create(future.cause())); - } - } - }); - - channel.pipeline().addLast(sslHandler); - } - - channel.pipeline().addLast(new HttpClientCodec()); - channel.pipeline().addLast(new HttpObjectAggregator(8192)); - channel.pipeline().addLast(new NettyTcpTransportHandler()); - } - - protected void handleConnected(final Channel channel) throws Exception { - if (!isSSL()) { - connectionEstablished(channel); - } - } - - //----- State change handlers and checks ---------------------------------// - - /** - * Called when the transport has successfully connected and is ready for use. - */ - protected void connectionEstablished(Channel connectedChannel) { - LOG.info("WebSocket connectionEstablished! {}", connectedChannel); - channel = connectedChannel; - connected.set(true); - } - - /** - * Called when the transport connection failed and an error should be returned. - * - * @param failedChannel - * The Channel instance that failed. - * @param cause - * An IOException that describes the cause of the failed connection. - */ - protected void connectionFailed(Channel failedChannel, IOException cause) { - failureCause = IOExceptionSupport.create(cause); - channel = failedChannel; - connected.set(false); - handshakeFuture.setFailure(cause); - } - - private NettyTransportSslOptions getSslOptions() { - return (NettyTransportSslOptions) getTransportOptions(); - } - - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); - } + protected void handleConnected(Channel channel) throws Exception { + LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel); } //----- Handle connection events -----------------------------------------// - private class NettyTcpTransportHandler extends SimpleChannelInboundHandler { + private class NettyWebSocketTransportHandler extends NettyDefaultHandler { private final WebSocketClientHandshaker handshaker; - public NettyTcpTransportHandler() { + public NettyWebSocketTransportHandler() { handshaker = WebSocketClientHandshakerFactory.newHandshaker( - remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders()); - } - - @Override - public void handlerAdded(ChannelHandlerContext context) { - LOG.trace("Handler has become added! Channel is {}", context.channel()); - handshakeFuture = context.newPromise(); + getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, true, new DefaultHttpHeaders()); } @Override public void channelActive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has become active! Channel is {}", context.channel()); handshaker.handshake(context.channel()); - } - @Override - public void channelInactive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportClosed listener"); - listener.onTransportClosed(); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage()); - LOG.trace("Error Stack: ", cause); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportError listener"); - if (pendingFailure != null) { - listener.onTransportError(pendingFailure); - } else { - listener.onTransportError(cause); - } - } else { - // Hold the first failure for later dispatch if connect succeeds. - // This will then trigger disconnect using the first error reported. - if (pendingFailure != null) { - LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); - pendingFailure = cause; - } - - if (!handshakeFuture.isDone()) { - handshakeFuture.setFailure(cause); - } - } + super.channelActive(context); } @Override @@ -437,8 +131,9 @@ public class NettyWSTransport implements NettyTransport { Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { handshaker.finishHandshake(ch, (FullHttpResponse) message); - LOG.info("WebSocket Client connected! {}", ctx.channel()); - handshakeFuture.setSuccess(); + LOG.trace("WebSocket Client connected! {}", ctx.channel()); + // Now trigger super processing as we are really connected. + NettyWSTransport.super.handleConnected(ch); return; } @@ -447,7 +142,7 @@ public class NettyWSTransport implements NettyTransport { FullHttpResponse response = (FullHttpResponse) message; throw new IllegalStateException( "Unexpected FullHttpResponse (getStatus=" + response.getStatus() + - ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')'); } WebSocketFrame frame = (WebSocketFrame) message; @@ -457,10 +152,11 @@ public class NettyWSTransport implements NettyTransport { ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket.")); } else if (frame instanceof BinaryWebSocketFrame) { BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame; - LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes()); + LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes()); listener.onData(binaryFrame.content()); - } else if (frame instanceof PongWebSocketFrame) { - LOG.trace("WebSocket Client received pong"); + } else if (frame instanceof PingWebSocketFrame) { + LOG.trace("WebSocket Client received ping, response with pong"); + ch.write(new PongWebSocketFrame(frame.content())); } else if (frame instanceof CloseWebSocketFrame) { LOG.trace("WebSocket Client received closing"); ch.close();