diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index a30bae12b4..062729cb3b 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -25,8 +25,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,7 +39,7 @@ import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.IdGenerator; import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; @@ -74,7 +74,7 @@ public class AmqpConnection extends AmqpAbstractResource implements public static final long DEFAULT_CLOSE_TIMEOUT = 30000; public static final long DEFAULT_DRAIN_TIMEOUT = 60000; - private final ScheduledExecutorService serializer; + private ScheduledThreadPoolExecutor serializer; private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicLong sessionIdGenerator = new AtomicLong(); @@ -121,7 +121,7 @@ public class AmqpConnection extends AmqpAbstractResource implements this.connectionId = CONNECTION_ID_GENERATOR.generateId(); this.remoteURI = transport.getRemoteLocation(); - this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + this.serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable runner) { @@ -132,6 +132,10 @@ public class AmqpConnection extends AmqpAbstractResource implements } }); + // Ensure timely shutdown + this.serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.transport.setTransportListener(this); } @@ -434,7 +438,7 @@ public class AmqpConnection extends AmqpAbstractResource implements } public Connection getConnection() { - return new UnmodifiableConnection(getEndpoint()); + return UnmodifiableProxy.connectionProxy(getEndpoint()); } public AmqpConnectionListener getListener() { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index bf9e0b5117..a10d27a6e8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -20,7 +20,7 @@ import java.util.HashMap; import java.util.Map; import java.util.NoSuchElementException; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; @@ -100,7 +100,7 @@ public class AmqpMessage { */ public Delivery getWrappedDelivery() { if (delivery != null) { - return new UnmodifiableDelivery(delivery); + return UnmodifiableProxy.deliveryProxy(delivery); } return null; diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 8653cffa12..c2c721708e 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -35,7 +35,7 @@ import javax.jms.InvalidDestinationException; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; @@ -677,7 +677,7 @@ public class AmqpReceiver extends AmqpAbstractResource { * @return an unmodifiable view of the underlying Receiver instance. */ public Receiver getReceiver() { - return new UnmodifiableReceiver(getEndpoint()); + return UnmodifiableProxy.receiverProxy(getEndpoint()); } // ----- Receiver configuration properties --------------------------------// diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 03bd28e846..846739a23d 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -29,7 +29,7 @@ import javax.jms.InvalidDestinationException; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -231,7 +231,7 @@ public class AmqpSender extends AmqpAbstractResource { * @return an unmodifiable view of the underlying Sender instance. */ public Sender getSender() { - return new UnmodifiableSender(getEndpoint()); + return UnmodifiableProxy.senderProxy(getEndpoint()); } /** diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 677b354ff4..8c331cae17 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; -import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; @@ -598,7 +598,7 @@ public class AmqpSession extends AmqpAbstractResource { } public Session getSession() { - return new UnmodifiableSession(getEndpoint()); + return UnmodifiableProxy.sessionProxy(getEndpoint()); } public boolean isInTransaction() { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java index 29963a02d2..473fd759f7 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,25 +23,29 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * TCP based transport that uses Netty as the underlying IO layer. @@ -50,28 +54,27 @@ public class NettyTcpTransport implements NettyTransport { private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); - private static final int QUIET_PERIOD = 20; private static final int SHUTDOWN_TIMEOUT = 100; protected Bootstrap bootstrap; protected EventLoopGroup group; protected Channel channel; protected NettyTransportListener listener; - protected NettyTransportOptions options; + protected final NettyTransportOptions options; protected final URI remote; - protected boolean secure; private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean(); private final CountDownLatch connectLatch = new CountDownLatch(1); - private IOException failureCause; - private Throwable pendingFailure; + private volatile IOException failureCause; /** * Create a new transport instance * - * @param remoteLocation the URI that defines the remote resource to connect to. - * @param options the transport options used to configure the socket connection. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. */ public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) { this(null, remoteLocation, options); @@ -80,15 +83,25 @@ public class NettyTcpTransport implements NettyTransport { /** * Create a new transport instance * - * @param listener the TransportListener that will receive events from this Transport. - * @param remoteLocation the URI that defines the remote resource to connect to. - * @param options the transport options used to configure the socket connection. + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. */ public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { + if (options == null) { + throw new IllegalArgumentException("Transport Options cannot be null"); + } + + if (remoteLocation == null) { + throw new IllegalArgumentException("Transport remote location cannot be null"); + } + this.options = options; this.listener = listener; this.remote = remoteLocation; - this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl"); } @Override @@ -98,16 +111,27 @@ public class NettyTcpTransport implements NettyTransport { throw new IllegalStateException("A transport listener must be set before connection attempts."); } + final SslHandler sslHandler; + if (isSSL()) { + try { + sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); + } catch (Exception ex) { + // TODO: can we stop it throwing Exception? + throw IOExceptionSupport.create(ex); + } + } else { + sslHandler = null; + } + group = new NioEventLoopGroup(1); bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer() { - @Override public void initChannel(Channel connectedChannel) throws Exception { - configureChannel(connectedChannel); + configureChannel(connectedChannel, sslHandler); } }); @@ -118,12 +142,8 @@ public class NettyTcpTransport implements NettyTransport { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - handleConnected(future.channel()); - } else if (future.isCancelled()) { - connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); - } else { - connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); + if (!future.isSuccess()) { + handleException(future.channel(), IOExceptionSupport.create(future.cause())); } } }); @@ -143,7 +163,10 @@ public class NettyTcpTransport implements NettyTransport { channel = null; } if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + Future fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } group = null; } @@ -154,8 +177,8 @@ public class NettyTcpTransport implements NettyTransport { @Override public void run() { - if (pendingFailure != null) { - channel.pipeline().fireExceptionCaught(pendingFailure); + if (failureCause != null) { + channel.pipeline().fireExceptionCaught(failureCause); } } }); @@ -169,18 +192,24 @@ public class NettyTcpTransport implements NettyTransport { @Override public boolean isSSL() { - return secure; + return options.isSSL(); } @Override public void close() throws IOException { if (closed.compareAndSet(false, true)) { connected.set(false); - if (channel != null) { - channel.close().syncUninterruptibly(); - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + try { + if (channel != null) { + channel.close().syncUninterruptibly(); + } + } finally { + if (group != null) { + Future fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } } } } @@ -216,14 +245,6 @@ public class NettyTcpTransport implements NettyTransport { @Override public NettyTransportOptions getTransportOptions() { - if (options == null) { - if (isSSL()) { - options = NettyTransportSslOptions.INSTANCE; - } else { - options = NettyTransportOptions.INSTANCE; - } - } - return options; } @@ -234,36 +255,106 @@ public class NettyTcpTransport implements NettyTransport { @Override public Principal getLocalPrincipal() { - if (!isSSL()) { - throw new UnsupportedOperationException("Not connected to a secure channel"); + Principal result = null; + + if (isSSL()) { + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + result = sslHandler.engine().getSession().getLocalPrincipal(); } - SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - - return sslHandler.engine().getSession().getLocalPrincipal(); + return result; } - //----- Internal implementation details, can be overridden as needed --// + // ----- Internal implementation details, can be overridden as needed -----// protected String getRemoteHost() { return remote.getHost(); } protected int getRemotePort() { - int port = remote.getPort(); - - if (port <= 0) { - if (isSSL()) { - port = getSslOptions().getDefaultSslPort(); - } else { - port = getTransportOptions().getDefaultTcpPort(); - } + if (remote.getPort() != -1) { + return remote.getPort(); + } else { + return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort(); } - - return port; } - protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { + protected void addAdditionalHandlers(ChannelPipeline pipeline) { + + } + + protected ChannelInboundHandlerAdapter createChannelHandler() { + return new NettyTcpTransportHandler(); + } + + // ----- Event Handlers which can be overridden in subclasses -------------// + + protected void handleConnected(Channel channel) throws Exception { + LOG.trace("Channel has become active! Channel is {}", channel); + connectionEstablished(channel); + } + + protected void handleChannelInactive(Channel channel) throws Exception { + LOG.trace("Channel has gone inactive! Channel is {}", channel); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportClosed listener"); + listener.onTransportClosed(); + } + } + + protected void handleException(Channel channel, Throwable cause) throws Exception { + LOG.trace("Exception on channel! Channel is {}", channel); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportError listener"); + if (failureCause != null) { + listener.onTransportError(failureCause); + } else { + listener.onTransportError(cause); + } + } else { + // Hold the first failure for later dispatch if connect succeeds. + // This will then trigger disconnect using the first error reported. + if (failureCause == null) { + LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); + failureCause = IOExceptionSupport.create(cause); + } + + connectionFailed(channel, failureCause); + } + } + + // ----- State change handlers and checks ---------------------------------// + + protected final void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + /* + * Called when the transport has successfully connected and is ready for use. + */ + private void connectionEstablished(Channel connectedChannel) { + channel = connectedChannel; + connected.set(true); + connectLatch.countDown(); + } + + /* + * Called when the transport connection failed and an error should be returned. + */ + private void connectionFailed(Channel failedChannel, IOException cause) { + failureCause = cause; + channel = failedChannel; + connected.set(false); + connectLatch.countDown(); + } + + private NettyTransportSslOptions getSslOptions() { + return (NettyTransportSslOptions) getTransportOptions(); + } + + private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); @@ -283,105 +374,66 @@ public class NettyTcpTransport implements NettyTransport { } } - protected void configureChannel(final Channel channel) throws Exception { + private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception { if (isSSL()) { - SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); - sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - LOG.trace("SSL Handshake has completed: {}", channel); - connectionEstablished(channel); - } else { - LOG.trace("SSL Handshake has failed: {}", channel); - connectionFailed(channel, IOExceptionSupport.create(future.cause())); - } - } - }); - channel.pipeline().addLast(sslHandler); } - channel.pipeline().addLast(new NettyTcpTransportHandler()); - } - - protected void handleConnected(final Channel channel) throws Exception { - if (!isSSL()) { - connectionEstablished(channel); + if (getTransportOptions().isTraceBytes()) { + channel.pipeline().addLast("logger", new LoggingHandler(getClass())); } + + addAdditionalHandlers(channel.pipeline()); + + channel.pipeline().addLast(createChannelHandler()); } - //----- State change handlers and checks ---------------------------------// + // ----- Handle connection events -----------------------------------------// - /** - * Called when the transport has successfully connected and is ready for use. - */ - protected void connectionEstablished(Channel connectedChannel) { - channel = connectedChannel; - connected.set(true); - connectLatch.countDown(); - } + protected abstract class NettyDefaultHandler extends SimpleChannelInboundHandler { - /** - * Called when the transport connection failed and an error should be returned. - * - * @param failedChannel The Channel instance that failed. - * @param cause An IOException that describes the cause of the failed connection. - */ - protected void connectionFailed(Channel failedChannel, IOException cause) { - failureCause = IOExceptionSupport.create(cause); - channel = failedChannel; - connected.set(false); - connectLatch.countDown(); - } - - private NettyTransportSslOptions getSslOptions() { - return (NettyTransportSslOptions) getTransportOptions(); - } - - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); + @Override + public void channelRegistered(ChannelHandlerContext context) throws Exception { + channel = context.channel(); } - } - - //----- Handle connection events -----------------------------------------// - - private class NettyTcpTransportHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has become active! Channel is {}", context.channel()); + // In the Secure case we need to let the handshake complete before we + // trigger the connected event. + if (!isSSL()) { + handleConnected(context.channel()); + } else { + SslHandler sslHandler = context.pipeline().get(SslHandler.class); + sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + LOG.trace("SSL Handshake has completed: {}", channel); + handleConnected(channel); + } else { + LOG.trace("SSL Handshake has failed: {}", channel); + handleException(channel, future.cause()); + } + } + }); + } } @Override public void channelInactive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportClosed listener"); - listener.onTransportClosed(); - } + handleChannelInactive(context.channel()); } @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - LOG.trace("Exception on channel! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportError listener"); - if (pendingFailure != null) { - listener.onTransportError(pendingFailure); - } else { - listener.onTransportError(cause); - } - } else { - // Hold the first failure for later dispatch if connect succeeds. - // This will then trigger disconnect using the first error reported. - if (pendingFailure != null) { - LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); - pendingFailure = cause; - } - } + handleException(context.channel(), cause); } + } + + // ----- Handle Binary data from connection -------------------------------// + + protected class NettyTcpTransportHandler extends NettyDefaultHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java index a2bacdca78..ad0a1fbc1b 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java @@ -23,7 +23,7 @@ import java.security.Principal; import io.netty.buffer.ByteBuf; /** - * + * Base for all Netty based Transports in this client. */ public interface NettyTransport { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java index f6eae46385..30b2e21f41 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -30,12 +30,16 @@ public final class NettyTransportFactory { } /** - * Creates an instance of the given Transport and configures it using the - * properties set on the given remote broker URI. + * Creates an instance of the given Transport and configures it using the properties set on + * the given remote broker URI. + * + * @param remoteURI + * The URI used to connect to a remote Peer. * - * @param remoteURI The URI used to connect to a remote Peer. * @return a new Transport instance. - * @throws Exception if an error occurs while creating the Transport instance. + * + * @throws Exception + * if an error occurs while creating the Transport instance. */ public static NettyTransport createTransport(URI remoteURI) throws Exception { Map map = PropertyUtil.parseQuery(remoteURI.getQuery()); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java index c23ca8c714..01635171f7 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,15 +19,16 @@ package org.apache.activemq.transport.amqp.client.transport; import io.netty.buffer.ByteBuf; /** - * Listener interface that should be implemented by users of the various - * QpidJMS Transport classes. + * Listener interface that should be implemented by users of the various QpidJMS Transport + * classes. */ public interface NettyTransportListener { /** * Called when new incoming data has become available. * - * @param incoming the next incoming packet of data. + * @param incoming + * the next incoming packet of data. */ void onData(ByteBuf incoming); @@ -39,7 +40,8 @@ public interface NettyTransportListener { /** * Called when an error occurs during normal Transport operations. * - * @param cause the error that triggered this event. + * @param cause + * the error that triggered this event. */ void onTransportError(Throwable cause); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java index 3ffb8c8d22..c5022c1f38 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -30,6 +30,7 @@ public class NettyTransportOptions implements Cloneable { public static final int DEFAULT_SO_TIMEOUT = -1; public static final int DEFAULT_CONNECT_TIMEOUT = 60000; public static final int DEFAULT_TCP_PORT = 5672; + public static final boolean DEFAULT_TRACE_BYTES = false; public static final NettyTransportOptions INSTANCE = new NettyTransportOptions(); @@ -42,6 +43,7 @@ public class NettyTransportOptions implements Cloneable { private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE; private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; private int defaultTcpPort = DEFAULT_TCP_PORT; + private boolean traceBytes = DEFAULT_TRACE_BYTES; /** * @return the currently set send buffer size in bytes. @@ -51,11 +53,14 @@ public class NettyTransportOptions implements Cloneable { } /** - * Sets the send buffer size in bytes, the value must be greater than zero - * or an {@link IllegalArgumentException} will be thrown. + * Sets the send buffer size in bytes, the value must be greater than zero or an + * {@link IllegalArgumentException} will be thrown. * - * @param sendBufferSize the new send buffer size for the TCP Transport. - * @throws IllegalArgumentException if the value given is not in the valid range. + * @param sendBufferSize + * the new send buffer size for the TCP Transport. + * + * @throws IllegalArgumentException + * if the value given is not in the valid range. */ public void setSendBufferSize(int sendBufferSize) { if (sendBufferSize <= 0) { @@ -73,11 +78,14 @@ public class NettyTransportOptions implements Cloneable { } /** - * Sets the receive buffer size in bytes, the value must be greater than zero - * or an {@link IllegalArgumentException} will be thrown. + * Sets the receive buffer size in bytes, the value must be greater than zero or an + * {@link IllegalArgumentException} will be thrown. * - * @param receiveBufferSize the new receive buffer size for the TCP Transport. - * @throws IllegalArgumentException if the value given is not in the valid range. + * @param receiveBufferSize + * the new receive buffer size for the TCP Transport. + * + * @throws IllegalArgumentException + * if the value given is not in the valid range. */ public void setReceiveBufferSize(int receiveBufferSize) { if (receiveBufferSize <= 0) { @@ -95,11 +103,13 @@ public class NettyTransportOptions implements Cloneable { } /** - * Sets the traffic class value used by the TCP connection, valid - * range is between 0 and 255. + * Sets the traffic class value used by the TCP connection, valid range is between 0 and 255. * - * @param trafficClass the new traffic class value. - * @throws IllegalArgumentException if the value given is not in the valid range. + * @param trafficClass + * the new traffic class value. + * + * @throws IllegalArgumentException + * if the value given is not in the valid range. */ public void setTrafficClass(int trafficClass) { if (trafficClass < 0 || trafficClass > 255) { @@ -157,6 +167,27 @@ public class NettyTransportOptions implements Cloneable { this.defaultTcpPort = defaultTcpPort; } + /** + * @return true if the transport should enable byte tracing + */ + public boolean isTraceBytes() { + return traceBytes; + } + + /** + * Determines if the transport should add a logger for bytes in / out + * + * @param traceBytes + * should the transport log the bytes in and out. + */ + public void setTraceBytes(boolean traceBytes) { + this.traceBytes = traceBytes; + } + + public boolean isSSL() { + return false; + } + @Override public NettyTransportOptions clone() { return copyOptions(new NettyTransportOptions()); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java index e256fbba8a..3289fce1d8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,9 +21,8 @@ import java.util.Collections; import java.util.List; /** - * Holds the defined SSL options for connections that operate over a secure - * transport. Options are read from the environment and can be overridden by - * specifying them on the connection URI. + * Holds the defined SSL options for connections that operate over a secure transport. Options + * are read from the environment and can be overridden by specifying them on the connection URI. */ public class NettyTransportSslOptions extends NettyTransportOptions { @@ -31,7 +30,7 @@ public class NettyTransportSslOptions extends NettyTransportOptions { public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS"; public static final boolean DEFAULT_TRUST_ALL = false; public static final boolean DEFAULT_VERIFY_HOST = false; - public static final List DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"})); + public static final List DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[] {"SSLv2Hello", "SSLv3"})); public static final int DEFAULT_SSL_PORT = 5671; public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions(); @@ -69,7 +68,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { /** * Sets the location on disk of the key store to use. * - * @param keyStoreLocation the keyStoreLocation to use to create the key manager. + * @param keyStoreLocation + * the keyStoreLocation to use to create the key manager. */ public void setKeyStoreLocation(String keyStoreLocation) { this.keyStoreLocation = keyStoreLocation; @@ -83,7 +83,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param keyStorePassword the keyStorePassword to set + * @param keyStorePassword + * the keyStorePassword to set */ public void setKeyStorePassword(String keyStorePassword) { this.keyStorePassword = keyStorePassword; @@ -97,7 +98,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param trustStoreLocation the trustStoreLocation to set + * @param trustStoreLocation + * the trustStoreLocation to set */ public void setTrustStoreLocation(String trustStoreLocation) { this.trustStoreLocation = trustStoreLocation; @@ -111,7 +113,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param trustStorePassword the trustStorePassword to set + * @param trustStorePassword + * the trustStorePassword to set */ public void setTrustStorePassword(String trustStorePassword) { this.trustStorePassword = trustStorePassword; @@ -125,7 +128,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param storeType the format that the store files are encoded in. + * @param storeType + * the format that the store files are encoded in. */ public void setStoreType(String storeType) { this.storeType = storeType; @@ -139,7 +143,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param enabledCipherSuites the enabledCipherSuites to set + * @param enabledCipherSuites + * the enabledCipherSuites to set */ public void setEnabledCipherSuites(String[] enabledCipherSuites) { this.enabledCipherSuites = enabledCipherSuites; @@ -153,7 +158,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param disabledCipherSuites the disabledCipherSuites to set + * @param disabledCipherSuites + * the disabledCipherSuites to set */ public void setDisabledCipherSuites(String[] disabledCipherSuites) { this.disabledCipherSuites = disabledCipherSuites; @@ -169,13 +175,15 @@ public class NettyTransportSslOptions extends NettyTransportOptions { /** * The protocols to be set as enabled. * - * @param enabledProtocols the enabled protocols to set, or null if the defaults should be used. + * @param enabledProtocols + * the enabled protocols to set, or null if the defaults should be used. */ public void setEnabledProtocols(String[] enabledProtocols) { this.enabledProtocols = enabledProtocols; } /** + * * @return the protocols to disable or null if none should be */ public String[] getDisabledProtocols() { @@ -185,7 +193,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { /** * The protocols to be disable. * - * @param disabledProtocols the protocols to disable, or null if none should be. + * @param disabledProtocols + * the protocols to disable, or null if none should be. */ public void setDisabledProtocols(String[] disabledProtocols) { this.disabledProtocols = disabledProtocols; @@ -202,7 +211,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { * The protocol value to use when creating an SSLContext via * SSLContext.getInstance(protocol). * - * @param contextProtocol the context protocol to use. + * @param contextProtocol + * the context protocol to use. */ public void setContextProtocol(String contextProtocol) { this.contextProtocol = contextProtocol; @@ -216,7 +226,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param trustAll the trustAll to set + * @param trustAll + * the trustAll to set */ public void setTrustAll(boolean trustAll) { this.trustAll = trustAll; @@ -230,7 +241,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param verifyHost the verifyHost to set + * @param verifyHost + * the verifyHost to set */ public void setVerifyHost(boolean verifyHost) { this.verifyHost = verifyHost; @@ -244,7 +256,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions { } /** - * @param keyAlias the key alias to use + * @param keyAlias + * the key alias to use */ public void setKeyAlias(String keyAlias) { this.keyAlias = keyAlias; @@ -258,6 +271,11 @@ public class NettyTransportSslOptions extends NettyTransportOptions { this.defaultSslPort = defaultSslPort; } + @Override + public boolean isSSL() { + return true; + } + @Override public NettyTransportSslOptions clone() { return copyOptions(new NettyTransportSslOptions()); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java index 15854e8baa..d41c669135 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,15 +16,6 @@ */ package org.apache.activemq.transport.amqp.client.transport; -import javax.net.ssl.KeyManager; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import javax.net.ssl.X509ExtendedKeyManager; -import javax.net.ssl.X509TrustManager; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; @@ -38,10 +29,21 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import io.netty.handler.ssl.SslHandler; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509ExtendedKeyManager; +import javax.net.ssl.X509TrustManager; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.handler.ssl.SslHandler; + /** * Static class that provides various utility methods used by Transport implementations. */ @@ -50,13 +52,18 @@ public class NettyTransportSupport { private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class); /** - * Creates a Netty SslHandler instance for use in Transports that require - * an SSL encoder / decoder. + * Creates a Netty SslHandler instance for use in Transports that require an SSL encoder / + * decoder. + * + * @param remote + * The URI of the remote peer that the SslHandler will be used against. + * @param options + * The SSL options object to build the SslHandler instance from. * - * @param remote The URI of the remote peer that the SslHandler will be used against. - * @param options The SSL options object to build the SslHandler instance from. * @return a new SslHandler that is configured from the given options. - * @throws Exception if an error occurs while creating the SslHandler instance. + * + * @throws Exception + * if an error occurs while creating the SslHandler instance. */ public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception { return new SslHandler(createSslEngine(remote, createSslContext(options), options)); @@ -66,9 +73,13 @@ public class NettyTransportSupport { * Create a new SSLContext using the options specific in the given TransportSslOptions * instance. * - * @param options the configured options used to create the SSLContext. + * @param options + * the configured options used to create the SSLContext. + * * @return a new SSLContext instance. - * @throws Exception if an error occurs while creating the context. + * + * @throws Exception + * if an error occurs while creating the context. */ public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception { try { @@ -91,10 +102,15 @@ public class NettyTransportSupport { * Create a new SSLEngine instance in client mode from the given SSLContext and * TransportSslOptions instances. * - * @param context the SSLContext to use when creating the engine. - * @param options the TransportSslOptions to use to configure the new SSLEngine. + * @param context + * the SSLContext to use when creating the engine. + * @param options + * the TransportSslOptions to use to configure the new SSLEngine. + * * @return a new SSLEngine instance in client mode. - * @throws Exception if an error occurs while creating the new SSLEngine. + * + * @throws Exception + * if an error occurs while creating the new SSLEngine. */ public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception { return createSslEngine(null, context, options); @@ -104,15 +120,20 @@ public class NettyTransportSupport { * Create a new SSLEngine instance in client mode from the given SSLContext and * TransportSslOptions instances. * - * @param remote the URI of the remote peer that will be used to initialize the engine, may be null if none should. - * @param context the SSLContext to use when creating the engine. - * @param options the TransportSslOptions to use to configure the new SSLEngine. + * @param remote + * the URI of the remote peer that will be used to initialize the engine, may be null + * if none should. + * @param context + * the SSLContext to use when creating the engine. + * @param options + * the TransportSslOptions to use to configure the new SSLEngine. + * * @return a new SSLEngine instance in client mode. - * @throws Exception if an error occurs while creating the new SSLEngine. + * + * @throws Exception + * if an error occurs while creating the new SSLEngine. */ - public static SSLEngine createSslEngine(URI remote, - SSLContext context, - NettyTransportSslOptions options) throws Exception { + public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception { SSLEngine engine = null; if (remote == null) { engine = context.createSSLEngine(); @@ -185,7 +206,7 @@ public class NettyTransportSupport { private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception { if (options.isTrustAll()) { - return new TrustManager[]{createTrustAllTrustManager()}; + return new TrustManager[] {createTrustAllTrustManager()}; } if (options.getTrustStoreLocation() == null) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java index f75a52ebb9..eb595d09b8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java @@ -18,73 +18,46 @@ package org.apache.activemq.transport.amqp.client.transport; import java.io.IOException; import java.net.URI; -import java.security.Principal; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.nio.charset.StandardCharsets; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.FixedRecvByteBufAllocator; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Transport for communicating over WebSockets */ -public class NettyWSTransport implements NettyTransport { +public class NettyWSTransport extends NettyTcpTransport { private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class); - private static final int QUIET_PERIOD = 20; - private static final int SHUTDOWN_TIMEOUT = 100; - - protected Bootstrap bootstrap; - protected EventLoopGroup group; - protected Channel channel; - protected NettyTransportListener listener; - protected NettyTransportOptions options; - protected final URI remote; - protected boolean secure; - - private final AtomicBoolean connected = new AtomicBoolean(); - private final AtomicBoolean closed = new AtomicBoolean(); - private ChannelPromise handshakeFuture; - private IOException failureCause; - private Throwable pendingFailure; + private static final String AMQP_SUB_PROTOCOL = "amqp"; /** * Create a new transport instance * - * @param remoteLocation the URI that defines the remote resource to connect to. - * @param options the transport options used to configure the socket connection. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. */ public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) { this(null, remoteLocation, options); @@ -93,119 +66,15 @@ public class NettyWSTransport implements NettyTransport { /** * Create a new transport instance * - * @param listener the TransportListener that will receive events from this Transport. - * @param remoteLocation the URI that defines the remote resource to connect to. - * @param options the transport options used to configure the socket connection. + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. */ public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { - this.options = options; - this.listener = listener; - this.remote = remoteLocation; - this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss"); - } - - @Override - public void connect() throws IOException { - - if (listener == null) { - throw new IllegalStateException("A transport listener must be set before connection attempts."); - } - - group = new NioEventLoopGroup(1); - - bootstrap = new Bootstrap(); - bootstrap.group(group); - bootstrap.channel(NioSocketChannel.class); - bootstrap.handler(new ChannelInitializer() { - - @Override - public void initChannel(Channel connectedChannel) throws Exception { - configureChannel(connectedChannel); - } - }); - - configureNetty(bootstrap, getTransportOptions()); - - ChannelFuture future; - try { - future = bootstrap.connect(getRemoteHost(), getRemotePort()); - future.addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - handleConnected(future.channel()); - } else if (future.isCancelled()) { - connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); - } else { - connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); - } - } - }); - - future.sync(); - - // Now wait for WS protocol level handshake completion - handshakeFuture.await(); - } catch (InterruptedException ex) { - LOG.debug("Transport connection attempt was interrupted."); - Thread.interrupted(); - failureCause = IOExceptionSupport.create(ex); - } - - if (failureCause != null) { - // Close out any Netty resources now as they are no longer needed. - if (channel != null) { - channel.close().syncUninterruptibly(); - channel = null; - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - group = null; - } - - throw failureCause; - } else { - // Connected, allow any held async error to fire now and close the transport. - channel.eventLoop().execute(new Runnable() { - - @Override - public void run() { - if (pendingFailure != null) { - channel.pipeline().fireExceptionCaught(pendingFailure); - } - } - }); - } - } - - @Override - public boolean isConnected() { - return connected.get(); - } - - @Override - public boolean isSSL() { - return secure; - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - connected.set(false); - if (channel != null) { - channel.close().syncUninterruptibly(); - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - } - } - } - - @Override - public ByteBuf allocateSendBuffer(int size) throws IOException { - checkConnected(); - return channel.alloc().ioBuffer(size, size); + super(listener, remoteLocation, options); } @Override @@ -222,202 +91,37 @@ public class NettyWSTransport implements NettyTransport { } @Override - public NettyTransportListener getTransportListener() { - return listener; + protected ChannelInboundHandlerAdapter createChannelHandler() { + return new NettyWebSocketTransportHandler(); } @Override - public void setTransportListener(NettyTransportListener listener) { - this.listener = listener; + protected void addAdditionalHandlers(ChannelPipeline pipeline) { + pipeline.addLast(new HttpClientCodec()); + pipeline.addLast(new HttpObjectAggregator(8192)); } @Override - public NettyTransportOptions getTransportOptions() { - if (options == null) { - if (isSSL()) { - options = NettyTransportSslOptions.INSTANCE; - } else { - options = NettyTransportOptions.INSTANCE; - } - } - - return options; + protected void handleConnected(Channel channel) throws Exception { + LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel); } - @Override - public URI getRemoteLocation() { - return remote; - } + // ----- Handle connection events -----------------------------------------// - @Override - public Principal getLocalPrincipal() { - if (!isSSL()) { - throw new UnsupportedOperationException("Not connected to a secure channel"); - } - - SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - - return sslHandler.engine().getSession().getLocalPrincipal(); - } - - //----- Internal implementation details, can be overridden as needed --// - - protected String getRemoteHost() { - return remote.getHost(); - } - - protected int getRemotePort() { - int port = remote.getPort(); - - if (port <= 0) { - if (isSSL()) { - port = getSslOptions().getDefaultSslPort(); - } else { - port = getTransportOptions().getDefaultTcpPort(); - } - } - - return port; - } - - protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { - bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); - bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); - bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); - - if (options.getSendBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); - } - - if (options.getReceiveBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); - bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); - } - - if (options.getTrafficClass() != -1) { - bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); - } - } - - protected void configureChannel(final Channel channel) throws Exception { - if (isSSL()) { - SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); - sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - LOG.trace("SSL Handshake has completed: {}", channel); - connectionEstablished(channel); - } else { - LOG.trace("SSL Handshake has failed: {}", channel); - connectionFailed(channel, IOExceptionSupport.create(future.cause())); - } - } - }); - - channel.pipeline().addLast(sslHandler); - } - - channel.pipeline().addLast(new HttpClientCodec()); - channel.pipeline().addLast(new HttpObjectAggregator(8192)); - channel.pipeline().addLast(new NettyTcpTransportHandler()); - } - - protected void handleConnected(final Channel channel) throws Exception { - if (!isSSL()) { - connectionEstablished(channel); - } - } - - //----- State change handlers and checks ---------------------------------// - - /** - * Called when the transport has successfully connected and is ready for use. - */ - protected void connectionEstablished(Channel connectedChannel) { - LOG.info("WebSocket connectionEstablished! {}", connectedChannel); - channel = connectedChannel; - connected.set(true); - } - - /** - * Called when the transport connection failed and an error should be returned. - * - * @param failedChannel The Channel instance that failed. - * @param cause An IOException that describes the cause of the failed connection. - */ - protected void connectionFailed(Channel failedChannel, IOException cause) { - failureCause = IOExceptionSupport.create(cause); - channel = failedChannel; - connected.set(false); - handshakeFuture.setFailure(cause); - } - - private NettyTransportSslOptions getSslOptions() { - return (NettyTransportSslOptions) getTransportOptions(); - } - - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); - } - } - - //----- Handle connection events -----------------------------------------// - - private class NettyTcpTransportHandler extends SimpleChannelInboundHandler { + private class NettyWebSocketTransportHandler extends NettyDefaultHandler { private final WebSocketClientHandshaker handshaker; - NettyTcpTransportHandler() { - handshaker = WebSocketClientHandshakerFactory.newHandshaker(remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders()); - } - - @Override - public void handlerAdded(ChannelHandlerContext context) { - LOG.trace("Handler has become added! Channel is {}", context.channel()); - handshakeFuture = context.newPromise(); + NettyWebSocketTransportHandler() { + handshaker = WebSocketClientHandshakerFactory.newHandshaker(getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, true, + new DefaultHttpHeaders()); } @Override public void channelActive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has become active! Channel is {}", context.channel()); handshaker.handshake(context.channel()); - } - @Override - public void channelInactive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportClosed listener"); - listener.onTransportClosed(); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage()); - LOG.trace("Error Stack: ", cause); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportError listener"); - if (pendingFailure != null) { - listener.onTransportError(pendingFailure); - } else { - listener.onTransportError(cause); - } - } else { - // Hold the first failure for later dispatch if connect succeeds. - // This will then trigger disconnect using the first error reported. - if (pendingFailure != null) { - LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); - pendingFailure = cause; - } - - if (!handshakeFuture.isDone()) { - handshakeFuture.setFailure(cause); - } - } + super.channelActive(context); } @Override @@ -427,16 +131,17 @@ public class NettyWSTransport implements NettyTransport { Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { handshaker.finishHandshake(ch, (FullHttpResponse) message); - LOG.info("WebSocket Client connected! {}", ctx.channel()); - handshakeFuture.setSuccess(); + LOG.trace("WebSocket Client connected! {}", ctx.channel()); + // Now trigger super processing as we are really connected. + NettyWSTransport.super.handleConnected(ch); return; } // We shouldn't get this since we handle the handshake previously. if (message instanceof FullHttpResponse) { FullHttpResponse response = (FullHttpResponse) message; - throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + - ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); + throw new IllegalStateException( + "Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')'); } WebSocketFrame frame = (WebSocketFrame) message; @@ -446,10 +151,11 @@ public class NettyWSTransport implements NettyTransport { ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket.")); } else if (frame instanceof BinaryWebSocketFrame) { BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame; - LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes()); + LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes()); listener.onData(binaryFrame.content()); - } else if (frame instanceof PongWebSocketFrame) { - LOG.trace("WebSocket Client received pong"); + } else if (frame instanceof PingWebSocketFrame) { + LOG.trace("WebSocket Client received ping, response with pong"); + ch.write(new PongWebSocketFrame(frame.content())); } else if (frame instanceof CloseWebSocketFrame) { LOG.trace("WebSocket Client received closing"); ch.close(); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java deleted file mode 100644 index 5e2ef15a9c..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import java.util.EnumSet; -import java.util.Map; - -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Collector; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Session; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.reactor.Reactor; - -/** - * Unmodifiable Connection wrapper used to prevent test code from accidentally - * modifying Connection state. - */ -public class UnmodifiableConnection implements Connection { - - private final Connection connection; - - public UnmodifiableConnection(Connection connection) { - this.connection = connection; - } - - @Override - public EndpointState getLocalState() { - return connection.getLocalState(); - } - - @Override - public EndpointState getRemoteState() { - return connection.getRemoteState(); - } - - @Override - public ErrorCondition getCondition() { - return connection.getCondition(); - } - - @Override - public void setCondition(ErrorCondition condition) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public ErrorCondition getRemoteCondition() { - return connection.getRemoteCondition(); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void open() { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void close() { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public Session session() { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public Session sessionHead(EnumSet local, EnumSet remote) { - Session head = connection.sessionHead(local, remote); - if (head != null) { - head = new UnmodifiableSession(head); - } - - return head; - } - - @Override - public Link linkHead(EnumSet local, EnumSet remote) { - // TODO - If implemented this method should return an unmodifiable link instance. - return null; - } - - @Override - public Delivery getWorkHead() { - // TODO - If implemented this method should return an unmodifiable delivery instance. - return null; - } - - @Override - public void setContainer(String container) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void setHostname(String hostname) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public String getHostname() { - return connection.getHostname(); - } - - @Override - public String getRemoteContainer() { - return connection.getRemoteContainer(); - } - - @Override - public String getRemoteHostname() { - return connection.getRemoteHostname(); - } - - @Override - public void setOfferedCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void setDesiredCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public Symbol[] getRemoteOfferedCapabilities() { - return connection.getRemoteOfferedCapabilities(); - } - - @Override - public Symbol[] getRemoteDesiredCapabilities() { - return connection.getRemoteDesiredCapabilities(); - } - - @Override - public Map getRemoteProperties() { - return connection.getRemoteProperties(); - } - - @Override - public void setProperties(Map properties) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public Object getContext() { - return connection.getContext(); - } - - @Override - public void setContext(Object context) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public void collect(Collector collector) { - throw new UnsupportedOperationException("Cannot alter the Connection"); - } - - @Override - public String getContainer() { - return connection.getContainer(); - } - - @Override - public Transport getTransport() { - return new UnmodifiableTransport(connection.getTransport()); - } - - @Override - public Record attachments() { - return connection.attachments(); - } - - @Override - public Reactor getReactor() { - return connection.getReactor(); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java deleted file mode 100644 index 5545884245..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import org.apache.qpid.proton.amqp.transport.DeliveryState; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sender; - -/** - * Unmodifiable Delivery wrapper used to prevent test code from accidentally - * modifying Delivery state. - */ -public class UnmodifiableDelivery implements Delivery { - - private final Delivery delivery; - - public UnmodifiableDelivery(Delivery delivery) { - this.delivery = delivery; - } - - @Override - public byte[] getTag() { - return delivery.getTag(); - } - - @Override - public Link getLink() { - if (delivery.getLink() instanceof Sender) { - return new UnmodifiableSender((Sender) delivery.getLink()); - } else if (delivery.getLink() instanceof Receiver) { - return new UnmodifiableReceiver((Receiver) delivery.getLink()); - } else { - throw new IllegalStateException("Delivery has unknown link type"); - } - } - - /* waiting Pull Request sent - @Override - public int getDataLength() { - return delivery.getDataLength(); - } */ - - @Override - public int available() { - return delivery.available(); - } - - @Override - public DeliveryState getLocalState() { - return delivery.getLocalState(); - } - - @Override - public DeliveryState getRemoteState() { - return delivery.getRemoteState(); - } - - @Override - public int getMessageFormat() { - return delivery.getMessageFormat(); - } - - @Override - public void disposition(DeliveryState state) { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public void settle() { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public boolean isSettled() { - return delivery.isSettled(); - } - - @Override - public boolean remotelySettled() { - return delivery.remotelySettled(); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public Delivery getWorkNext() { - return new UnmodifiableDelivery(delivery.getWorkNext()); - } - - @Override - public Delivery next() { - return new UnmodifiableDelivery(delivery.next()); - } - - @Override - public boolean isWritable() { - return delivery.isWritable(); - } - - @Override - public boolean isReadable() { - return delivery.isReadable(); - } - - @Override - public void setContext(Object o) { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public Object getContext() { - return delivery.getContext(); - } - - @Override - public boolean isUpdated() { - return delivery.isUpdated(); - } - - @Override - public void clear() { - throw new UnsupportedOperationException("Cannot alter the Delivery state"); - } - - @Override - public boolean isPartial() { - return delivery.isPartial(); - } - - @Override - public int pending() { - return delivery.pending(); - } - - @Override - public boolean isBuffered() { - return delivery.isBuffered(); - } - - @Override - public Record attachments() { - return delivery.attachments(); - } - - @Override - public DeliveryState getDefaultDeliveryState() { - return delivery.getDefaultDeliveryState(); - } - - @Override - public void setDefaultDeliveryState(DeliveryState state) { - throw new UnsupportedOperationException("Cannot alter the Delivery"); - } - - @Override - public void setMessageFormat(int messageFormat) { - throw new UnsupportedOperationException("Cannot alter the Delivery"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java deleted file mode 100644 index 7e4319d91b..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import java.util.EnumSet; -import java.util.Map; - -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.amqp.transport.Source; -import org.apache.qpid.proton.amqp.transport.Target; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; - -/** - * Unmodifiable Session wrapper used to prevent test code from accidentally - * modifying Session state. - */ -public class UnmodifiableLink implements Link { - - private final Link link; - - public UnmodifiableLink(Link link) { - this.link = link; - } - - @Override - public EndpointState getLocalState() { - return link.getLocalState(); - } - - @Override - public EndpointState getRemoteState() { - return link.getRemoteState(); - } - - @Override - public ErrorCondition getCondition() { - return link.getCondition(); - } - - @Override - public void setCondition(ErrorCondition condition) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public ErrorCondition getRemoteCondition() { - return link.getRemoteCondition(); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void open() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void close() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setContext(Object o) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Object getContext() { - return link.getContext(); - } - - @Override - public String getName() { - return link.getName(); - } - - @Override - public Delivery delivery(byte[] tag) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Delivery delivery(byte[] tag, int offset, int length) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Delivery head() { - return new UnmodifiableDelivery(link.head()); - } - - @Override - public Delivery current() { - return new UnmodifiableDelivery(link.current()); - } - - @Override - public boolean advance() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Source getSource() { - // TODO Figure out a simple way to wrap the odd Source types in Proton-J - return link.getSource(); - } - - @Override - public Target getTarget() { - // TODO Figure out a simple way to wrap the odd Source types in Proton-J - return link.getTarget(); - } - - @Override - public void setSource(Source address) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setTarget(Target address) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Source getRemoteSource() { - // TODO Figure out a simple way to wrap the odd Source types in Proton-J - return link.getRemoteSource(); - } - - @Override - public Target getRemoteTarget() { - // TODO Figure out a simple way to wrap the odd Target types in Proton-J - return link.getRemoteTarget(); - } - - @Override - public Link next(EnumSet local, EnumSet remote) { - Link next = link.next(local, remote); - - if (next != null) { - if (next instanceof Sender) { - next = new UnmodifiableSender((Sender) next); - } else { - next = new UnmodifiableReceiver((Receiver) next); - } - } - - return next; - } - - @Override - public int getCredit() { - return link.getCredit(); - } - - @Override - public int getQueued() { - return link.getQueued(); - } - - @Override - public int getUnsettled() { - return link.getUnsettled(); - } - - @Override - public Session getSession() { - return new UnmodifiableSession(link.getSession()); - } - - @Override - public SenderSettleMode getSenderSettleMode() { - return link.getSenderSettleMode(); - } - - @Override - public void setSenderSettleMode(SenderSettleMode senderSettleMode) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public SenderSettleMode getRemoteSenderSettleMode() { - return link.getRemoteSenderSettleMode(); - } - - @Override - public ReceiverSettleMode getReceiverSettleMode() { - return link.getReceiverSettleMode(); - } - - @Override - public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public ReceiverSettleMode getRemoteReceiverSettleMode() { - return link.getRemoteReceiverSettleMode(); - } - - @Override - public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int drained() { - return link.drained(); // TODO - Is this a mutating call? - } - - @Override - public int getRemoteCredit() { - return link.getRemoteCredit(); - } - - @Override - public boolean getDrain() { - return link.getDrain(); - } - - @Override - public void detach() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public boolean detached() { - return link.detached(); - } - - @Override - public Record attachments() { - return link.attachments(); - } - - @Override - public Map getProperties() { - return link.getProperties(); - } - - @Override - public void setProperties(Map properties) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public Map getRemoteProperties() { - return link.getRemoteProperties(); - } - - @Override - public Symbol[] getDesiredCapabilities() { - return link.getDesiredCapabilities(); - } - - @Override - public Symbol[] getOfferedCapabilities() { - return link.getOfferedCapabilities(); - } - - @Override - public Symbol[] getRemoteDesiredCapabilities() { - return link.getRemoteDesiredCapabilities(); - } - - @Override - public Symbol[] getRemoteOfferedCapabilities() { - return link.getRemoteOfferedCapabilities(); - } - - @Override - public void setDesiredCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setOfferedCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableProxy.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableProxy.java new file mode 100644 index 0000000000..25e6e6cac7 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableProxy.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; + +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.engine.Transport; + +/** + * Utility that creates proxy objects for the Proton objects which won't allow any mutating + * operations to be applied so that the test code does not interact with the proton engine + * outside the client serialization thread. + */ +public final class UnmodifiableProxy { + + private static ArrayList blacklist = new ArrayList<>(); + + // These methods are mutating but don't take an arguments so they + // aren't automatically filtered out. We will have to keep an eye + // on proton API in the future and modify this list as it evolves. + static { + blacklist.add("close"); + blacklist.add("free"); + blacklist.add("open"); + blacklist.add("sasl"); + blacklist.add("session"); + blacklist.add("close_head"); + blacklist.add("close_tail"); + blacklist.add("outputConsumed"); + blacklist.add("process"); + blacklist.add("processInput"); + blacklist.add("unbind"); + blacklist.add("settle"); + blacklist.add("clear"); + blacklist.add("detach"); + blacklist.add("abort"); + } + + private UnmodifiableProxy() { + } + + public static Transport transportProxy(final Transport target) { + Transport wrap = wrap(Transport.class, target); + return wrap; + } + + public static Sasl saslProxy(final Sasl target) { + return wrap(Sasl.class, target); + } + + public static Connection connectionProxy(final Connection target) { + return wrap(Connection.class, target); + } + + public static Session sessionProxy(final Session target) { + return wrap(Session.class, target); + } + + public static Delivery deliveryProxy(final Delivery target) { + return wrap(Delivery.class, target); + } + + public static Link linkProxy(final Link target) { + return wrap(Link.class, target); + } + + public static Receiver receiverProxy(final Receiver target) { + return wrap(Receiver.class, target); + } + + public static Sender senderProxy(final Sender target) { + return wrap(Sender.class, target); + } + + private static boolean isProtonType(Class clazz) { + String packageName = clazz.getPackage().getName(); + + if (packageName.startsWith("org.apache.qpid.proton.")) { + return true; + } + + return false; + } + + private static T wrap(Class type, final Object target) { + return type.cast(java.lang.reflect.Proxy.newProxyInstance(type.getClassLoader(), new Class[] {type}, new InvocationHandler() { + @Override + public Object invoke(Object o, Method method, Object[] objects) throws Throwable { + if ("toString".equals(method.getName()) && method.getParameterTypes().length == 0) { + return "Unmodifiable proxy -> (" + method.invoke(target, objects) + ")"; + } + + // Don't let methods that mutate be invoked. + if (method.getParameterTypes().length > 0) { + throw new UnsupportedOperationException("Cannot mutate outside the Client work thread"); + } + + if (blacklist.contains(method.getName())) { + throw new UnsupportedOperationException("Cannot mutate outside the Client work thread"); + } + + Class returnType = method.getReturnType(); + + try { + Object result = method.invoke(target, objects); + if (result == null) { + return null; + } + + if (returnType.isPrimitive() || returnType.isArray() || Object.class.equals(returnType)) { + // Skip any other checks + } else if (returnType.isAssignableFrom(ByteBuffer.class)) { + // Buffers are modifiable but we can just return null to indicate + // there's nothing there to access. + result = null; + } else if (returnType.isAssignableFrom(Map.class)) { + // Prevent return of modifiable maps + result = Collections.unmodifiableMap((Map) result); + } else if (isProtonType(returnType) && returnType.isInterface()) { + + // Can't handle the crazy Source / Target types yet as there's two + // different types for Source and Target the result can't be cast to + // the one people actually want to use. + if (!returnType.getName().equals("org.apache.qpid.proton.amqp.transport.Source") + && !returnType.getName().equals("org.apache.qpid.proton.amqp.messaging.Source") + && !returnType.getName().equals("org.apache.qpid.proton.amqp.transport.Target") + && !returnType.getName().equals("org.apache.qpid.proton.amqp.messaging.Target")) { + + result = wrap(returnType, result); + } + } + + return result; + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } + } + })); + } +} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java deleted file mode 100644 index f447d87d28..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import org.apache.qpid.proton.codec.WritableBuffer; -import org.apache.qpid.proton.engine.Receiver; - -/** - * Unmodifiable Receiver wrapper used to prevent test code from accidentally - * modifying Receiver state. - */ -public class UnmodifiableReceiver extends UnmodifiableLink implements Receiver { - - private final Receiver receiver; - - public UnmodifiableReceiver(Receiver receiver) { - super(receiver); - - this.receiver = receiver; - } - - @Override - public void flow(int credits) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int recv(byte[] bytes, int offset, int size) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int recv(WritableBuffer buffer) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void drain(int credit) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public boolean draining() { - return receiver.draining(); - } - - @Override - public void setDrain(boolean drain) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java deleted file mode 100644 index 3c67f682b1..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import org.apache.qpid.proton.codec.ReadableBuffer; -import org.apache.qpid.proton.engine.Sender; - -/** - * Unmodifiable Sender wrapper used to prevent test code from accidentally - * modifying Sender state. - */ -public class UnmodifiableSender extends UnmodifiableLink implements Sender { - - public UnmodifiableSender(Sender sender) { - super(sender); - } - - @Override - public void offer(int credits) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int send(byte[] bytes, int offset, int length) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public int send(ReadableBuffer buffer) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void abort() { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java deleted file mode 100644 index 3fc26cbd74..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import java.util.EnumSet; -import java.util.Map; - -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; - -/** - * Unmodifiable Session wrapper used to prevent test code from accidentally - * modifying Session state. - */ -public class UnmodifiableSession implements Session { - - private final Session session; - - public UnmodifiableSession(Session session) { - this.session = session; - } - - @Override - public EndpointState getLocalState() { - return session.getLocalState(); - } - - @Override - public EndpointState getRemoteState() { - return session.getRemoteState(); - } - - @Override - public ErrorCondition getCondition() { - return session.getCondition(); - } - - @Override - public void setCondition(ErrorCondition condition) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public ErrorCondition getRemoteCondition() { - return session.getRemoteCondition(); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public void open() { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public void close() { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public void setContext(Object o) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public Object getContext() { - return session.getContext(); - } - - @Override - public Sender sender(String name) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public Receiver receiver(String name) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public Session next(EnumSet local, EnumSet remote) { - Session next = session.next(local, remote); - if (next != null) { - next = new UnmodifiableSession(next); - } - - return next; - } - - @Override - public Connection getConnection() { - return new UnmodifiableConnection(session.getConnection()); - } - - @Override - public int getIncomingCapacity() { - return session.getIncomingCapacity(); - } - - @Override - public void setIncomingCapacity(int bytes) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - @Override - public int getIncomingBytes() { - return session.getIncomingBytes(); - } - - @Override - public int getOutgoingBytes() { - return session.getOutgoingBytes(); - } - - @Override - public Record attachments() { - return session.attachments(); - } - - @Override - public long getOutgoingWindow() { - return session.getOutgoingWindow(); - } - - @Override - public void setOutgoingWindow(long outgoingWindowSize) { - throw new UnsupportedOperationException("Cannot alter the Session"); - } - - - @Override - public Symbol[] getDesiredCapabilities() { - return session.getDesiredCapabilities(); - } - - @Override - public Symbol[] getOfferedCapabilities() { - return session.getOfferedCapabilities(); - } - - @Override - public Map getProperties() { - return session.getProperties(); - } - - @Override - public Symbol[] getRemoteDesiredCapabilities() { - return session.getRemoteDesiredCapabilities(); - } - - @Override - public Symbol[] getRemoteOfferedCapabilities() { - return session.getRemoteOfferedCapabilities(); - } - - @Override - public Map getRemoteProperties() { - return session.getRemoteProperties(); - } - - @Override - public void setDesiredCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setOfferedCapabilities(Symbol[] capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } - - @Override - public void setProperties(Map capabilities) { - throw new UnsupportedOperationException("Cannot alter the Link state"); - } -} diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java deleted file mode 100644 index 5e305f4d11..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import java.nio.ByteBuffer; - -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sasl; -import org.apache.qpid.proton.engine.Ssl; -import org.apache.qpid.proton.engine.SslDomain; -import org.apache.qpid.proton.engine.SslPeerDetails; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.engine.TransportException; -import org.apache.qpid.proton.engine.TransportResult; - -/** - * Unmodifiable Transport wrapper used to prevent test code from accidentally - * modifying Transport state. - */ -public class UnmodifiableTransport implements Transport { - - private final Transport transport; - - public UnmodifiableTransport(Transport transport) { - this.transport = transport; - } - - @Override - public void close() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void free() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Object getContext() { - return null; - } - - @Override - public EndpointState getLocalState() { - return transport.getLocalState(); - } - - @Override - public ErrorCondition getRemoteCondition() { - return transport.getRemoteCondition(); - } - - @Override - public EndpointState getRemoteState() { - return transport.getRemoteState(); - } - - @Override - public void open() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setCondition(ErrorCondition arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setContext(Object arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void bind(Connection arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public int capacity() { - return transport.capacity(); - } - - @Override - public void close_head() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void close_tail() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public int getChannelMax() { - return transport.getChannelMax(); - } - - @Override - public ErrorCondition getCondition() { - return transport.getCondition(); - } - - @Override - public int getIdleTimeout() { - return transport.getIdleTimeout(); - } - - @Override - public ByteBuffer getInputBuffer() { - return null; - } - - @Override - public int getMaxFrameSize() { - return transport.getMaxFrameSize(); - } - - @Override - public ByteBuffer getOutputBuffer() { - return null; - } - - @Override - public int getRemoteChannelMax() { - return transport.getRemoteChannelMax(); - } - - @Override - public int getRemoteIdleTimeout() { - return transport.getRemoteIdleTimeout(); - } - - @Override - public int getRemoteMaxFrameSize() { - return transport.getRemoteMaxFrameSize(); - } - - @Override - public ByteBuffer head() { - return null; - } - - @Override - public int input(byte[] arg0, int arg1, int arg2) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public boolean isClosed() { - return transport.isClosed(); - } - - @Override - public int output(byte[] arg0, int arg1, int arg2) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void outputConsumed() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public int pending() { - return transport.pending(); - } - - @Override - public void pop(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void process() throws TransportException { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public TransportResult processInput() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Sasl sasl() throws IllegalStateException { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setChannelMax(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setIdleTimeout(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void setMaxFrameSize(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Ssl ssl(SslDomain arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Ssl ssl(SslDomain arg0, SslPeerDetails arg1) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public ByteBuffer tail() { - return null; - } - - @Override - public long tick(long arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void trace(int arg0) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public void unbind() { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public Record attachments() { - return transport.attachments(); - } - - @Override - public long getFramesInput() { - return transport.getFramesInput(); - } - - @Override - public long getFramesOutput() { - return transport.getFramesOutput(); - } - - @Override - public void setEmitFlowEventOnSend(boolean emitFlowEventOnSend) { - throw new UnsupportedOperationException("Cannot alter the Transport"); - } - - @Override - public boolean isEmitFlowEventOnSend() { - return transport.isEmitFlowEventOnSend(); - } -}