From 4ad78c7fd044fe13e6423707559d61228e4c48cd Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 11 May 2017 16:46:22 -0400 Subject: [PATCH] ARTEMIS-1159 Fixes and Improvements to the AMQP test client. Port fixes to the AMQP test client recently made in the 5.x version. Fixes some thread safety issues in the Transport. Ensures more timely shutdown of the Connection executor. Uses a dynamic Proxy to generate Read-Only Proton wrappers instead of the hand crafted versions. Adds additional logging for test data --- .../transport/amqp/client/AmqpConnection.java | 14 +- .../transport/amqp/client/AmqpMessage.java | 4 +- .../transport/amqp/client/AmqpReceiver.java | 4 +- .../transport/amqp/client/AmqpSender.java | 4 +- .../transport/amqp/client/AmqpSession.java | 4 +- .../client/transport/NettyTcpTransport.java | 322 ++++++++------- .../amqp/client/transport/NettyTransport.java | 2 +- .../transport/NettyTransportFactory.java | 16 +- .../transport/NettyTransportListener.java | 14 +- .../transport/NettyTransportOptions.java | 59 ++- .../transport/NettyTransportSslOptions.java | 56 ++- .../transport/NettyTransportSupport.java | 81 ++-- .../client/transport/NettyWSTransport.java | 378 ++---------------- .../client/util/UnmodifiableConnection.java | 202 ---------- .../client/util/UnmodifiableDelivery.java | 179 --------- .../amqp/client/util/UnmodifiableLink.java | 306 -------------- .../amqp/client/util/UnmodifiableProxy.java | 167 ++++++++ .../client/util/UnmodifiableReceiver.java | 65 --- .../amqp/client/util/UnmodifiableSender.java | 51 --- .../amqp/client/util/UnmodifiableSession.java | 198 --------- .../client/util/UnmodifiableTransport.java | 274 ------------- 21 files changed, 565 insertions(+), 1835 deletions(-) delete mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java delete mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java delete mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java create mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableProxy.java delete mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java delete mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java delete mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java delete mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index a30bae12b4..062729cb3b 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -25,8 +25,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,7 +39,7 @@ import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.IdGenerator; import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; @@ -74,7 +74,7 @@ public class AmqpConnection extends AmqpAbstractResource implements public static final long DEFAULT_CLOSE_TIMEOUT = 30000; public static final long DEFAULT_DRAIN_TIMEOUT = 60000; - private final ScheduledExecutorService serializer; + private ScheduledThreadPoolExecutor serializer; private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicLong sessionIdGenerator = new AtomicLong(); @@ -121,7 +121,7 @@ public class AmqpConnection extends AmqpAbstractResource implements this.connectionId = CONNECTION_ID_GENERATOR.generateId(); this.remoteURI = transport.getRemoteLocation(); - this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + this.serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable runner) { @@ -132,6 +132,10 @@ public class AmqpConnection extends AmqpAbstractResource implements } }); + // Ensure timely shutdown + this.serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.transport.setTransportListener(this); } @@ -434,7 +438,7 @@ public class AmqpConnection extends AmqpAbstractResource implements } public Connection getConnection() { - return new UnmodifiableConnection(getEndpoint()); + return UnmodifiableProxy.connectionProxy(getEndpoint()); } public AmqpConnectionListener getListener() { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index bf9e0b5117..a10d27a6e8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -20,7 +20,7 @@ import java.util.HashMap; import java.util.Map; import java.util.NoSuchElementException; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; @@ -100,7 +100,7 @@ public class AmqpMessage { */ public Delivery getWrappedDelivery() { if (delivery != null) { - return new UnmodifiableDelivery(delivery); + return UnmodifiableProxy.deliveryProxy(delivery); } return null; diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 8653cffa12..c2c721708e 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -35,7 +35,7 @@ import javax.jms.InvalidDestinationException; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; @@ -677,7 +677,7 @@ public class AmqpReceiver extends AmqpAbstractResource { * @return an unmodifiable view of the underlying Receiver instance. */ public Receiver getReceiver() { - return new UnmodifiableReceiver(getEndpoint()); + return UnmodifiableProxy.receiverProxy(getEndpoint()); } // ----- Receiver configuration properties --------------------------------// diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 03bd28e846..846739a23d 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -29,7 +29,7 @@ import javax.jms.InvalidDestinationException; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -231,7 +231,7 @@ public class AmqpSender extends AmqpAbstractResource { * @return an unmodifiable view of the underlying Sender instance. */ public Sender getSender() { - return new UnmodifiableSender(getEndpoint()); + return UnmodifiableProxy.senderProxy(getEndpoint()); } /** diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 677b354ff4..8c331cae17 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; @@ -598,7 +598,7 @@ public class AmqpSession extends AmqpAbstractResource { } public Session getSession() { - return new UnmodifiableSession(getEndpoint()); + return UnmodifiableProxy.sessionProxy(getEndpoint()); } public boolean isInTransaction() { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java index 29963a02d2..473fd759f7 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,25 +23,29 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * TCP based transport that uses Netty as the underlying IO layer. @@ -50,28 +54,27 @@ public class NettyTcpTransport implements NettyTransport { private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); - private static final int QUIET_PERIOD = 20; private static final int SHUTDOWN_TIMEOUT = 100; protected Bootstrap bootstrap; protected EventLoopGroup group; protected Channel channel; protected NettyTransportListener listener; - protected NettyTransportOptions options; + protected final NettyTransportOptions options; protected final URI remote; - protected boolean secure; private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean(); private final CountDownLatch connectLatch = new CountDownLatch(1); - private IOException failureCause; - private Throwable pendingFailure; + private volatile IOException failureCause; /** * Create a new transport instance * - * @param remoteLocation the URI that defines the remote resource to connect to. - * @param options the transport options used to configure the socket connection. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. */ public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) { this(null, remoteLocation, options); @@ -80,15 +83,25 @@ public class NettyTcpTransport implements NettyTransport { /** * Create a new transport instance * - * @param listener the TransportListener that will receive events from this Transport. - * @param remoteLocation the URI that defines the remote resource to connect to. - * @param options the transport options used to configure the socket connection. + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. */ public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { + if (options == null) { + throw new IllegalArgumentException("Transport Options cannot be null"); + } + + if (remoteLocation == null) { + throw new IllegalArgumentException("Transport remote location cannot be null"); + } + this.options = options; this.listener = listener; this.remote = remoteLocation; - this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl"); } @Override @@ -98,16 +111,27 @@ public class NettyTcpTransport implements NettyTransport { throw new IllegalStateException("A transport listener must be set before connection attempts."); } + final SslHandler sslHandler; + if (isSSL()) { + try { + sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); + } catch (Exception ex) { + // TODO: can we stop it throwing Exception? + throw IOExceptionSupport.create(ex); + } + } else { + sslHandler = null; + } + group = new NioEventLoopGroup(1); bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer() { - @Override public void initChannel(Channel connectedChannel) throws Exception { - configureChannel(connectedChannel); + configureChannel(connectedChannel, sslHandler); } }); @@ -118,12 +142,8 @@ public class NettyTcpTransport implements NettyTransport { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - handleConnected(future.channel()); - } else if (future.isCancelled()) { - connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); - } else { - connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); + if (!future.isSuccess()) { + handleException(future.channel(), IOExceptionSupport.create(future.cause())); } } }); @@ -143,7 +163,10 @@ public class NettyTcpTransport implements NettyTransport { channel = null; } if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + Future fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } group = null; } @@ -154,8 +177,8 @@ public class NettyTcpTransport implements NettyTransport { @Override public void run() { - if (pendingFailure != null) { - channel.pipeline().fireExceptionCaught(pendingFailure); + if (failureCause != null) { + channel.pipeline().fireExceptionCaught(failureCause); } } }); @@ -169,18 +192,24 @@ public class NettyTcpTransport implements NettyTransport { @Override public boolean isSSL() { - return secure; + return options.isSSL(); } @Override public void close() throws IOException { if (closed.compareAndSet(false, true)) { connected.set(false); - if (channel != null) { - channel.close().syncUninterruptibly(); - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + try { + if (channel != null) { + channel.close().syncUninterruptibly(); + } + } finally { + if (group != null) { + Future fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } } } } @@ -216,14 +245,6 @@ public class NettyTcpTransport implements NettyTransport { @Override public NettyTransportOptions getTransportOptions() { - if (options == null) { - if (isSSL()) { - options = NettyTransportSslOptions.INSTANCE; - } else { - options = NettyTransportOptions.INSTANCE; - } - } - return options; } @@ -234,36 +255,106 @@ public class NettyTcpTransport implements NettyTransport { @Override public Principal getLocalPrincipal() { - if (!isSSL()) { - throw new UnsupportedOperationException("Not connected to a secure channel"); + Principal result = null; + + if (isSSL()) { + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + result = sslHandler.engine().getSession().getLocalPrincipal(); } - SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - - return sslHandler.engine().getSession().getLocalPrincipal(); + return result; } - //----- Internal implementation details, can be overridden as needed --// + // ----- Internal implementation details, can be overridden as needed -----// protected String getRemoteHost() { return remote.getHost(); } protected int getRemotePort() { - int port = remote.getPort(); - - if (port <= 0) { - if (isSSL()) { - port = getSslOptions().getDefaultSslPort(); - } else { - port = getTransportOptions().getDefaultTcpPort(); - } + if (remote.getPort() != -1) { + return remote.getPort(); + } else { + return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort(); } - - return port; } - protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { + protected void addAdditionalHandlers(ChannelPipeline pipeline) { + + } + + protected ChannelInboundHandlerAdapter createChannelHandler() { + return new NettyTcpTransportHandler(); + } + + // ----- Event Handlers which can be overridden in subclasses -------------// + + protected void handleConnected(Channel channel) throws Exception { + LOG.trace("Channel has become active! Channel is {}", channel); + connectionEstablished(channel); + } + + protected void handleChannelInactive(Channel channel) throws Exception { + LOG.trace("Channel has gone inactive! Channel is {}", channel); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportClosed listener"); + listener.onTransportClosed(); + } + } + + protected void handleException(Channel channel, Throwable cause) throws Exception { + LOG.trace("Exception on channel! Channel is {}", channel); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportError listener"); + if (failureCause != null) { + listener.onTransportError(failureCause); + } else { + listener.onTransportError(cause); + } + } else { + // Hold the first failure for later dispatch if connect succeeds. + // This will then trigger disconnect using the first error reported. + if (failureCause == null) { + LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); + failureCause = IOExceptionSupport.create(cause); + } + + connectionFailed(channel, failureCause); + } + } + + // ----- State change handlers and checks ---------------------------------// + + protected final void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + /* + * Called when the transport has successfully connected and is ready for use. + */ + private void connectionEstablished(Channel connectedChannel) { + channel = connectedChannel; + connected.set(true); + connectLatch.countDown(); + } + + /* + * Called when the transport connection failed and an error should be returned. + */ + private void connectionFailed(Channel failedChannel, IOException cause) { + failureCause = cause; + channel = failedChannel; + connected.set(false); + connectLatch.countDown(); + } + + private NettyTransportSslOptions getSslOptions() { + return (NettyTransportSslOptions) getTransportOptions(); + } + + private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); @@ -283,105 +374,66 @@ public class NettyTcpTransport implements NettyTransport { } } - protected void configureChannel(final Channel channel) throws Exception { + private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception { if (isSSL()) { - SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); - sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - LOG.trace("SSL Handshake has completed: {}", channel); - connectionEstablished(channel); - } else { - LOG.trace("SSL Handshake has failed: {}", channel); - connectionFailed(channel, IOExceptionSupport.create(future.cause())); - } - } - }); - channel.pipeline().addLast(sslHandler); } - channel.pipeline().addLast(new NettyTcpTransportHandler()); - } - - protected void handleConnected(final Channel channel) throws Exception { - if (!isSSL()) { - connectionEstablished(channel); + if (getTransportOptions().isTraceBytes()) { + channel.pipeline().addLast("logger", new LoggingHandler(getClass())); } + + addAdditionalHandlers(channel.pipeline()); + + channel.pipeline().addLast(createChannelHandler()); } - //----- State change handlers and checks ---------------------------------// + // ----- Handle connection events -----------------------------------------// - /** - * Called when the transport has successfully connected and is ready for use. - */ - protected void connectionEstablished(Channel connectedChannel) { - channel = connectedChannel; - connected.set(true); - connectLatch.countDown(); - } + protected abstract class NettyDefaultHandler extends SimpleChannelInboundHandler { - /** - * Called when the transport connection failed and an error should be returned. - * - * @param failedChannel The Channel instance that failed. - * @param cause An IOException that describes the cause of the failed connection. - */ - protected void connectionFailed(Channel failedChannel, IOException cause) { - failureCause = IOExceptionSupport.create(cause); - channel = failedChannel; - connected.set(false); - connectLatch.countDown(); - } - - private NettyTransportSslOptions getSslOptions() { - return (NettyTransportSslOptions) getTransportOptions(); - } - - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); + @Override + public void channelRegistered(ChannelHandlerContext context) throws Exception { + channel = context.channel(); } - } - - //----- Handle connection events -----------------------------------------// - - private class NettyTcpTransportHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has become active! Channel is {}", context.channel()); + // In the Secure case we need to let the handshake complete before we + // trigger the connected event. + if (!isSSL()) { + handleConnected(context.channel()); + } else { + SslHandler sslHandler = context.pipeline().get(SslHandler.class); + sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + LOG.trace("SSL Handshake has completed: {}", channel); + handleConnected(channel); + } else { + LOG.trace("SSL Handshake has failed: {}", channel); + handleException(channel, future.cause()); + } + } + }); + } } @Override public void channelInactive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportClosed listener"); - listener.onTransportClosed(); - } + handleChannelInactive(context.channel()); } @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - LOG.trace("Exception on channel! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportError listener"); - if (pendingFailure != null) { - listener.onTransportError(pendingFailure); - } else { - listener.onTransportError(cause); - } - } else { - // Hold the first failure for later dispatch if connect succeeds. - // This will then trigger disconnect using the first error reported. - if (pendingFailure != null) { - LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); - pendingFailure = cause; - } - } + handleException(context.channel(), cause); } + } + + // ----- Handle Binary data from connection -------------------------------// + + protected class NettyTcpTransportHandler extends NettyDefaultHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java index a2bacdca78..ad0a1fbc1b 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java @@ -23,7 +23,7 @@ import java.security.Principal; import io.netty.buffer.ByteBuf; /** - * + * Base for all Netty based Transports in this client. */ public interface NettyTransport { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java index f6eae46385..30b2e21f41 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -30,12 +30,16 @@ public final class NettyTransportFactory { } /** - * Creates an instance of the given Transport and configures it using the - * properties set on the given remote broker URI. + * Creates an instance of the given Transport and configures it using the properties set on + * the given remote broker URI. + * + * @param remoteURI + * The URI used to connect to a remote Peer. * - * @param remoteURI The URI used to connect to a remote Peer. * @return a new Transport instance. - * @throws Exception if an error occurs while creating the Transport instance. + * + * @throws Exception + * if an error occurs while creating the Transport instance. */ public static NettyTransport createTransport(URI remoteURI) throws Exception { Map map = PropertyUtil.parseQuery(remoteURI.getQuery()); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java index c23ca8c714..01635171f7 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,15 +19,16 @@ package org.apache.activemq.transport.amqp.client.transport; import io.netty.buffer.ByteBuf; /** - * Listener interface that should be implemented by users of the various - * QpidJMS Transport classes. + * Listener interface that should be implemented by users of the various QpidJMS Transport + * classes. */ public interface NettyTransportListener { /** * Called when new incoming data has become available. * - * @param incoming the next incoming packet of data. + * @param incoming + * the next incoming packet of data. */ void onData(ByteBuf incoming); @@ -39,7 +40,8 @@ public interface NettyTransportListener { /** * Called when an error occurs during normal Transport operations. * - * @param cause the error that triggered this event. + * @param cause + * the error that triggered this event. */ void onTransportError(Throwable cause); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java index 3ffb8c8d22..c5022c1f38 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -30,6 +30,7 @@ public class NettyTransportOptions implements Cloneable { public static final int DEFAULT_SO_TIMEOUT = -1; public static final int DEFAULT_CONNECT_TIMEOUT = 60000; public static final int DEFAULT_TCP_PORT = 5672; + public static final boolean DEFAULT_TRACE_BYTES = false; public static final NettyTransportOptions INSTANCE = new NettyTransportOptions(); @@ -42,6 +43,7 @@ public class NettyTransportOptions implements Cloneable { private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE; private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; private int defaultTcpPort = DEFAULT_TCP_PORT; + private boolean traceBytes = DEFAULT_TRACE_BYTES; /** * @return the currently set send buffer size in bytes. @@ -51,11 +53,14 @@ public class NettyTransportOptions implements Cloneable { } /** - * Sets the send buffer size in bytes, the value must be greater than zero - * or an {@link IllegalArgumentException} will be thrown. + * Sets the send buffer size in bytes, the value must be greater than zero or an + * {@link IllegalArgumentException} will be thrown. * - * @param sendBufferSize the new send buffer size for the TCP Transport. - * @throws IllegalArgumentException if the value given is not in the valid range. + * @param sendBufferSize + * the new send buffer size for the TCP Transport. + * + * @throws IllegalArgumentException + * if the value given is not in the valid range. */ public void setSendBufferSize(int sendBufferSize) { if (sendBufferSize <= 0) { @@ -73,11 +78,14 @@ public class NettyTransportOptions implements Cloneable { } /** - * Sets the receive buffer size in bytes, the value must be greater than zero - * or an {@link IllegalArgumentException} will be thrown. + * Sets the receive buffer size in bytes, the value must be greater than zero or an + * {@link IllegalArgumentException} will be thrown. * - * @param receiveBufferSize the new receive buffer size for the TCP Transport. - * @throws IllegalArgumentException if the value given is not in the valid range. + * @param receiveBufferSize + * the new receive buffer size for the TCP Transport. + * + * @throws IllegalArgumentException + * if the value given is not in the valid range. */ public void setReceiveBufferSize(int receiveBufferSize) { if (receiveBufferSize <= 0) { @@ -95,11 +103,13 @@ public class NettyTransportOptions implements Cloneable { } /** - * Sets the traffic class value used by the TCP connection, valid - * range is between 0 and 255. + * Sets the traffic class value used by the TCP connection, valid range is between 0 and 255. * - * @param trafficClass the new traffic class value. - * @throws IllegalArgumentException if the value given is not in the valid range. + * @param trafficClass + * the new traffic class value. + * + * @throws IllegalArgumentException + * if the value given is not in the valid range. */ public void setTrafficClass(int trafficClass) { if (trafficClass < 0 || trafficClass > 255) { @@ -157,6 +167,27 @@ public class NettyTransportOptions implements Cloneable { this.defaultTcpPort = defaultTcpPort; } + /** + * @return true if the transport should enable byte tracing + */ + public boolean isTraceBytes() { + return traceBytes; + } + + /** + * Determines if the transport should add a logger for bytes in / out + * + * @param traceBytes + * should the transport log the bytes in and out. + */ + public void setTraceBytes(boolean traceBytes) { + this.traceBytes = traceBytes; + } + + public boolean isSSL() { + return false; + } + @Override public NettyTransportOptions clone() { return copyOptions(new NettyTransportOptions()); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java index e256fbba8a..3289fce1d8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,9 +21,8 @@ import java.util.Collections; import java.util.List; /** - * Holds the defined SSL options for connections that operate over a secure - * transport. Options are read from the environment and can be overridden by - * specifying them on the connection URI. + * Holds the defined SSL options for connections that operate over a secure transport. Options + * are read from the environment and can be overridden by specifying them on the connection URI. */ public class NettyTransportSslOptions extends NettyTransportOptions { @@ -31,7 +30,7 @@ public class NettyTransportSslOptions extends NettyTransportOptions { public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS"; public static final boolean DEFAULT_TRUST_ALL = false; public static final boolean DEFAULT_VERIFY_HOST = false; - public static final List DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"})); + public static final List DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[] {"SSLv2Hello", "SSLv3"})); public static final int DEFAULT_SSL_PORT = 5671; public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions(); @@ -69,7 +68,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { /** * Sets the location on disk of the key store to use. * - * @param keyStoreLocation the keyStoreLocation to use to create the key manager. + * @param keyStoreLocation + * the keyStoreLocation to use to create the key manager. */ public void setKeyStoreLocation(String keyStoreLocation) { this.keyStoreLocation = keyStoreLocation; @@ -83,7 +83,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param keyStorePassword the keyStorePassword to set + * @param keyStorePassword + * the keyStorePassword to set */ public void setKeyStorePassword(String keyStorePassword) { this.keyStorePassword = keyStorePassword; @@ -97,7 +98,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param trustStoreLocation the trustStoreLocation to set + * @param trustStoreLocation + * the trustStoreLocation to set */ public void setTrustStoreLocation(String trustStoreLocation) { this.trustStoreLocation = trustStoreLocation; @@ -111,7 +113,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param trustStorePassword the trustStorePassword to set + * @param trustStorePassword + * the trustStorePassword to set */ public void setTrustStorePassword(String trustStorePassword) { this.trustStorePassword = trustStorePassword; @@ -125,7 +128,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param storeType the format that the store files are encoded in. + * @param storeType + * the format that the store files are encoded in. */ public void setStoreType(String storeType) { this.storeType = storeType; @@ -139,7 +143,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param enabledCipherSuites the enabledCipherSuites to set + * @param enabledCipherSuites + * the enabledCipherSuites to set */ public void setEnabledCipherSuites(String[] enabledCipherSuites) { this.enabledCipherSuites = enabledCipherSuites; @@ -153,7 +158,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param disabledCipherSuites the disabledCipherSuites to set + * @param disabledCipherSuites + * the disabledCipherSuites to set */ public void setDisabledCipherSuites(String[] disabledCipherSuites) { this.disabledCipherSuites = disabledCipherSuites; @@ -169,13 +175,15 @@ public class NettyTransportSslOptions extends NettyTransportOptions { /** * The protocols to be set as enabled. * - * @param enabledProtocols the enabled protocols to set, or null if the defaults should be used. + * @param enabledProtocols + * the enabled protocols to set, or null if the defaults should be used. */ public void setEnabledProtocols(String[] enabledProtocols) { this.enabledProtocols = enabledProtocols; } /** + * * @return the protocols to disable or null if none should be */ public String[] getDisabledProtocols() { @@ -185,7 +193,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { /** * The protocols to be disable. * - * @param disabledProtocols the protocols to disable, or null if none should be. + * @param disabledProtocols + * the protocols to disable, or null if none should be. */ public void setDisabledProtocols(String[] disabledProtocols) { this.disabledProtocols = disabledProtocols; @@ -202,7 +211,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { * The protocol value to use when creating an SSLContext via * SSLContext.getInstance(protocol). * - * @param contextProtocol the context protocol to use. + * @param contextProtocol + * the context protocol to use. */ public void setContextProtocol(String contextProtocol) { this.contextProtocol = contextProtocol; @@ -216,7 +226,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param trustAll the trustAll to set + * @param trustAll + * the trustAll to set */ public void setTrustAll(boolean trustAll) { this.trustAll = trustAll; @@ -230,7 +241,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param verifyHost the verifyHost to set + * @param verifyHost + * the verifyHost to set */ public void setVerifyHost(boolean verifyHost) { this.verifyHost = verifyHost; @@ -244,7 +256,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param keyAlias the key alias to use + * @param keyAlias + * the key alias to use */ public void setKeyAlias(String keyAlias) { this.keyAlias = keyAlias; @@ -258,6 +271,11 @@ public class NettyTransportSslOptions extends NettyTransportOptions { this.defaultSslPort = defaultSslPort; } + @Override + public boolean isSSL() { + return true; + } + @Override public NettyTransportSslOptions clone() { return copyOptions(new NettyTransportSslOptions()); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java index 15854e8baa..d41c669135 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,15 +16,6 @@ */ package org.apache.activemq.transport.amqp.client.transport; -import javax.net.ssl.KeyManager; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import javax.net.ssl.X509ExtendedKeyManager; -import javax.net.ssl.X509TrustManager; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; @@ -38,10 +29,21 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import io.netty.handler.ssl.SslHandler; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509ExtendedKeyManager; +import javax.net.ssl.X509TrustManager; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.handler.ssl.SslHandler; + /** * Static class that provides various utility methods used by Transport implementations. */ @@ -50,13 +52,18 @@ public class NettyTransportSupport { private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class); /** - * Creates a Netty SslHandler instance for use in Transports that require - * an SSL encoder / decoder. + * Creates a Netty SslHandler instance for use in Transports that require an SSL encoder / + * decoder. + * + * @param remote + * The URI of the remote peer that the SslHandler will be used against. + * @param options + * The SSL options object to build the SslHandler instance from. * - * @param remote The URI of the remote peer that the SslHandler will be used against. - * @param options The SSL options object to build the SslHandler instance from. * @return a new SslHandler that is configured from the given options. - * @throws Exception if an error occurs while creating the SslHandler instance. + * + * @throws Exception + * if an error occurs while creating the SslHandler instance. */ public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception { return new SslHandler(createSslEngine(remote, createSslContext(options), options)); @@ -66,9 +73,13 @@ public class NettyTransportSupport { * Create a new SSLContext using the options specific in the given TransportSslOptions * instance. * - * @param options the configured options used to create the SSLContext. + * @param options + * the configured options used to create the SSLContext. + * * @return a new SSLContext instance. - * @throws Exception if an error occurs while creating the context. + * + * @throws Exception + * if an error occurs while creating the context. */ public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception { try { @@ -91,10 +102,15 @@ public class NettyTransportSupport { * Create a new SSLEngine instance in client mode from the given SSLContext and * TransportSslOptions instances. * - * @param context the SSLContext to use when creating the engine. - * @param options the TransportSslOptions to use to configure the new SSLEngine. + * @param context + * the SSLContext to use when creating the engine. + * @param options + * the TransportSslOptions to use to configure the new SSLEngine. + * * @return a new SSLEngine instance in client mode. - * @throws Exception if an error occurs while creating the new SSLEngine. + * + * @throws Exception + * if an error occurs while creating the new SSLEngine. */ public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception { return createSslEngine(null, context, options); @@ -104,15 +120,20 @@ public class NettyTransportSupport { * Create a new SSLEngine instance in client mode from the given SSLContext and * TransportSslOptions instances. * - * @param remote the URI of the remote peer that will be used to initialize the engine, may be null if none should. - * @param context the SSLContext to use when creating the engine. - * @param options the TransportSslOptions to use to configure the new SSLEngine. + * @param remote + * the URI of the remote peer that will be used to initialize the engine, may be null + * if none should. + * @param context + * the SSLContext to use when creating the engine. + * @param options + * the TransportSslOptions to use to configure the new SSLEngine. + * * @return a new SSLEngine instance in client mode. - * @throws Exception if an error occurs while creating the new SSLEngine. + * + * @throws Exception + * if an error occurs while creating the new SSLEngine. */ - public static SSLEngine createSslEngine(URI remote, - SSLContext context, - NettyTransportSslOptions options) throws Exception { + public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception { SSLEngine engine = null; if (remote == null) { engine = context.createSSLEngine(); @@ -185,7 +206,7 @@ public class NettyTransportSupport { private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception { if (options.isTrustAll()) { - return new TrustManager[]{createTrustAllTrustManager()}; + return new TrustManager[] {createTrustAllTrustManager()}; } if (options.getTrustStoreLocation() == null) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java index f75a52ebb9..eb595d09b8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java @@ -18,73 +18,46 @@ package org.apache.activemq.transport.amqp.client.transport; import java.io.IOException; import java.net.URI; -import java.security.Principal; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.nio.charset.StandardCharsets; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.FixedRecvByteBufAllocator; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Transport for communicating over WebSockets */ -public class NettyWSTransport implements NettyTransport { +public class NettyWSTransport extends NettyTcpTransport { private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class); - private static final int QUIET_PERIOD = 20; - private static final int SHUTDOWN_TIMEOUT = 100; - - protected Bootstrap bootstrap; - protected EventLoopGroup group; - protected Channel channel; - protected NettyTransportListener listener; - protected NettyTransportOptions options; - protected final URI remote; - protected boolean secure; - - private final AtomicBoolean connected = new AtomicBoolean(); - private final AtomicBoolean closed = new AtomicBoolean(); - private ChannelPromise handshakeFuture; - private IOException failureCause; - private Throwable pendingFailure; + private static final String AMQP_SUB_PROTOCOL = "amqp"; /** * Create a new transport instance * - * @param remoteLocation the URI that defines the remote resource to connect to. - * @param options the transport options used to configure the socket connection. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. */ public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) { this(null, remoteLocation, options); @@ -93,119 +66,15 @@ public class NettyWSTransport implements NettyTransport { /** * Create a new transport instance * - * @param listener the TransportListener that will receive events from this Transport. - * @param remoteLocation the URI that defines the remote resource to connect to. - * @param options the transport options used to configure the socket connection. + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. */ public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { - this.options = options; - this.listener = listener; - this.remote = remoteLocation; - this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss"); - } - - @Override - public void connect() throws IOException { - - if (listener == null) { - throw new IllegalStateException("A transport listener must be set before connection attempts."); - } - - group = new NioEventLoopGroup(1); - - bootstrap = new Bootstrap(); - bootstrap.group(group); - bootstrap.channel(NioSocketChannel.class); - bootstrap.handler(new ChannelInitializer() { - - @Override - public void initChannel(Channel connectedChannel) throws Exception { - configureChannel(connectedChannel); - } - }); - - configureNetty(bootstrap, getTransportOptions()); - - ChannelFuture future; - try { - future = bootstrap.connect(getRemoteHost(), getRemotePort()); - future.addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - handleConnected(future.channel()); - } else if (future.isCancelled()) { - connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); - } else { - connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); - } - } - }); - - future.sync(); - - // Now wait for WS protocol level handshake completion - handshakeFuture.await(); - } catch (InterruptedException ex) { - LOG.debug("Transport connection attempt was interrupted."); - Thread.interrupted(); - failureCause = IOExceptionSupport.create(ex); - } - - if (failureCause != null) { - // Close out any Netty resources now as they are no longer needed. - if (channel != null) { - channel.close().syncUninterruptibly(); - channel = null; - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - group = null; - } - - throw failureCause; - } else { - // Connected, allow any held async error to fire now and close the transport. - channel.eventLoop().execute(new Runnable() { - - @Override - public void run() { - if (pendingFailure != null) { - channel.pipeline().fireExceptionCaught(pendingFailure); - } - } - }); - } - } - - @Override - public boolean isConnected() { - return connected.get(); - } - - @Override - public boolean isSSL() { - return secure; - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - connected.set(false); - if (channel != null) { - channel.close().syncUninterruptibly(); - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - } - } - } - - @Override - public ByteBuf allocateSendBuffer(int size) throws IOException { - checkConnected(); - return channel.alloc().ioBuffer(size, size); + super(listener, remoteLocation, options); } @Override @@ -222,202 +91,37 @@ public class NettyWSTransport implements NettyTransport { } @Override - public NettyTransportListener getTransportListener() { - return listener; + protected ChannelInboundHandlerAdapter createChannelHandler() { + return new NettyWebSocketTransportHandler(); } @Override - public void setTransportListener(NettyTransportListener listener) { - this.listener = listener; + protected void addAdditionalHandlers(ChannelPipeline pipeline) { + pipeline.addLast(new HttpClientCodec()); + pipeline.addLast(new HttpObjectAggregator(8192)); } @Override - public NettyTransportOptions getTransportOptions() { - if (options == null) { - if (isSSL()) { - options = NettyTransportSslOptions.INSTANCE; - } else { - options = NettyTransportOptions.INSTANCE; - } - } - - return options; + protected void handleConnected(Channel channel) throws Exception { + LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel); } - @Override - public URI getRemoteLocation() { - return remote; - } + // ----- Handle connection events -----------------------------------------// - @Override - public Principal getLocalPrincipal() { - if (!isSSL()) { - throw new UnsupportedOperationException("Not connected to a secure channel"); - } - - SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - - return sslHandler.engine().getSession().getLocalPrincipal(); - } - - //----- Internal implementation details, can be overridden as needed --// - - protected String getRemoteHost() { - return remote.getHost(); - } - - protected int getRemotePort() { - int port = remote.getPort(); - - if (port <= 0) { - if (isSSL()) { - port = getSslOptions().getDefaultSslPort(); - } else { - port = getTransportOptions().getDefaultTcpPort(); - } - } - - return port; - } - - protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { - bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); - bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); - bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); - - if (options.getSendBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); - } - - if (options.getReceiveBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); - bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); - } - - if (options.getTrafficClass() != -1) { - bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); - } - } - - protected void configureChannel(final Channel channel) throws Exception { - if (isSSL()) { - SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); - sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - LOG.trace("SSL Handshake has completed: {}", channel); - connectionEstablished(channel); - } else { - LOG.trace("SSL Handshake has failed: {}", channel); - connectionFailed(channel, IOExceptionSupport.create(future.cause())); - } - } - }); - - channel.pipeline().addLast(sslHandler); - } - - channel.pipeline().addLast(new HttpClientCodec()); - channel.pipeline().addLast(new HttpObjectAggregator(8192)); - channel.pipeline().addLast(new NettyTcpTransportHandler()); - } - - protected void handleConnected(final Channel channel) throws Exception { - if (!isSSL()) { - connectionEstablished(channel); - } - } - - //----- State change handlers and checks ---------------------------------// - - /** - * Called when the transport has successfully connected and is ready for use. - */ - protected void connectionEstablished(Channel connectedChannel) { - LOG.info("WebSocket connectionEstablished! {}", connectedChannel); - channel = connectedChannel; - connected.set(true); - } - - /** - * Called when the transport connection failed and an error should be returned. - * - * @param failedChannel The Channel instance that failed. - * @param cause An IOException that describes the cause of the failed connection. - */ - protected void connectionFailed(Channel failedChannel, IOException cause) { - failureCause = IOExceptionSupport.create(cause); - channel = failedChannel; - connected.set(false); - handshakeFuture.setFailure(cause); - } - - private NettyTransportSslOptions getSslOptions() { - return (NettyTransportSslOptions) getTransportOptions(); - } - - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); - } - } - - //----- Handle connection events -----------------------------------------// - - private class NettyTcpTransportHandler extends SimpleChannelInboundHandler { + private class NettyWebSocketTransportHandler extends NettyDefaultHandler { private final WebSocketClientHandshaker handshaker; - NettyTcpTransportHandler() { - handshaker = WebSocketClientHandshakerFactory.newHandshaker(remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders()); - } - - @Override - public void handlerAdded(ChannelHandlerContext context) { - LOG.trace("Handler has become added! Channel is {}", context.channel()); - handshakeFuture = context.newPromise(); + NettyWebSocketTransportHandler() { + handshaker = WebSocketClientHandshakerFactory.newHandshaker(getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, true, + new DefaultHttpHeaders()); } @Override public void channelActive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has become active! Channel is {}", context.channel()); handshaker.handshake(context.channel()); - } - @Override - public void channelInactive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportClosed listener"); - listener.onTransportClosed(); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage()); - LOG.trace("Error Stack: ", cause); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportError listener"); - if (pendingFailure != null) { - listener.onTransportError(pendingFailure); - } else { - listener.onTransportError(cause); - } - } else { - // Hold the first failure for later dispatch if connect succeeds. - // This will then trigger disconnect using the first error reported. - if (pendingFailure != null) { - LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); - pendingFailure = cause; - } - - if (!handshakeFuture.isDone()) { - handshakeFuture.setFailure(cause); - } - } + super.channelActive(context); } @Override @@ -427,16 +131,17 @@ public class NettyWSTransport implements NettyTransport { Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { handshaker.finishHandshake(ch, (FullHttpResponse) message); - LOG.info("WebSocket Client connected! {}", ctx.channel()); - handshakeFuture.setSuccess(); + LOG.trace("WebSocket Client connected! {}", ctx.channel()); + // Now trigger super processing as we are really connected. + NettyWSTransport.super.handleConnected(ch); return; } // We shouldn't get this since we handle the handshake previously. if (message instanceof FullHttpResponse) { FullHttpResponse response = (FullHttpResponse) message; - throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + - ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); + throw new IllegalStateException( + "Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')'); } WebSocketFrame frame = (WebSocketFrame) message; @@ -446,10 +151,11 @@ public class NettyWSTransport implements NettyTransport { ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket.")); } else if (frame instanceof BinaryWebSocketFrame) { BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame; - LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes()); + LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes()); listener.onData(binaryFrame.content()); - } else if (frame instanceof PongWebSocketFrame) { - LOG.trace("WebSocket Client received pong"); + } else if (frame instanceof PingWebSocketFrame) { + LOG.trace("WebSocket Client received ping, response with pong"); + ch.write(new PongWebSocketFrame(frame.content())); } else if (frame instanceof CloseWebSocketFrame) { LOG.trace("WebSocket Client received closing"); ch.close(); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java deleted file mode 100644 index 5e2ef15a9c..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import java.util.EnumSet; -import java.util.Map; - -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Collector; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Session; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.reactor.Reactor; - -/** - * Unmodifiable Connection wrapper used to prevent test code from accidentally - * modifying Connection state. - */ -public class UnmodifiableConnection implements Connection { - - private final Connection connection; - - public UnmodifiableConnection(Connection connection) { - this.connection = connection; - } - - @Override - public EndpointState getLocalState() { - return connection.getLocalState(); - } - - @Override - public EndpointState getRemoteState() { - return connection.getRemoteState(); - } - - @Override - public ErrorCondition getCondition() { - return connection.getCondition(); - } - - @Override - public void setCondition(ErrorCondition condition) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public ErrorCondition getRemoteCondition() { - return connection.getRemoteCondition(); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void open() { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void close() { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public Session session() { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public Session sessionHead(EnumSet local, EnumSet remote) { - Session head = connection.sessionHead(local, remote); - if (head != null) { - head = new UnmodifiableSession(head); - } - - return head; - } - - @Override - public Link linkHead(EnumSet local, EnumSet remote) { - // TODO - If implemented this method should return an unmodifiable link instance. - return null; - } - - @Override - public Delivery getWorkHead() { - // TODO - If implemented this method should return an unmodifiable delivery instance. - return null; - } - - @Override - public void setContainer(String container) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void setHostname(String hostname) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public String getHostname() { - return connection.getHostname(); - } - - @Override - public String getRemoteContainer() { - return connection.getRemoteContainer(); - } - - @Override - public String getRemoteHostname() { - return connection.getRemoteHostname(); - } - - @Override - public void setOfferedCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void setDesiredCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public Symbol[] getRemoteOfferedCapabilities() { - return connection.getRemoteOfferedCapabilities(); - } - - @Override - public Symbol[] getRemoteDesiredCapabilities() { - return connection.getRemoteDesiredCapabilities(); - } - - @Override - public Map getRemoteProperties() { - return connection.getRemoteProperties(); - } - - @Override - public void setProperties(Map properties) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public Object getContext() { - return connection.getContext(); - } - - @Override - public void setContext(Object context) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void collect(Collector collector) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public String getContainer() { - return connection.getContainer(); - } - - @Override - public Transport getTransport() { - return new UnmodifiableTransport(connection.getTransport()); - } - - @Override - public Record attachments() { - return connection.attachments(); - } - - @Override - public Reactor getReactor() { - return connection.getReactor(); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java deleted file mode 100644 index 5545884245..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import org.apache.qpid.proton.amqp.transport.DeliveryState; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sender; - -/** - * Unmodifiable Delivery wrapper used to prevent test code from accidentally - * modifying Delivery state. - */ -public class UnmodifiableDelivery implements Delivery { - - private final Delivery delivery; - - public UnmodifiableDelivery(Delivery delivery) { - this.delivery = delivery; - } - - @Override - public byte[] getTag() { - return delivery.getTag(); - } - - @Override - public Link getLink() { - if (delivery.getLink() instanceof Sender) { - return new UnmodifiableSender((Sender) delivery.getLink()); - } else if (delivery.getLink() instanceof Receiver) { - return new UnmodifiableReceiver((Receiver) delivery.getLink()); - } else { - throw new IllegalStateException("Delivery has unknown link type"); - } - } - - /* waiting Pull Request sent - @Override - public int getDataLength() { - return delivery.getDataLength(); - } */ - - @Override - public int available() { - return delivery.available(); - } - - @Override - public DeliveryState getLocalState() { - return delivery.getLocalState(); - } - - @Override - public DeliveryState getRemoteState() { - return delivery.getRemoteState(); - } - - @Override - public int getMessageFormat() { - return delivery.getMessageFormat(); - } - - @Override - public void disposition(DeliveryState state) { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public void settle() { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public boolean isSettled() { - return delivery.isSettled(); - } - - @Override - public boolean remotelySettled() { - return delivery.remotelySettled(); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public Delivery getWorkNext() { - return new UnmodifiableDelivery(delivery.getWorkNext()); - } - - @Override - public Delivery next() { - return new UnmodifiableDelivery(delivery.next()); - } - - @Override - public boolean isWritable() { - return delivery.isWritable(); - } - - @Override - public boolean isReadable() { - return delivery.isReadable(); - } - - @Override - public void setContext(Object o) { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public Object getContext() { - return delivery.getContext(); - } - - @Override - public boolean isUpdated() { - return delivery.isUpdated(); - } - - @Override - public void clear() { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public boolean isPartial() { - return delivery.isPartial(); - } - - @Override - public int pending() { - return delivery.pending(); - } - - @Override - public boolean isBuffered() { - return delivery.isBuffered(); - } - - @Override - public Record attachments() { - return delivery.attachments(); - } - - @Override - public DeliveryState getDefaultDeliveryState() { - return delivery.getDefaultDeliveryState(); - } - - @Override - public void setDefaultDeliveryState(DeliveryState state) { - throw new UnsupportedOperationException("Cannot alter the Delivery"); - } - - @Override - public void setMessageFormat(int messageFormat) { - throw new UnsupportedOperationException("Cannot alter the Delivery"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java deleted file mode 100644 index 7e4319d91b..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import java.util.EnumSet; -import java.util.Map; - -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.amqp.transport.Source; -import org.apache.qpid.proton.amqp.transport.Target; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; - -/** - * Unmodifiable Session wrapper used to prevent test code from accidentally - * modifying Session state. - */ -public class UnmodifiableLink implements Link { - - private final Link link; - - public UnmodifiableLink(Link link) { - this.link = link; - } - - @Override - public EndpointState getLocalState() { - return link.getLocalState(); - } - - @Override - public EndpointState getRemoteState() { - return link.getRemoteState(); - } - - @Override - public ErrorCondition getCondition() { - return link.getCondition(); - } - - @Override - public void setCondition(ErrorCondition condition) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public ErrorCondition getRemoteCondition() { - return link.getRemoteCondition(); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void open() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void close() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setContext(Object o) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Object getContext() { - return link.getContext(); - } - - @Override - public String getName() { - return link.getName(); - } - - @Override - public Delivery delivery(byte[] tag) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Delivery delivery(byte[] tag, int offset, int length) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Delivery head() { - return new UnmodifiableDelivery(link.head()); - } - - @Override - public Delivery current() { - return new UnmodifiableDelivery(link.current()); - } - - @Override - public boolean advance() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Source getSource() { - // TODO Figure out a simple way to wrap the odd Source types in Proton-J - return link.getSource(); - } - - @Override - public Target getTarget() { - // TODO Figure out a simple way to wrap the odd Source types in Proton-J - return link.getTarget(); - } - - @Override - public void setSource(Source address) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setTarget(Target address) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Source getRemoteSource() { - // TODO Figure out a simple way to wrap the odd Source types in Proton-J - return link.getRemoteSource(); - } - - @Override - public Target getRemoteTarget() { - // TODO Figure out a simple way to wrap the odd Target types in Proton-J - return link.getRemoteTarget(); - } - - @Override - public Link next(EnumSet local, EnumSet remote) { - Link next = link.next(local, remote); - - if (next != null) { - if (next instanceof Sender) { - next = new UnmodifiableSender((Sender) next); - } else { - next = new UnmodifiableReceiver((Receiver) next); - } - } - - return next; - } - - @Override - public int getCredit() { - return link.getCredit(); - } - - @Override - public int getQueued() { - return link.getQueued(); - } - - @Override - public int getUnsettled() { - return link.getUnsettled(); - } - - @Override - public Session getSession() { - return new UnmodifiableSession(link.getSession()); - } - - @Override - public SenderSettleMode getSenderSettleMode() { - return link.getSenderSettleMode(); - } - - @Override - public void setSenderSettleMode(SenderSettleMode senderSettleMode) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public SenderSettleMode getRemoteSenderSettleMode() { - return link.getRemoteSenderSettleMode(); - } - - @Override - public ReceiverSettleMode getReceiverSettleMode() { - return link.getReceiverSettleMode(); - } - - @Override - public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public ReceiverSettleMode getRemoteReceiverSettleMode() { - return link.getRemoteReceiverSettleMode(); - } - - @Override - public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int drained() { - return link.drained(); // TODO - Is this a mutating call? - } - - @Override - public int getRemoteCredit() { - return link.getRemoteCredit(); - } - - @Override - public boolean getDrain() { - return link.getDrain(); - } - - @Override - public void detach() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public boolean detached() { - return link.detached(); - } - - @Override - public Record attachments() { - return link.attachments(); - } - - @Override - public Map getProperties() { - return link.getProperties(); - } - - @Override - public void setProperties(Map properties) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Map getRemoteProperties() { - return link.getRemoteProperties(); - } - - @Override - public Symbol[] getDesiredCapabilities() { - return link.getDesiredCapabilities(); - } - - @Override - public Symbol[] getOfferedCapabilities() { - return link.getOfferedCapabilities(); - } - - @Override - public Symbol[] getRemoteDesiredCapabilities() { - return link.getRemoteDesiredCapabilities(); - } - - @Override - public Symbol[] getRemoteOfferedCapabilities() { - return link.getRemoteOfferedCapabilities(); - } - - @Override - public void setDesiredCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setOfferedCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableProxy.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableProxy.java new file mode 100644 index 0000000000..25e6e6cac7 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableProxy.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; + +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.engine.Transport; + +/** + * Utility that creates proxy objects for the Proton objects which won't allow any mutating + * operations to be applied so that the test code does not interact with the proton engine + * outside the client serialization thread. + */ +public final class UnmodifiableProxy { + + private static ArrayList blacklist = new ArrayList<>(); + + // These methods are mutating but don't take an arguments so they + // aren't automatically filtered out. We will have to keep an eye + // on proton API in the future and modify this list as it evolves. + static { + blacklist.add("close"); + blacklist.add("free"); + blacklist.add("open"); + blacklist.add("sasl"); + blacklist.add("session"); + blacklist.add("close_head"); + blacklist.add("close_tail"); + blacklist.add("outputConsumed"); + blacklist.add("process"); + blacklist.add("processInput"); + blacklist.add("unbind"); + blacklist.add("settle"); + blacklist.add("clear"); + blacklist.add("detach"); + blacklist.add("abort"); + } + + private UnmodifiableProxy() { + } + + public static Transport transportProxy(final Transport target) { + Transport wrap = wrap(Transport.class, target); + return wrap; + } + + public static Sasl saslProxy(final Sasl target) { + return wrap(Sasl.class, target); + } + + public static Connection connectionProxy(final Connection target) { + return wrap(Connection.class, target); + } + + public static Session sessionProxy(final Session target) { + return wrap(Session.class, target); + } + + public static Delivery deliveryProxy(final Delivery target) { + return wrap(Delivery.class, target); + } + + public static Link linkProxy(final Link target) { + return wrap(Link.class, target); + } + + public static Receiver receiverProxy(final Receiver target) { + return wrap(Receiver.class, target); + } + + public static Sender senderProxy(final Sender target) { + return wrap(Sender.class, target); + } + + private static boolean isProtonType(Class clazz) { + String packageName = clazz.getPackage().getName(); + + if (packageName.startsWith("org.apache.qpid.proton.")) { + return true; + } + + return false; + } + + private static T wrap(Class type, final Object target) { + return type.cast(java.lang.reflect.Proxy.newProxyInstance(type.getClassLoader(), new Class[] {type}, new InvocationHandler() { + @Override + public Object invoke(Object o, Method method, Object[] objects) throws Throwable { + if ("toString".equals(method.getName()) && method.getParameterTypes().length == 0) { + return "Unmodifiable proxy -> (" + method.invoke(target, objects) + ")"; + } + + // Don't let methods that mutate be invoked. + if (method.getParameterTypes().length > 0) { + throw new UnsupportedOperationException("Cannot mutate outside the Client work thread"); + } + + if (blacklist.contains(method.getName())) { + throw new UnsupportedOperationException("Cannot mutate outside the Client work thread"); + } + + Class returnType = method.getReturnType(); + + try { + Object result = method.invoke(target, objects); + if (result == null) { + return null; + } + + if (returnType.isPrimitive() || returnType.isArray() || Object.class.equals(returnType)) { + // Skip any other checks + } else if (returnType.isAssignableFrom(ByteBuffer.class)) { + // Buffers are modifiable but we can just return null to indicate + // there's nothing there to access. + result = null; + } else if (returnType.isAssignableFrom(Map.class)) { + // Prevent return of modifiable maps + result = Collections.unmodifiableMap((Map) result); + } else if (isProtonType(returnType) && returnType.isInterface()) { + + // Can't handle the crazy Source / Target types yet as there's two + // different types for Source and Target the result can't be cast to + // the one people actually want to use. + if (!returnType.getName().equals("org.apache.qpid.proton.amqp.transport.Source") + && !returnType.getName().equals("org.apache.qpid.proton.amqp.messaging.Source") + && !returnType.getName().equals("org.apache.qpid.proton.amqp.transport.Target") + && !returnType.getName().equals("org.apache.qpid.proton.amqp.messaging.Target")) { + + result = wrap(returnType, result); + } + } + + return result; + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } + } + })); + } +} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java deleted file mode 100644 index f447d87d28..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import org.apache.qpid.proton.codec.WritableBuffer; -import org.apache.qpid.proton.engine.Receiver; - -/** - * Unmodifiable Receiver wrapper used to prevent test code from accidentally - * modifying Receiver state. - */ -public class UnmodifiableReceiver extends UnmodifiableLink implements Receiver { - - private final Receiver receiver; - - public UnmodifiableReceiver(Receiver receiver) { - super(receiver); - - this.receiver = receiver; - } - - @Override - public void flow(int credits) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int recv(byte[] bytes, int offset, int size) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int recv(WritableBuffer buffer) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void drain(int credit) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public boolean draining() { - return receiver.draining(); - } - - @Override - public void setDrain(boolean drain) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java deleted file mode 100644 index 3c67f682b1..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import org.apache.qpid.proton.codec.ReadableBuffer; -import org.apache.qpid.proton.engine.Sender; - -/** - * Unmodifiable Sender wrapper used to prevent test code from accidentally - * modifying Sender state. - */ -public class UnmodifiableSender extends UnmodifiableLink implements Sender { - - public UnmodifiableSender(Sender sender) { - super(sender); - } - - @Override - public void offer(int credits) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int send(byte[] bytes, int offset, int length) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int send(ReadableBuffer buffer) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void abort() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java deleted file mode 100644 index 3fc26cbd74..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import java.util.EnumSet; -import java.util.Map; - -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; - -/** - * Unmodifiable Session wrapper used to prevent test code from accidentally - * modifying Session state. - */ -public class UnmodifiableSession implements Session { - - private final Session session; - - public UnmodifiableSession(Session session) { - this.session = session; - } - - @Override - public EndpointState getLocalState() { - return session.getLocalState(); - } - - @Override - public EndpointState getRemoteState() { - return session.getRemoteState(); - } - - @Override - public ErrorCondition getCondition() { - return session.getCondition(); - } - - @Override - public void setCondition(ErrorCondition condition) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public ErrorCondition getRemoteCondition() { - return session.getRemoteCondition(); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public void open() { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public void close() { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public void setContext(Object o) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public Object getContext() { - return session.getContext(); - } - - @Override - public Sender sender(String name) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public Receiver receiver(String name) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public Session next(EnumSet local, EnumSet remote) { - Session next = session.next(local, remote); - if (next != null) { - next = new UnmodifiableSession(next); - } - - return next; - } - - @Override - public Connection getConnection() { - return new UnmodifiableConnection(session.getConnection()); - } - - @Override - public int getIncomingCapacity() { - return session.getIncomingCapacity(); - } - - @Override - public void setIncomingCapacity(int bytes) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public int getIncomingBytes() { - return session.getIncomingBytes(); - } - - @Override - public int getOutgoingBytes() { - return session.getOutgoingBytes(); - } - - @Override - public Record attachments() { - return session.attachments(); - } - - @Override - public long getOutgoingWindow() { - return session.getOutgoingWindow(); - } - - @Override - public void setOutgoingWindow(long outgoingWindowSize) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - - @Override - public Symbol[] getDesiredCapabilities() { - return session.getDesiredCapabilities(); - } - - @Override - public Symbol[] getOfferedCapabilities() { - return session.getOfferedCapabilities(); - } - - @Override - public Map getProperties() { - return session.getProperties(); - } - - @Override - public Symbol[] getRemoteDesiredCapabilities() { - return session.getRemoteDesiredCapabilities(); - } - - @Override - public Symbol[] getRemoteOfferedCapabilities() { - return session.getRemoteOfferedCapabilities(); - } - - @Override - public Map getRemoteProperties() { - return session.getRemoteProperties(); - } - - @Override - public void setDesiredCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setOfferedCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setProperties(Map capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java deleted file mode 100644 index 5e305f4d11..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import java.nio.ByteBuffer; - -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sasl; -import org.apache.qpid.proton.engine.Ssl; -import org.apache.qpid.proton.engine.SslDomain; -import org.apache.qpid.proton.engine.SslPeerDetails; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.engine.TransportException; -import org.apache.qpid.proton.engine.TransportResult; - -/** - * Unmodifiable Transport wrapper used to prevent test code from accidentally - * modifying Transport state. - */ -public class UnmodifiableTransport implements Transport { - - private final Transport transport; - - public UnmodifiableTransport(Transport transport) { - this.transport = transport; - } - - @Override - public void close() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Object getContext() { - return null; - } - - @Override - public EndpointState getLocalState() { - return transport.getLocalState(); - } - - @Override - public ErrorCondition getRemoteCondition() { - return transport.getRemoteCondition(); - } - - @Override - public EndpointState getRemoteState() { - return transport.getRemoteState(); - } - - @Override - public void open() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setCondition(ErrorCondition arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setContext(Object arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void bind(Connection arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public int capacity() { - return transport.capacity(); - } - - @Override - public void close_head() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void close_tail() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public int getChannelMax() { - return transport.getChannelMax(); - } - - @Override - public ErrorCondition getCondition() { - return transport.getCondition(); - } - - @Override - public int getIdleTimeout() { - return transport.getIdleTimeout(); - } - - @Override - public ByteBuffer getInputBuffer() { - return null; - } - - @Override - public int getMaxFrameSize() { - return transport.getMaxFrameSize(); - } - - @Override - public ByteBuffer getOutputBuffer() { - return null; - } - - @Override - public int getRemoteChannelMax() { - return transport.getRemoteChannelMax(); - } - - @Override - public int getRemoteIdleTimeout() { - return transport.getRemoteIdleTimeout(); - } - - @Override - public int getRemoteMaxFrameSize() { - return transport.getRemoteMaxFrameSize(); - } - - @Override - public ByteBuffer head() { - return null; - } - - @Override - public int input(byte[] arg0, int arg1, int arg2) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public boolean isClosed() { - return transport.isClosed(); - } - - @Override - public int output(byte[] arg0, int arg1, int arg2) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void outputConsumed() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public int pending() { - return transport.pending(); - } - - @Override - public void pop(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void process() throws TransportException { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public TransportResult processInput() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Sasl sasl() throws IllegalStateException { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setChannelMax(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setIdleTimeout(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setMaxFrameSize(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Ssl ssl(SslDomain arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Ssl ssl(SslDomain arg0, SslPeerDetails arg1) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public ByteBuffer tail() { - return null; - } - - @Override - public long tick(long arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void trace(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void unbind() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Record attachments() { - return transport.attachments(); - } - - @Override - public long getFramesInput() { - return transport.getFramesInput(); - } - - @Override - public long getFramesOutput() { - return transport.getFramesOutput(); - } - - @Override - public void setEmitFlowEventOnSend(boolean emitFlowEventOnSend) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public boolean isEmitFlowEventOnSend() { - return transport.isEmitFlowEventOnSend(); - } -}