From 5211afdf866fbcb5b538b2d5e2670dd5df385423 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Fri, 10 Nov 2017 12:31:29 +0000 Subject: [PATCH] ARTEMIS-1511 Refactor AMQP Transport for use with other test clients --- .../transport/amqp/client/AmqpClient.java | 4 +-- .../transport/amqp/client/AmqpConnection.java | 7 +++--- .../NettyTcpTransport.java | 8 +++--- .../transport => netty}/NettyTransport.java | 5 ++-- .../NettyTransportFactory.java | 25 +++++++++---------- .../NettyTransportListener.java | 2 +- .../NettyTransportOptions.java | 13 +++++++++- .../NettyTransportSslOptions.java | 2 +- .../NettyTransportSupport.java | 2 +- .../transport => netty}/NettyWSTransport.java | 13 +++++----- .../X509AliasKeyManager.java | 2 +- .../impl/netty/NettyHandshakeTimeoutTest.java | 6 ++--- 12 files changed, 51 insertions(+), 38 deletions(-) rename tests/artemis-test-support/src/main/java/org/apache/activemq/transport/{amqp/client/transport => netty}/NettyTcpTransport.java (98%) rename tests/artemis-test-support/src/main/java/org/apache/activemq/transport/{amqp/client/transport => netty}/NettyTransport.java (91%) rename tests/artemis-test-support/src/main/java/org/apache/activemq/transport/{amqp/client/transport => netty}/NettyTransportFactory.java (79%) rename tests/artemis-test-support/src/main/java/org/apache/activemq/transport/{amqp/client/transport => netty}/NettyTransportListener.java (96%) rename tests/artemis-test-support/src/main/java/org/apache/activemq/transport/{amqp/client/transport => netty}/NettyTransportOptions.java (93%) rename tests/artemis-test-support/src/main/java/org/apache/activemq/transport/{amqp/client/transport => netty}/NettyTransportSslOptions.java (99%) rename tests/artemis-test-support/src/main/java/org/apache/activemq/transport/{amqp/client/transport => netty}/NettyTransportSupport.java (99%) rename tests/artemis-test-support/src/main/java/org/apache/activemq/transport/{amqp/client/transport => netty}/NettyWSTransport.java (94%) rename tests/artemis-test-support/src/main/java/org/apache/activemq/transport/{amqp/client/transport => netty}/X509AliasKeyManager.java (97%) diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index fddaf9d077..d35d0ab490 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -21,8 +21,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.activemq.transport.amqp.client.transport.NettyTransport; -import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory; +import org.apache.activemq.transport.netty.NettyTransport; +import org.apache.activemq.transport.netty.NettyTransportFactory; import org.apache.qpid.proton.amqp.Symbol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 2fc720abbf..01e22886cf 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -33,8 +33,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.InactivityIOException; +import org.apache.activemq.transport.netty.NettyTransport; import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator; -import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; +import org.apache.activemq.transport.netty.NettyTransportListener; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.IdGenerator; @@ -80,7 +81,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private final AtomicLong sessionIdGenerator = new AtomicLong(); private final AtomicLong txIdGenerator = new AtomicLong(); private final Collector protonCollector = new CollectorImpl(); - private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport; + private final NettyTransport transport; private final Transport protonTransport = Transport.Factory.create(); private final String username; @@ -109,7 +110,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private boolean trace; private boolean noContainerID = false; - public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) { + public AmqpConnection(NettyTransport transport, String username, String password) { setEndpoint(Connection.Factory.create()); getEndpoint().collect(protonCollector); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java similarity index 98% rename from tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java rename to tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java index 7ce3bb91b1..9eab670d48 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp.client.transport; +package org.apache.activemq.transport.netty; import java.io.IOException; import java.net.URI; @@ -223,16 +223,16 @@ public class NettyTcpTransport implements NettyTransport { } @Override - public void send(ByteBuf output) throws IOException { + public ChannelFuture send(ByteBuf output) throws IOException { checkConnected(); int length = output.readableBytes(); if (length == 0) { - return; + return null; } LOG.trace("Attempted write of: {} bytes", length); - channel.writeAndFlush(output); + return channel.writeAndFlush(output); } @Override diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransport.java similarity index 91% rename from tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java rename to tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransport.java index 4d5a389b6f..b06be3e4b8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransport.java @@ -14,13 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp.client.transport; +package org.apache.activemq.transport.netty; import java.io.IOException; import java.net.URI; import java.security.Principal; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; /** * Base for all Netty based Transports in this client. @@ -37,7 +38,7 @@ public interface NettyTransport { ByteBuf allocateSendBuffer(int size) throws IOException; - void send(ByteBuf output) throws IOException; + ChannelFuture send(ByteBuf output) throws IOException; NettyTransportListener getTransportListener(); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportFactory.java similarity index 79% rename from tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java rename to tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportFactory.java index 30b2e21f41..5eab404168 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportFactory.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp.client.transport; +package org.apache.activemq.transport.netty; import java.net.URI; import java.util.Map; @@ -65,19 +65,18 @@ public final class NettyTransportFactory { NettyTransport result = null; - switch (remoteURI.getScheme().toLowerCase()) { - case "tcp": - case "ssl": - result = new NettyTcpTransport(remoteURI, transportOptions); - break; - case "ws": - case "wss": - result = new NettyWSTransport(remoteURI, transportOptions); - break; - default: - throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme()); + String scheme = remoteURI.getScheme().toLowerCase(); + if (scheme.startsWith("tcp") || scheme.startsWith("ssl")) { + result = new NettyTcpTransport(remoteURI, transportOptions); + } else if (scheme.startsWith("ws") || scheme.startsWith("wss")) { + // Check for ws subprotocol + if (scheme.contains("+")) { + transportOptions.setWsSubProtocol(scheme.substring(scheme.indexOf("+") + 1)); + } + result = new NettyWSTransport(remoteURI, transportOptions); + } else { + throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme()); } - return result; } } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportListener.java similarity index 96% rename from tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java rename to tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportListener.java index 01635171f7..2921dc00c8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportListener.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp.client.transport; +package org.apache.activemq.transport.netty; import io.netty.buffer.ByteBuf; diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportOptions.java similarity index 93% rename from tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java rename to tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportOptions.java index c5022c1f38..4dda8898db 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportOptions.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp.client.transport; +package org.apache.activemq.transport.netty; /** * Encapsulates all the TCP Transport options in one configuration object. @@ -31,6 +31,7 @@ public class NettyTransportOptions implements Cloneable { public static final int DEFAULT_CONNECT_TIMEOUT = 60000; public static final int DEFAULT_TCP_PORT = 5672; public static final boolean DEFAULT_TRACE_BYTES = false; + public static final String DEFAULT_WS_SUBPROTOCOL = NettyWSTransport.AMQP_SUB_PROTOCOL; public static final NettyTransportOptions INSTANCE = new NettyTransportOptions(); @@ -44,6 +45,7 @@ public class NettyTransportOptions implements Cloneable { private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; private int defaultTcpPort = DEFAULT_TCP_PORT; private boolean traceBytes = DEFAULT_TRACE_BYTES; + private String wsSubProtocol = DEFAULT_WS_SUBPROTOCOL; /** * @return the currently set send buffer size in bytes. @@ -188,6 +190,14 @@ public class NettyTransportOptions implements Cloneable { return false; } + public String getWsSubProtocol() { + return wsSubProtocol; + } + + public void setWsSubProtocol(String wsSubProtocol) { + this.wsSubProtocol = wsSubProtocol; + } + @Override public NettyTransportOptions clone() { return copyOptions(new NettyTransportOptions()); @@ -202,6 +212,7 @@ public class NettyTransportOptions implements Cloneable { copy.setTcpKeepAlive(isTcpKeepAlive()); copy.setTcpNoDelay(isTcpNoDelay()); copy.setTrafficClass(getTrafficClass()); + copy.setWsSubProtocol(getWsSubProtocol()); return copy; } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportSslOptions.java similarity index 99% rename from tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java rename to tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportSslOptions.java index 3289fce1d8..c575bdacf8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportSslOptions.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp.client.transport; +package org.apache.activemq.transport.netty; import java.util.Arrays; import java.util.Collections; diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportSupport.java similarity index 99% rename from tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java rename to tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportSupport.java index d41c669135..9e0c2d7840 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransportSupport.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp.client.transport; +package org.apache.activemq.transport.netty; import java.io.File; import java.io.FileInputStream; diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyWSTransport.java similarity index 94% rename from tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java rename to tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyWSTransport.java index 9b0e6e245f..08f48168b2 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyWSTransport.java @@ -14,12 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.amqp.client.transport; +package org.apache.activemq.transport.netty; import java.io.IOException; import java.net.URI; import java.nio.charset.StandardCharsets; +import io.netty.channel.ChannelFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ public class NettyWSTransport extends NettyTcpTransport { private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class); - private static final String AMQP_SUB_PROTOCOL = "amqp"; + public static final String AMQP_SUB_PROTOCOL = "amqp"; /** * Create a new transport instance @@ -79,16 +80,16 @@ public class NettyWSTransport extends NettyTcpTransport { } @Override - public void send(ByteBuf output) throws IOException { + public ChannelFuture send(ByteBuf output) throws IOException { checkConnected(); int length = output.readableBytes(); if (length == 0) { - return; + return null; } LOG.trace("Attempted write of: {} bytes", length); - channel.writeAndFlush(new BinaryWebSocketFrame(output)); + return channel.writeAndFlush(new BinaryWebSocketFrame(output)); } @Override @@ -115,7 +116,7 @@ public class NettyWSTransport extends NettyTcpTransport { NettyWebSocketTransportHandler() { handshaker = WebSocketClientHandshakerFactory.newHandshaker( - getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, + getRemoteLocation(), WebSocketVersion.V13, options.getWsSubProtocol(), true, new DefaultHttpHeaders(), getMaxFrameSize()); } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/X509AliasKeyManager.java similarity index 97% rename from tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java rename to tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/X509AliasKeyManager.java index 42d6a0b721..101b3489da 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/X509AliasKeyManager.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.activemq.transport.amqp.client.transport; +package org.apache.activemq.transport.netty; import javax.net.ssl.SSLEngine; import javax.net.ssl.X509ExtendedKeyManager; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java index 309897903b..ec9c99530b 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java @@ -25,9 +25,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.transport.amqp.client.transport.NettyTransport; -import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory; -import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; +import org.apache.activemq.transport.netty.NettyTransport; +import org.apache.activemq.transport.netty.NettyTransportFactory; +import org.apache.activemq.transport.netty.NettyTransportListener; import org.junit.Test; import java.net.URI;