ARTEMIS-1159 Fixes and Improvements to the AMQP test client.

Port fixes to the AMQP test client recently made in the 5.x version.
Fixes some thread safety issues in the Transport.  Ensures more 
timely shutdown of the Connection executor.  Uses a dynamic Proxy 
to generate Read-Only Proton wrappers instead of the hand crafted 
versions.  Adds additional logging for test data
This commit is contained in:
Timothy Bish 2017-05-11 16:46:22 -04:00
parent a98dccb35d
commit 4ad78c7fd0
21 changed files with 565 additions and 1835 deletions

View File

@ -25,8 +25,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -39,7 +39,7 @@ import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IdGenerator;
import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
@ -74,7 +74,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
public static final long DEFAULT_DRAIN_TIMEOUT = 60000;
private final ScheduledExecutorService serializer;
private ScheduledThreadPoolExecutor serializer;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicLong sessionIdGenerator = new AtomicLong();
@ -121,7 +121,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.connectionId = CONNECTION_ID_GENERATOR.generateId();
this.remoteURI = transport.getRemoteLocation();
this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
this.serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable runner) {
@ -132,6 +132,10 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
});
// Ensure timely shutdown
this.serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.transport.setTransportListener(this);
}
@ -434,7 +438,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
public Connection getConnection() {
return new UnmodifiableConnection(getEndpoint());
return UnmodifiableProxy.connectionProxy(getEndpoint());
}
public AmqpConnectionListener getListener() {

View File

@ -20,7 +20,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
@ -100,7 +100,7 @@ public class AmqpMessage {
*/
public Delivery getWrappedDelivery() {
if (delivery != null) {
return new UnmodifiableDelivery(delivery);
return UnmodifiableProxy.deliveryProxy(delivery);
}
return null;

View File

@ -35,7 +35,7 @@ import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
@ -677,7 +677,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* @return an unmodifiable view of the underlying Receiver instance.
*/
public Receiver getReceiver() {
return new UnmodifiableReceiver(getEndpoint());
return UnmodifiableProxy.receiverProxy(getEndpoint());
}
// ----- Receiver configuration properties --------------------------------//

View File

@ -29,7 +29,7 @@ import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@ -231,7 +231,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
* @return an unmodifiable view of the underlying Sender instance.
*/
public Sender getSender() {
return new UnmodifiableSender(getEndpoint());
return UnmodifiableProxy.senderProxy(getEndpoint());
}
/**

View File

@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
@ -598,7 +598,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
}
public Session getSession() {
return new UnmodifiableSession(getEndpoint());
return UnmodifiableProxy.sessionProxy(getEndpoint());
}
public boolean isInTransaction() {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -23,25 +23,29 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TCP based transport that uses Netty as the underlying IO layer.
@ -50,28 +54,27 @@ public class NettyTcpTransport implements NettyTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
private static final int QUIET_PERIOD = 20;
private static final int SHUTDOWN_TIMEOUT = 100;
protected Bootstrap bootstrap;
protected EventLoopGroup group;
protected Channel channel;
protected NettyTransportListener listener;
protected NettyTransportOptions options;
protected final NettyTransportOptions options;
protected final URI remote;
protected boolean secure;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final CountDownLatch connectLatch = new CountDownLatch(1);
private IOException failureCause;
private Throwable pendingFailure;
private volatile IOException failureCause;
/**
* Create a new transport instance
*
* @param remoteLocation the URI that defines the remote resource to connect to.
* @param options the transport options used to configure the socket connection.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
this(null, remoteLocation, options);
@ -80,15 +83,25 @@ public class NettyTcpTransport implements NettyTransport {
/**
* Create a new transport instance
*
* @param listener the TransportListener that will receive events from this Transport.
* @param remoteLocation the URI that defines the remote resource to connect to.
* @param options the transport options used to configure the socket connection.
* @param listener
* the TransportListener that will receive events from this Transport.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
if (options == null) {
throw new IllegalArgumentException("Transport Options cannot be null");
}
if (remoteLocation == null) {
throw new IllegalArgumentException("Transport remote location cannot be null");
}
this.options = options;
this.listener = listener;
this.remote = remoteLocation;
this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
}
@Override
@ -98,16 +111,27 @@ public class NettyTcpTransport implements NettyTransport {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
final SslHandler sslHandler;
if (isSSL()) {
try {
sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
} catch (Exception ex) {
// TODO: can we stop it throwing Exception?
throw IOExceptionSupport.create(ex);
}
} else {
sslHandler = null;
}
group = new NioEventLoopGroup(1);
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
configureChannel(connectedChannel);
configureChannel(connectedChannel, sslHandler);
}
});
@ -118,12 +142,8 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handleConnected(future.channel());
} else if (future.isCancelled()) {
connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
} else {
connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
if (!future.isSuccess()) {
handleException(future.channel(), IOExceptionSupport.create(future.cause()));
}
}
});
@ -143,7 +163,10 @@ public class NettyTcpTransport implements NettyTransport {
channel = null;
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
LOG.trace("Channel group shutdown failed to complete in allotted time");
}
group = null;
}
@ -154,8 +177,8 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public void run() {
if (pendingFailure != null) {
channel.pipeline().fireExceptionCaught(pendingFailure);
if (failureCause != null) {
channel.pipeline().fireExceptionCaught(failureCause);
}
}
});
@ -169,18 +192,24 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public boolean isSSL() {
return secure;
return options.isSSL();
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
if (channel != null) {
channel.close().syncUninterruptibly();
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
try {
if (channel != null) {
channel.close().syncUninterruptibly();
}
} finally {
if (group != null) {
Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
LOG.trace("Channel group shutdown failed to complete in allotted time");
}
}
}
}
}
@ -216,14 +245,6 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public NettyTransportOptions getTransportOptions() {
if (options == null) {
if (isSSL()) {
options = NettyTransportSslOptions.INSTANCE;
} else {
options = NettyTransportOptions.INSTANCE;
}
}
return options;
}
@ -234,36 +255,106 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public Principal getLocalPrincipal() {
if (!isSSL()) {
throw new UnsupportedOperationException("Not connected to a secure channel");
Principal result = null;
if (isSSL()) {
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
result = sslHandler.engine().getSession().getLocalPrincipal();
}
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
return sslHandler.engine().getSession().getLocalPrincipal();
return result;
}
//----- Internal implementation details, can be overridden as needed --//
// ----- Internal implementation details, can be overridden as needed -----//
protected String getRemoteHost() {
return remote.getHost();
}
protected int getRemotePort() {
int port = remote.getPort();
if (port <= 0) {
if (isSSL()) {
port = getSslOptions().getDefaultSslPort();
} else {
port = getTransportOptions().getDefaultTcpPort();
}
if (remote.getPort() != -1) {
return remote.getPort();
} else {
return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort();
}
return port;
}
protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
}
protected ChannelInboundHandlerAdapter createChannelHandler() {
return new NettyTcpTransportHandler();
}
// ----- Event Handlers which can be overridden in subclasses -------------//
protected void handleConnected(Channel channel) throws Exception {
LOG.trace("Channel has become active! Channel is {}", channel);
connectionEstablished(channel);
}
protected void handleChannelInactive(Channel channel) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", channel);
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
}
protected void handleException(Channel channel, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {}", channel);
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (failureCause != null) {
listener.onTransportError(failureCause);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (failureCause == null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
failureCause = IOExceptionSupport.create(cause);
}
connectionFailed(channel, failureCause);
}
}
// ----- State change handlers and checks ---------------------------------//
protected final void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
}
}
/*
* Called when the transport has successfully connected and is ready for use.
*/
private void connectionEstablished(Channel connectedChannel) {
channel = connectedChannel;
connected.set(true);
connectLatch.countDown();
}
/*
* Called when the transport connection failed and an error should be returned.
*/
private void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = cause;
channel = failedChannel;
connected.set(false);
connectLatch.countDown();
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
@ -283,105 +374,66 @@ public class NettyTcpTransport implements NettyTransport {
}
}
protected void configureChannel(final Channel channel) throws Exception {
private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception {
if (isSSL()) {
SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
connectionEstablished(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
connectionFailed(channel, IOExceptionSupport.create(future.cause()));
}
}
});
channel.pipeline().addLast(sslHandler);
}
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (!isSSL()) {
connectionEstablished(channel);
if (getTransportOptions().isTraceBytes()) {
channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
}
addAdditionalHandlers(channel.pipeline());
channel.pipeline().addLast(createChannelHandler());
}
//----- State change handlers and checks ---------------------------------//
// ----- Handle connection events -----------------------------------------//
/**
* Called when the transport has successfully connected and is ready for use.
*/
protected void connectionEstablished(Channel connectedChannel) {
channel = connectedChannel;
connected.set(true);
connectLatch.countDown();
}
protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> {
/**
* Called when the transport connection failed and an error should be returned.
*
* @param failedChannel The Channel instance that failed.
* @param cause An IOException that describes the cause of the failed connection.
*/
protected void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = IOExceptionSupport.create(cause);
channel = failedChannel;
connected.set(false);
connectLatch.countDown();
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
@Override
public void channelRegistered(ChannelHandlerContext context) throws Exception {
channel = context.channel();
}
}
//----- Handle connection events -----------------------------------------//
private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has become active! Channel is {}", context.channel());
// In the Secure case we need to let the handshake complete before we
// trigger the connected event.
if (!isSSL()) {
handleConnected(context.channel());
} else {
SslHandler sslHandler = context.pipeline().get(SslHandler.class);
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
handleConnected(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
handleException(channel, future.cause());
}
}
});
}
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
handleChannelInactive(context.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (pendingFailure != null) {
listener.onTransportError(pendingFailure);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (pendingFailure != null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
pendingFailure = cause;
}
}
handleException(context.channel(), cause);
}
}
// ----- Handle Binary data from connection -------------------------------//
protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {

View File

@ -23,7 +23,7 @@ import java.security.Principal;
import io.netty.buffer.ByteBuf;
/**
*
* Base for all Netty based Transports in this client.
*/
public interface NettyTransport {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -30,12 +30,16 @@ public final class NettyTransportFactory {
}
/**
* Creates an instance of the given Transport and configures it using the
* properties set on the given remote broker URI.
* Creates an instance of the given Transport and configures it using the properties set on
* the given remote broker URI.
*
* @param remoteURI
* The URI used to connect to a remote Peer.
*
* @param remoteURI The URI used to connect to a remote Peer.
* @return a new Transport instance.
* @throws Exception if an error occurs while creating the Transport instance.
*
* @throws Exception
* if an error occurs while creating the Transport instance.
*/
public static NettyTransport createTransport(URI remoteURI) throws Exception {
Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -19,15 +19,16 @@ package org.apache.activemq.transport.amqp.client.transport;
import io.netty.buffer.ByteBuf;
/**
* Listener interface that should be implemented by users of the various
* QpidJMS Transport classes.
* Listener interface that should be implemented by users of the various QpidJMS Transport
* classes.
*/
public interface NettyTransportListener {
/**
* Called when new incoming data has become available.
*
* @param incoming the next incoming packet of data.
* @param incoming
* the next incoming packet of data.
*/
void onData(ByteBuf incoming);
@ -39,7 +40,8 @@ public interface NettyTransportListener {
/**
* Called when an error occurs during normal Transport operations.
*
* @param cause the error that triggered this event.
* @param cause
* the error that triggered this event.
*/
void onTransportError(Throwable cause);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -30,6 +30,7 @@ public class NettyTransportOptions implements Cloneable {
public static final int DEFAULT_SO_TIMEOUT = -1;
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_TCP_PORT = 5672;
public static final boolean DEFAULT_TRACE_BYTES = false;
public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
@ -42,6 +43,7 @@ public class NettyTransportOptions implements Cloneable {
private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
private int defaultTcpPort = DEFAULT_TCP_PORT;
private boolean traceBytes = DEFAULT_TRACE_BYTES;
/**
* @return the currently set send buffer size in bytes.
@ -51,11 +53,14 @@ public class NettyTransportOptions implements Cloneable {
}
/**
* Sets the send buffer size in bytes, the value must be greater than zero
* or an {@link IllegalArgumentException} will be thrown.
* Sets the send buffer size in bytes, the value must be greater than zero or an
* {@link IllegalArgumentException} will be thrown.
*
* @param sendBufferSize the new send buffer size for the TCP Transport.
* @throws IllegalArgumentException if the value given is not in the valid range.
* @param sendBufferSize
* the new send buffer size for the TCP Transport.
*
* @throws IllegalArgumentException
* if the value given is not in the valid range.
*/
public void setSendBufferSize(int sendBufferSize) {
if (sendBufferSize <= 0) {
@ -73,11 +78,14 @@ public class NettyTransportOptions implements Cloneable {
}
/**
* Sets the receive buffer size in bytes, the value must be greater than zero
* or an {@link IllegalArgumentException} will be thrown.
* Sets the receive buffer size in bytes, the value must be greater than zero or an
* {@link IllegalArgumentException} will be thrown.
*
* @param receiveBufferSize the new receive buffer size for the TCP Transport.
* @throws IllegalArgumentException if the value given is not in the valid range.
* @param receiveBufferSize
* the new receive buffer size for the TCP Transport.
*
* @throws IllegalArgumentException
* if the value given is not in the valid range.
*/
public void setReceiveBufferSize(int receiveBufferSize) {
if (receiveBufferSize <= 0) {
@ -95,11 +103,13 @@ public class NettyTransportOptions implements Cloneable {
}
/**
* Sets the traffic class value used by the TCP connection, valid
* range is between 0 and 255.
* Sets the traffic class value used by the TCP connection, valid range is between 0 and 255.
*
* @param trafficClass the new traffic class value.
* @throws IllegalArgumentException if the value given is not in the valid range.
* @param trafficClass
* the new traffic class value.
*
* @throws IllegalArgumentException
* if the value given is not in the valid range.
*/
public void setTrafficClass(int trafficClass) {
if (trafficClass < 0 || trafficClass > 255) {
@ -157,6 +167,27 @@ public class NettyTransportOptions implements Cloneable {
this.defaultTcpPort = defaultTcpPort;
}
/**
* @return true if the transport should enable byte tracing
*/
public boolean isTraceBytes() {
return traceBytes;
}
/**
* Determines if the transport should add a logger for bytes in / out
*
* @param traceBytes
* should the transport log the bytes in and out.
*/
public void setTraceBytes(boolean traceBytes) {
this.traceBytes = traceBytes;
}
public boolean isSSL() {
return false;
}
@Override
public NettyTransportOptions clone() {
return copyOptions(new NettyTransportOptions());

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -21,9 +21,8 @@ import java.util.Collections;
import java.util.List;
/**
* Holds the defined SSL options for connections that operate over a secure
* transport. Options are read from the environment and can be overridden by
* specifying them on the connection URI.
* Holds the defined SSL options for connections that operate over a secure transport. Options
* are read from the environment and can be overridden by specifying them on the connection URI.
*/
public class NettyTransportSslOptions extends NettyTransportOptions {
@ -31,7 +30,7 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
public static final boolean DEFAULT_TRUST_ALL = false;
public static final boolean DEFAULT_VERIFY_HOST = false;
public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"}));
public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[] {"SSLv2Hello", "SSLv3"}));
public static final int DEFAULT_SSL_PORT = 5671;
public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions();
@ -69,7 +68,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
/**
* Sets the location on disk of the key store to use.
*
* @param keyStoreLocation the keyStoreLocation to use to create the key manager.
* @param keyStoreLocation
* the keyStoreLocation to use to create the key manager.
*/
public void setKeyStoreLocation(String keyStoreLocation) {
this.keyStoreLocation = keyStoreLocation;
@ -83,7 +83,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param keyStorePassword the keyStorePassword to set
* @param keyStorePassword
* the keyStorePassword to set
*/
public void setKeyStorePassword(String keyStorePassword) {
this.keyStorePassword = keyStorePassword;
@ -97,7 +98,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param trustStoreLocation the trustStoreLocation to set
* @param trustStoreLocation
* the trustStoreLocation to set
*/
public void setTrustStoreLocation(String trustStoreLocation) {
this.trustStoreLocation = trustStoreLocation;
@ -111,7 +113,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param trustStorePassword the trustStorePassword to set
* @param trustStorePassword
* the trustStorePassword to set
*/
public void setTrustStorePassword(String trustStorePassword) {
this.trustStorePassword = trustStorePassword;
@ -125,7 +128,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param storeType the format that the store files are encoded in.
* @param storeType
* the format that the store files are encoded in.
*/
public void setStoreType(String storeType) {
this.storeType = storeType;
@ -139,7 +143,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param enabledCipherSuites the enabledCipherSuites to set
* @param enabledCipherSuites
* the enabledCipherSuites to set
*/
public void setEnabledCipherSuites(String[] enabledCipherSuites) {
this.enabledCipherSuites = enabledCipherSuites;
@ -153,7 +158,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param disabledCipherSuites the disabledCipherSuites to set
* @param disabledCipherSuites
* the disabledCipherSuites to set
*/
public void setDisabledCipherSuites(String[] disabledCipherSuites) {
this.disabledCipherSuites = disabledCipherSuites;
@ -169,13 +175,15 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
/**
* The protocols to be set as enabled.
*
* @param enabledProtocols the enabled protocols to set, or null if the defaults should be used.
* @param enabledProtocols
* the enabled protocols to set, or null if the defaults should be used.
*/
public void setEnabledProtocols(String[] enabledProtocols) {
this.enabledProtocols = enabledProtocols;
}
/**
*
* @return the protocols to disable or null if none should be
*/
public String[] getDisabledProtocols() {
@ -185,7 +193,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
/**
* The protocols to be disable.
*
* @param disabledProtocols the protocols to disable, or null if none should be.
* @param disabledProtocols
* the protocols to disable, or null if none should be.
*/
public void setDisabledProtocols(String[] disabledProtocols) {
this.disabledProtocols = disabledProtocols;
@ -202,7 +211,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
* The protocol value to use when creating an SSLContext via
* SSLContext.getInstance(protocol).
*
* @param contextProtocol the context protocol to use.
* @param contextProtocol
* the context protocol to use.
*/
public void setContextProtocol(String contextProtocol) {
this.contextProtocol = contextProtocol;
@ -216,7 +226,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param trustAll the trustAll to set
* @param trustAll
* the trustAll to set
*/
public void setTrustAll(boolean trustAll) {
this.trustAll = trustAll;
@ -230,7 +241,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param verifyHost the verifyHost to set
* @param verifyHost
* the verifyHost to set
*/
public void setVerifyHost(boolean verifyHost) {
this.verifyHost = verifyHost;
@ -244,7 +256,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param keyAlias the key alias to use
* @param keyAlias
* the key alias to use
*/
public void setKeyAlias(String keyAlias) {
this.keyAlias = keyAlias;
@ -258,6 +271,11 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
this.defaultSslPort = defaultSslPort;
}
@Override
public boolean isSSL() {
return true;
}
@Override
public NettyTransportSslOptions clone() {
return copyOptions(new NettyTransportSslOptions());

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -16,15 +16,6 @@
*/
package org.apache.activemq.transport.amqp.client.transport;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509TrustManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
@ -38,10 +29,21 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509TrustManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.handler.ssl.SslHandler;
/**
* Static class that provides various utility methods used by Transport implementations.
*/
@ -50,13 +52,18 @@ public class NettyTransportSupport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class);
/**
* Creates a Netty SslHandler instance for use in Transports that require
* an SSL encoder / decoder.
* Creates a Netty SslHandler instance for use in Transports that require an SSL encoder /
* decoder.
*
* @param remote
* The URI of the remote peer that the SslHandler will be used against.
* @param options
* The SSL options object to build the SslHandler instance from.
*
* @param remote The URI of the remote peer that the SslHandler will be used against.
* @param options The SSL options object to build the SslHandler instance from.
* @return a new SslHandler that is configured from the given options.
* @throws Exception if an error occurs while creating the SslHandler instance.
*
* @throws Exception
* if an error occurs while creating the SslHandler instance.
*/
public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception {
return new SslHandler(createSslEngine(remote, createSslContext(options), options));
@ -66,9 +73,13 @@ public class NettyTransportSupport {
* Create a new SSLContext using the options specific in the given TransportSslOptions
* instance.
*
* @param options the configured options used to create the SSLContext.
* @param options
* the configured options used to create the SSLContext.
*
* @return a new SSLContext instance.
* @throws Exception if an error occurs while creating the context.
*
* @throws Exception
* if an error occurs while creating the context.
*/
public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception {
try {
@ -91,10 +102,15 @@ public class NettyTransportSupport {
* Create a new SSLEngine instance in client mode from the given SSLContext and
* TransportSslOptions instances.
*
* @param context the SSLContext to use when creating the engine.
* @param options the TransportSslOptions to use to configure the new SSLEngine.
* @param context
* the SSLContext to use when creating the engine.
* @param options
* the TransportSslOptions to use to configure the new SSLEngine.
*
* @return a new SSLEngine instance in client mode.
* @throws Exception if an error occurs while creating the new SSLEngine.
*
* @throws Exception
* if an error occurs while creating the new SSLEngine.
*/
public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception {
return createSslEngine(null, context, options);
@ -104,15 +120,20 @@ public class NettyTransportSupport {
* Create a new SSLEngine instance in client mode from the given SSLContext and
* TransportSslOptions instances.
*
* @param remote the URI of the remote peer that will be used to initialize the engine, may be null if none should.
* @param context the SSLContext to use when creating the engine.
* @param options the TransportSslOptions to use to configure the new SSLEngine.
* @param remote
* the URI of the remote peer that will be used to initialize the engine, may be null
* if none should.
* @param context
* the SSLContext to use when creating the engine.
* @param options
* the TransportSslOptions to use to configure the new SSLEngine.
*
* @return a new SSLEngine instance in client mode.
* @throws Exception if an error occurs while creating the new SSLEngine.
*
* @throws Exception
* if an error occurs while creating the new SSLEngine.
*/
public static SSLEngine createSslEngine(URI remote,
SSLContext context,
NettyTransportSslOptions options) throws Exception {
public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception {
SSLEngine engine = null;
if (remote == null) {
engine = context.createSSLEngine();
@ -185,7 +206,7 @@ public class NettyTransportSupport {
private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception {
if (options.isTrustAll()) {
return new TrustManager[]{createTrustAllTrustManager()};
return new TrustManager[] {createTrustAllTrustManager()};
}
if (options.getTrustStoreLocation() == null) {

View File

@ -18,73 +18,46 @@ package org.apache.activemq.transport.amqp.client.transport;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Transport for communicating over WebSockets
*/
public class NettyWSTransport implements NettyTransport {
public class NettyWSTransport extends NettyTcpTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
private static final int QUIET_PERIOD = 20;
private static final int SHUTDOWN_TIMEOUT = 100;
protected Bootstrap bootstrap;
protected EventLoopGroup group;
protected Channel channel;
protected NettyTransportListener listener;
protected NettyTransportOptions options;
protected final URI remote;
protected boolean secure;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private ChannelPromise handshakeFuture;
private IOException failureCause;
private Throwable pendingFailure;
private static final String AMQP_SUB_PROTOCOL = "amqp";
/**
* Create a new transport instance
*
* @param remoteLocation the URI that defines the remote resource to connect to.
* @param options the transport options used to configure the socket connection.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
this(null, remoteLocation, options);
@ -93,119 +66,15 @@ public class NettyWSTransport implements NettyTransport {
/**
* Create a new transport instance
*
* @param listener the TransportListener that will receive events from this Transport.
* @param remoteLocation the URI that defines the remote resource to connect to.
* @param options the transport options used to configure the socket connection.
* @param listener
* the TransportListener that will receive events from this Transport.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
this.options = options;
this.listener = listener;
this.remote = remoteLocation;
this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss");
}
@Override
public void connect() throws IOException {
if (listener == null) {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
group = new NioEventLoopGroup(1);
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
configureChannel(connectedChannel);
}
});
configureNetty(bootstrap, getTransportOptions());
ChannelFuture future;
try {
future = bootstrap.connect(getRemoteHost(), getRemotePort());
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handleConnected(future.channel());
} else if (future.isCancelled()) {
connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
} else {
connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
}
}
});
future.sync();
// Now wait for WS protocol level handshake completion
handshakeFuture.await();
} catch (InterruptedException ex) {
LOG.debug("Transport connection attempt was interrupted.");
Thread.interrupted();
failureCause = IOExceptionSupport.create(ex);
}
if (failureCause != null) {
// Close out any Netty resources now as they are no longer needed.
if (channel != null) {
channel.close().syncUninterruptibly();
channel = null;
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
group = null;
}
throw failureCause;
} else {
// Connected, allow any held async error to fire now and close the transport.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (pendingFailure != null) {
channel.pipeline().fireExceptionCaught(pendingFailure);
}
}
});
}
}
@Override
public boolean isConnected() {
return connected.get();
}
@Override
public boolean isSSL() {
return secure;
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
if (channel != null) {
channel.close().syncUninterruptibly();
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
}
@Override
public ByteBuf allocateSendBuffer(int size) throws IOException {
checkConnected();
return channel.alloc().ioBuffer(size, size);
super(listener, remoteLocation, options);
}
@Override
@ -222,202 +91,37 @@ public class NettyWSTransport implements NettyTransport {
}
@Override
public NettyTransportListener getTransportListener() {
return listener;
protected ChannelInboundHandlerAdapter createChannelHandler() {
return new NettyWebSocketTransportHandler();
}
@Override
public void setTransportListener(NettyTransportListener listener) {
this.listener = listener;
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(8192));
}
@Override
public NettyTransportOptions getTransportOptions() {
if (options == null) {
if (isSSL()) {
options = NettyTransportSslOptions.INSTANCE;
} else {
options = NettyTransportOptions.INSTANCE;
}
}
return options;
protected void handleConnected(Channel channel) throws Exception {
LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel);
}
@Override
public URI getRemoteLocation() {
return remote;
}
// ----- Handle connection events -----------------------------------------//
@Override
public Principal getLocalPrincipal() {
if (!isSSL()) {
throw new UnsupportedOperationException("Not connected to a secure channel");
}
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
return sslHandler.engine().getSession().getLocalPrincipal();
}
//----- Internal implementation details, can be overridden as needed --//
protected String getRemoteHost() {
return remote.getHost();
}
protected int getRemotePort() {
int port = remote.getPort();
if (port <= 0) {
if (isSSL()) {
port = getSslOptions().getDefaultSslPort();
} else {
port = getTransportOptions().getDefaultTcpPort();
}
}
return port;
}
protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
if (options.getSendBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
}
if (options.getReceiveBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
}
if (options.getTrafficClass() != -1) {
bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
}
}
protected void configureChannel(final Channel channel) throws Exception {
if (isSSL()) {
SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
connectionEstablished(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
connectionFailed(channel, IOExceptionSupport.create(future.cause()));
}
}
});
channel.pipeline().addLast(sslHandler);
}
channel.pipeline().addLast(new HttpClientCodec());
channel.pipeline().addLast(new HttpObjectAggregator(8192));
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (!isSSL()) {
connectionEstablished(channel);
}
}
//----- State change handlers and checks ---------------------------------//
/**
* Called when the transport has successfully connected and is ready for use.
*/
protected void connectionEstablished(Channel connectedChannel) {
LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
channel = connectedChannel;
connected.set(true);
}
/**
* Called when the transport connection failed and an error should be returned.
*
* @param failedChannel The Channel instance that failed.
* @param cause An IOException that describes the cause of the failed connection.
*/
protected void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = IOExceptionSupport.create(cause);
channel = failedChannel;
connected.set(false);
handshakeFuture.setFailure(cause);
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
}
}
//----- Handle connection events -----------------------------------------//
private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {
private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object> {
private final WebSocketClientHandshaker handshaker;
NettyTcpTransportHandler() {
handshaker = WebSocketClientHandshakerFactory.newHandshaker(remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
}
@Override
public void handlerAdded(ChannelHandlerContext context) {
LOG.trace("Handler has become added! Channel is {}", context.channel());
handshakeFuture = context.newPromise();
NettyWebSocketTransportHandler() {
handshaker = WebSocketClientHandshakerFactory.newHandshaker(getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, true,
new DefaultHttpHeaders());
}
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has become active! Channel is {}", context.channel());
handshaker.handshake(context.channel());
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
LOG.trace("Error Stack: ", cause);
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (pendingFailure != null) {
listener.onTransportError(pendingFailure);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (pendingFailure != null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
pendingFailure = cause;
}
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
}
super.channelActive(context);
}
@Override
@ -427,16 +131,17 @@ public class NettyWSTransport implements NettyTransport {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) message);
LOG.info("WebSocket Client connected! {}", ctx.channel());
handshakeFuture.setSuccess();
LOG.trace("WebSocket Client connected! {}", ctx.channel());
// Now trigger super processing as we are really connected.
NettyWSTransport.super.handleConnected(ch);
return;
}
// We shouldn't get this since we handle the handshake previously.
if (message instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) message;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) message;
@ -446,10 +151,11 @@ public class NettyWSTransport implements NettyTransport {
ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
} else if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
listener.onData(binaryFrame.content());
} else if (frame instanceof PongWebSocketFrame) {
LOG.trace("WebSocket Client received pong");
} else if (frame instanceof PingWebSocketFrame) {
LOG.trace("WebSocket Client received ping, response with pong");
ch.write(new PongWebSocketFrame(frame.content()));
} else if (frame instanceof CloseWebSocketFrame) {
LOG.trace("WebSocket Client received closing");
ch.close();

View File

@ -1,202 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.util.EnumSet;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.reactor.Reactor;
/**
* Unmodifiable Connection wrapper used to prevent test code from accidentally
* modifying Connection state.
*/
public class UnmodifiableConnection implements Connection {
private final Connection connection;
public UnmodifiableConnection(Connection connection) {
this.connection = connection;
}
@Override
public EndpointState getLocalState() {
return connection.getLocalState();
}
@Override
public EndpointState getRemoteState() {
return connection.getRemoteState();
}
@Override
public ErrorCondition getCondition() {
return connection.getCondition();
}
@Override
public void setCondition(ErrorCondition condition) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public ErrorCondition getRemoteCondition() {
return connection.getRemoteCondition();
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void open() {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void close() {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public Session session() {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public Session sessionHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
Session head = connection.sessionHead(local, remote);
if (head != null) {
head = new UnmodifiableSession(head);
}
return head;
}
@Override
public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
// TODO - If implemented this method should return an unmodifiable link instance.
return null;
}
@Override
public Delivery getWorkHead() {
// TODO - If implemented this method should return an unmodifiable delivery instance.
return null;
}
@Override
public void setContainer(String container) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void setHostname(String hostname) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public String getHostname() {
return connection.getHostname();
}
@Override
public String getRemoteContainer() {
return connection.getRemoteContainer();
}
@Override
public String getRemoteHostname() {
return connection.getRemoteHostname();
}
@Override
public void setOfferedCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void setDesiredCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public Symbol[] getRemoteOfferedCapabilities() {
return connection.getRemoteOfferedCapabilities();
}
@Override
public Symbol[] getRemoteDesiredCapabilities() {
return connection.getRemoteDesiredCapabilities();
}
@Override
public Map<Symbol, Object> getRemoteProperties() {
return connection.getRemoteProperties();
}
@Override
public void setProperties(Map<Symbol, Object> properties) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public Object getContext() {
return connection.getContext();
}
@Override
public void setContext(Object context) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void collect(Collector collector) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public String getContainer() {
return connection.getContainer();
}
@Override
public Transport getTransport() {
return new UnmodifiableTransport(connection.getTransport());
}
@Override
public Record attachments() {
return connection.attachments();
}
@Override
public Reactor getReactor() {
return connection.getReactor();
}
}

View File

@ -1,179 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sender;
/**
* Unmodifiable Delivery wrapper used to prevent test code from accidentally
* modifying Delivery state.
*/
public class UnmodifiableDelivery implements Delivery {
private final Delivery delivery;
public UnmodifiableDelivery(Delivery delivery) {
this.delivery = delivery;
}
@Override
public byte[] getTag() {
return delivery.getTag();
}
@Override
public Link getLink() {
if (delivery.getLink() instanceof Sender) {
return new UnmodifiableSender((Sender) delivery.getLink());
} else if (delivery.getLink() instanceof Receiver) {
return new UnmodifiableReceiver((Receiver) delivery.getLink());
} else {
throw new IllegalStateException("Delivery has unknown link type");
}
}
/* waiting Pull Request sent
@Override
public int getDataLength() {
return delivery.getDataLength();
} */
@Override
public int available() {
return delivery.available();
}
@Override
public DeliveryState getLocalState() {
return delivery.getLocalState();
}
@Override
public DeliveryState getRemoteState() {
return delivery.getRemoteState();
}
@Override
public int getMessageFormat() {
return delivery.getMessageFormat();
}
@Override
public void disposition(DeliveryState state) {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public void settle() {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public boolean isSettled() {
return delivery.isSettled();
}
@Override
public boolean remotelySettled() {
return delivery.remotelySettled();
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public Delivery getWorkNext() {
return new UnmodifiableDelivery(delivery.getWorkNext());
}
@Override
public Delivery next() {
return new UnmodifiableDelivery(delivery.next());
}
@Override
public boolean isWritable() {
return delivery.isWritable();
}
@Override
public boolean isReadable() {
return delivery.isReadable();
}
@Override
public void setContext(Object o) {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public Object getContext() {
return delivery.getContext();
}
@Override
public boolean isUpdated() {
return delivery.isUpdated();
}
@Override
public void clear() {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public boolean isPartial() {
return delivery.isPartial();
}
@Override
public int pending() {
return delivery.pending();
}
@Override
public boolean isBuffered() {
return delivery.isBuffered();
}
@Override
public Record attachments() {
return delivery.attachments();
}
@Override
public DeliveryState getDefaultDeliveryState() {
return delivery.getDefaultDeliveryState();
}
@Override
public void setDefaultDeliveryState(DeliveryState state) {
throw new UnsupportedOperationException("Cannot alter the Delivery");
}
@Override
public void setMessageFormat(int messageFormat) {
throw new UnsupportedOperationException("Cannot alter the Delivery");
}
}

View File

@ -1,306 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.util.EnumSet;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
/**
* Unmodifiable Session wrapper used to prevent test code from accidentally
* modifying Session state.
*/
public class UnmodifiableLink implements Link {
private final Link link;
public UnmodifiableLink(Link link) {
this.link = link;
}
@Override
public EndpointState getLocalState() {
return link.getLocalState();
}
@Override
public EndpointState getRemoteState() {
return link.getRemoteState();
}
@Override
public ErrorCondition getCondition() {
return link.getCondition();
}
@Override
public void setCondition(ErrorCondition condition) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public ErrorCondition getRemoteCondition() {
return link.getRemoteCondition();
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void open() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void close() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setContext(Object o) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Object getContext() {
return link.getContext();
}
@Override
public String getName() {
return link.getName();
}
@Override
public Delivery delivery(byte[] tag) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Delivery delivery(byte[] tag, int offset, int length) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Delivery head() {
return new UnmodifiableDelivery(link.head());
}
@Override
public Delivery current() {
return new UnmodifiableDelivery(link.current());
}
@Override
public boolean advance() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Source getSource() {
// TODO Figure out a simple way to wrap the odd Source types in Proton-J
return link.getSource();
}
@Override
public Target getTarget() {
// TODO Figure out a simple way to wrap the odd Source types in Proton-J
return link.getTarget();
}
@Override
public void setSource(Source address) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setTarget(Target address) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Source getRemoteSource() {
// TODO Figure out a simple way to wrap the odd Source types in Proton-J
return link.getRemoteSource();
}
@Override
public Target getRemoteTarget() {
// TODO Figure out a simple way to wrap the odd Target types in Proton-J
return link.getRemoteTarget();
}
@Override
public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
Link next = link.next(local, remote);
if (next != null) {
if (next instanceof Sender) {
next = new UnmodifiableSender((Sender) next);
} else {
next = new UnmodifiableReceiver((Receiver) next);
}
}
return next;
}
@Override
public int getCredit() {
return link.getCredit();
}
@Override
public int getQueued() {
return link.getQueued();
}
@Override
public int getUnsettled() {
return link.getUnsettled();
}
@Override
public Session getSession() {
return new UnmodifiableSession(link.getSession());
}
@Override
public SenderSettleMode getSenderSettleMode() {
return link.getSenderSettleMode();
}
@Override
public void setSenderSettleMode(SenderSettleMode senderSettleMode) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public SenderSettleMode getRemoteSenderSettleMode() {
return link.getRemoteSenderSettleMode();
}
@Override
public ReceiverSettleMode getReceiverSettleMode() {
return link.getReceiverSettleMode();
}
@Override
public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public ReceiverSettleMode getRemoteReceiverSettleMode() {
return link.getRemoteReceiverSettleMode();
}
@Override
public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int drained() {
return link.drained(); // TODO - Is this a mutating call?
}
@Override
public int getRemoteCredit() {
return link.getRemoteCredit();
}
@Override
public boolean getDrain() {
return link.getDrain();
}
@Override
public void detach() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public boolean detached() {
return link.detached();
}
@Override
public Record attachments() {
return link.attachments();
}
@Override
public Map<Symbol, Object> getProperties() {
return link.getProperties();
}
@Override
public void setProperties(Map<Symbol, Object> properties) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Map<Symbol, Object> getRemoteProperties() {
return link.getRemoteProperties();
}
@Override
public Symbol[] getDesiredCapabilities() {
return link.getDesiredCapabilities();
}
@Override
public Symbol[] getOfferedCapabilities() {
return link.getOfferedCapabilities();
}
@Override
public Symbol[] getRemoteDesiredCapabilities() {
return link.getRemoteDesiredCapabilities();
}
@Override
public Symbol[] getRemoteOfferedCapabilities() {
return link.getRemoteOfferedCapabilities();
}
@Override
public void setDesiredCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setOfferedCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
/**
* Utility that creates proxy objects for the Proton objects which won't allow any mutating
* operations to be applied so that the test code does not interact with the proton engine
* outside the client serialization thread.
*/
public final class UnmodifiableProxy {
private static ArrayList<String> blacklist = new ArrayList<>();
// These methods are mutating but don't take an arguments so they
// aren't automatically filtered out. We will have to keep an eye
// on proton API in the future and modify this list as it evolves.
static {
blacklist.add("close");
blacklist.add("free");
blacklist.add("open");
blacklist.add("sasl");
blacklist.add("session");
blacklist.add("close_head");
blacklist.add("close_tail");
blacklist.add("outputConsumed");
blacklist.add("process");
blacklist.add("processInput");
blacklist.add("unbind");
blacklist.add("settle");
blacklist.add("clear");
blacklist.add("detach");
blacklist.add("abort");
}
private UnmodifiableProxy() {
}
public static Transport transportProxy(final Transport target) {
Transport wrap = wrap(Transport.class, target);
return wrap;
}
public static Sasl saslProxy(final Sasl target) {
return wrap(Sasl.class, target);
}
public static Connection connectionProxy(final Connection target) {
return wrap(Connection.class, target);
}
public static Session sessionProxy(final Session target) {
return wrap(Session.class, target);
}
public static Delivery deliveryProxy(final Delivery target) {
return wrap(Delivery.class, target);
}
public static Link linkProxy(final Link target) {
return wrap(Link.class, target);
}
public static Receiver receiverProxy(final Receiver target) {
return wrap(Receiver.class, target);
}
public static Sender senderProxy(final Sender target) {
return wrap(Sender.class, target);
}
private static boolean isProtonType(Class<?> clazz) {
String packageName = clazz.getPackage().getName();
if (packageName.startsWith("org.apache.qpid.proton.")) {
return true;
}
return false;
}
private static <T> T wrap(Class<T> type, final Object target) {
return type.cast(java.lang.reflect.Proxy.newProxyInstance(type.getClassLoader(), new Class[] {type}, new InvocationHandler() {
@Override
public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
if ("toString".equals(method.getName()) && method.getParameterTypes().length == 0) {
return "Unmodifiable proxy -> (" + method.invoke(target, objects) + ")";
}
// Don't let methods that mutate be invoked.
if (method.getParameterTypes().length > 0) {
throw new UnsupportedOperationException("Cannot mutate outside the Client work thread");
}
if (blacklist.contains(method.getName())) {
throw new UnsupportedOperationException("Cannot mutate outside the Client work thread");
}
Class<?> returnType = method.getReturnType();
try {
Object result = method.invoke(target, objects);
if (result == null) {
return null;
}
if (returnType.isPrimitive() || returnType.isArray() || Object.class.equals(returnType)) {
// Skip any other checks
} else if (returnType.isAssignableFrom(ByteBuffer.class)) {
// Buffers are modifiable but we can just return null to indicate
// there's nothing there to access.
result = null;
} else if (returnType.isAssignableFrom(Map.class)) {
// Prevent return of modifiable maps
result = Collections.unmodifiableMap((Map<?, ?>) result);
} else if (isProtonType(returnType) && returnType.isInterface()) {
// Can't handle the crazy Source / Target types yet as there's two
// different types for Source and Target the result can't be cast to
// the one people actually want to use.
if (!returnType.getName().equals("org.apache.qpid.proton.amqp.transport.Source")
&& !returnType.getName().equals("org.apache.qpid.proton.amqp.messaging.Source")
&& !returnType.getName().equals("org.apache.qpid.proton.amqp.transport.Target")
&& !returnType.getName().equals("org.apache.qpid.proton.amqp.messaging.Target")) {
result = wrap(returnType, result);
}
}
return result;
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
}));
}
}

View File

@ -1,65 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Receiver;
/**
* Unmodifiable Receiver wrapper used to prevent test code from accidentally
* modifying Receiver state.
*/
public class UnmodifiableReceiver extends UnmodifiableLink implements Receiver {
private final Receiver receiver;
public UnmodifiableReceiver(Receiver receiver) {
super(receiver);
this.receiver = receiver;
}
@Override
public void flow(int credits) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int recv(byte[] bytes, int offset, int size) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int recv(WritableBuffer buffer) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void drain(int credit) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public boolean draining() {
return receiver.draining();
}
@Override
public void setDrain(boolean drain) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -1,51 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Sender;
/**
* Unmodifiable Sender wrapper used to prevent test code from accidentally
* modifying Sender state.
*/
public class UnmodifiableSender extends UnmodifiableLink implements Sender {
public UnmodifiableSender(Sender sender) {
super(sender);
}
@Override
public void offer(int credits) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int send(byte[] bytes, int offset, int length) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int send(ReadableBuffer buffer) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void abort() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -1,198 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.util.EnumSet;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
/**
* Unmodifiable Session wrapper used to prevent test code from accidentally
* modifying Session state.
*/
public class UnmodifiableSession implements Session {
private final Session session;
public UnmodifiableSession(Session session) {
this.session = session;
}
@Override
public EndpointState getLocalState() {
return session.getLocalState();
}
@Override
public EndpointState getRemoteState() {
return session.getRemoteState();
}
@Override
public ErrorCondition getCondition() {
return session.getCondition();
}
@Override
public void setCondition(ErrorCondition condition) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public ErrorCondition getRemoteCondition() {
return session.getRemoteCondition();
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public void open() {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public void close() {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public void setContext(Object o) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public Object getContext() {
return session.getContext();
}
@Override
public Sender sender(String name) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public Receiver receiver(String name) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public Session next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
Session next = session.next(local, remote);
if (next != null) {
next = new UnmodifiableSession(next);
}
return next;
}
@Override
public Connection getConnection() {
return new UnmodifiableConnection(session.getConnection());
}
@Override
public int getIncomingCapacity() {
return session.getIncomingCapacity();
}
@Override
public void setIncomingCapacity(int bytes) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public int getIncomingBytes() {
return session.getIncomingBytes();
}
@Override
public int getOutgoingBytes() {
return session.getOutgoingBytes();
}
@Override
public Record attachments() {
return session.attachments();
}
@Override
public long getOutgoingWindow() {
return session.getOutgoingWindow();
}
@Override
public void setOutgoingWindow(long outgoingWindowSize) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public Symbol[] getDesiredCapabilities() {
return session.getDesiredCapabilities();
}
@Override
public Symbol[] getOfferedCapabilities() {
return session.getOfferedCapabilities();
}
@Override
public Map<Symbol, Object> getProperties() {
return session.getProperties();
}
@Override
public Symbol[] getRemoteDesiredCapabilities() {
return session.getRemoteDesiredCapabilities();
}
@Override
public Symbol[] getRemoteOfferedCapabilities() {
return session.getRemoteOfferedCapabilities();
}
@Override
public Map<Symbol, Object> getRemoteProperties() {
return session.getRemoteProperties();
}
@Override
public void setDesiredCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setOfferedCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setProperties(Map<Symbol, Object> capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -1,274 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.nio.ByteBuffer;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.TransportResult;
/**
* Unmodifiable Transport wrapper used to prevent test code from accidentally
* modifying Transport state.
*/
public class UnmodifiableTransport implements Transport {
private final Transport transport;
public UnmodifiableTransport(Transport transport) {
this.transport = transport;
}
@Override
public void close() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Object getContext() {
return null;
}
@Override
public EndpointState getLocalState() {
return transport.getLocalState();
}
@Override
public ErrorCondition getRemoteCondition() {
return transport.getRemoteCondition();
}
@Override
public EndpointState getRemoteState() {
return transport.getRemoteState();
}
@Override
public void open() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setCondition(ErrorCondition arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setContext(Object arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void bind(Connection arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public int capacity() {
return transport.capacity();
}
@Override
public void close_head() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void close_tail() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public int getChannelMax() {
return transport.getChannelMax();
}
@Override
public ErrorCondition getCondition() {
return transport.getCondition();
}
@Override
public int getIdleTimeout() {
return transport.getIdleTimeout();
}
@Override
public ByteBuffer getInputBuffer() {
return null;
}
@Override
public int getMaxFrameSize() {
return transport.getMaxFrameSize();
}
@Override
public ByteBuffer getOutputBuffer() {
return null;
}
@Override
public int getRemoteChannelMax() {
return transport.getRemoteChannelMax();
}
@Override
public int getRemoteIdleTimeout() {
return transport.getRemoteIdleTimeout();
}
@Override
public int getRemoteMaxFrameSize() {
return transport.getRemoteMaxFrameSize();
}
@Override
public ByteBuffer head() {
return null;
}
@Override
public int input(byte[] arg0, int arg1, int arg2) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public boolean isClosed() {
return transport.isClosed();
}
@Override
public int output(byte[] arg0, int arg1, int arg2) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void outputConsumed() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public int pending() {
return transport.pending();
}
@Override
public void pop(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void process() throws TransportException {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public TransportResult processInput() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Sasl sasl() throws IllegalStateException {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setChannelMax(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setIdleTimeout(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setMaxFrameSize(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Ssl ssl(SslDomain arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Ssl ssl(SslDomain arg0, SslPeerDetails arg1) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public ByteBuffer tail() {
return null;
}
@Override
public long tick(long arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void trace(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void unbind() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Record attachments() {
return transport.attachments();
}
@Override
public long getFramesInput() {
return transport.getFramesInput();
}
@Override
public long getFramesOutput() {
return transport.getFramesOutput();
}
@Override
public void setEmitFlowEventOnSend(boolean emitFlowEventOnSend) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public boolean isEmitFlowEventOnSend() {
return transport.isEmitFlowEventOnSend();
}
}