This commit is contained in:
Clebert Suconic 2017-05-11 16:52:03 -04:00
commit b7b79e5dfd
21 changed files with 565 additions and 1835 deletions

View File

@ -25,8 +25,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -39,7 +39,7 @@ import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IdGenerator;
import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
@ -74,7 +74,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
public static final long DEFAULT_DRAIN_TIMEOUT = 60000;
private final ScheduledExecutorService serializer;
private ScheduledThreadPoolExecutor serializer;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicLong sessionIdGenerator = new AtomicLong();
@ -121,7 +121,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.connectionId = CONNECTION_ID_GENERATOR.generateId();
this.remoteURI = transport.getRemoteLocation();
this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
this.serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable runner) {
@ -132,6 +132,10 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
});
// Ensure timely shutdown
this.serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.transport.setTransportListener(this);
}
@ -434,7 +438,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
public Connection getConnection() {
return new UnmodifiableConnection(getEndpoint());
return UnmodifiableProxy.connectionProxy(getEndpoint());
}
public AmqpConnectionListener getListener() {

View File

@ -20,7 +20,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
@ -100,7 +100,7 @@ public class AmqpMessage {
*/
public Delivery getWrappedDelivery() {
if (delivery != null) {
return new UnmodifiableDelivery(delivery);
return UnmodifiableProxy.deliveryProxy(delivery);
}
return null;

View File

@ -35,7 +35,7 @@ import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
@ -677,7 +677,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* @return an unmodifiable view of the underlying Receiver instance.
*/
public Receiver getReceiver() {
return new UnmodifiableReceiver(getEndpoint());
return UnmodifiableProxy.receiverProxy(getEndpoint());
}
// ----- Receiver configuration properties --------------------------------//

View File

@ -29,7 +29,7 @@ import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@ -231,7 +231,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
* @return an unmodifiable view of the underlying Sender instance.
*/
public Sender getSender() {
return new UnmodifiableSender(getEndpoint());
return UnmodifiableProxy.senderProxy(getEndpoint());
}
/**

View File

@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
@ -598,7 +598,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
}
public Session getSession() {
return new UnmodifiableSession(getEndpoint());
return UnmodifiableProxy.sessionProxy(getEndpoint());
}
public boolean isInTransaction() {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -23,25 +23,29 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TCP based transport that uses Netty as the underlying IO layer.
@ -50,28 +54,27 @@ public class NettyTcpTransport implements NettyTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
private static final int QUIET_PERIOD = 20;
private static final int SHUTDOWN_TIMEOUT = 100;
protected Bootstrap bootstrap;
protected EventLoopGroup group;
protected Channel channel;
protected NettyTransportListener listener;
protected NettyTransportOptions options;
protected final NettyTransportOptions options;
protected final URI remote;
protected boolean secure;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final CountDownLatch connectLatch = new CountDownLatch(1);
private IOException failureCause;
private Throwable pendingFailure;
private volatile IOException failureCause;
/**
* Create a new transport instance
*
* @param remoteLocation the URI that defines the remote resource to connect to.
* @param options the transport options used to configure the socket connection.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
this(null, remoteLocation, options);
@ -80,15 +83,25 @@ public class NettyTcpTransport implements NettyTransport {
/**
* Create a new transport instance
*
* @param listener the TransportListener that will receive events from this Transport.
* @param remoteLocation the URI that defines the remote resource to connect to.
* @param options the transport options used to configure the socket connection.
* @param listener
* the TransportListener that will receive events from this Transport.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
if (options == null) {
throw new IllegalArgumentException("Transport Options cannot be null");
}
if (remoteLocation == null) {
throw new IllegalArgumentException("Transport remote location cannot be null");
}
this.options = options;
this.listener = listener;
this.remote = remoteLocation;
this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
}
@Override
@ -98,16 +111,27 @@ public class NettyTcpTransport implements NettyTransport {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
final SslHandler sslHandler;
if (isSSL()) {
try {
sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
} catch (Exception ex) {
// TODO: can we stop it throwing Exception?
throw IOExceptionSupport.create(ex);
}
} else {
sslHandler = null;
}
group = new NioEventLoopGroup(1);
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
configureChannel(connectedChannel);
configureChannel(connectedChannel, sslHandler);
}
});
@ -118,12 +142,8 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handleConnected(future.channel());
} else if (future.isCancelled()) {
connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
} else {
connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
if (!future.isSuccess()) {
handleException(future.channel(), IOExceptionSupport.create(future.cause()));
}
}
});
@ -143,7 +163,10 @@ public class NettyTcpTransport implements NettyTransport {
channel = null;
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
LOG.trace("Channel group shutdown failed to complete in allotted time");
}
group = null;
}
@ -154,8 +177,8 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public void run() {
if (pendingFailure != null) {
channel.pipeline().fireExceptionCaught(pendingFailure);
if (failureCause != null) {
channel.pipeline().fireExceptionCaught(failureCause);
}
}
});
@ -169,18 +192,24 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public boolean isSSL() {
return secure;
return options.isSSL();
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
if (channel != null) {
channel.close().syncUninterruptibly();
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
try {
if (channel != null) {
channel.close().syncUninterruptibly();
}
} finally {
if (group != null) {
Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
LOG.trace("Channel group shutdown failed to complete in allotted time");
}
}
}
}
}
@ -216,14 +245,6 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public NettyTransportOptions getTransportOptions() {
if (options == null) {
if (isSSL()) {
options = NettyTransportSslOptions.INSTANCE;
} else {
options = NettyTransportOptions.INSTANCE;
}
}
return options;
}
@ -234,36 +255,106 @@ public class NettyTcpTransport implements NettyTransport {
@Override
public Principal getLocalPrincipal() {
if (!isSSL()) {
throw new UnsupportedOperationException("Not connected to a secure channel");
Principal result = null;
if (isSSL()) {
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
result = sslHandler.engine().getSession().getLocalPrincipal();
}
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
return sslHandler.engine().getSession().getLocalPrincipal();
return result;
}
//----- Internal implementation details, can be overridden as needed --//
// ----- Internal implementation details, can be overridden as needed -----//
protected String getRemoteHost() {
return remote.getHost();
}
protected int getRemotePort() {
int port = remote.getPort();
if (port <= 0) {
if (isSSL()) {
port = getSslOptions().getDefaultSslPort();
} else {
port = getTransportOptions().getDefaultTcpPort();
}
if (remote.getPort() != -1) {
return remote.getPort();
} else {
return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort();
}
return port;
}
protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
}
protected ChannelInboundHandlerAdapter createChannelHandler() {
return new NettyTcpTransportHandler();
}
// ----- Event Handlers which can be overridden in subclasses -------------//
protected void handleConnected(Channel channel) throws Exception {
LOG.trace("Channel has become active! Channel is {}", channel);
connectionEstablished(channel);
}
protected void handleChannelInactive(Channel channel) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", channel);
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
}
protected void handleException(Channel channel, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {}", channel);
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (failureCause != null) {
listener.onTransportError(failureCause);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (failureCause == null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
failureCause = IOExceptionSupport.create(cause);
}
connectionFailed(channel, failureCause);
}
}
// ----- State change handlers and checks ---------------------------------//
protected final void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
}
}
/*
* Called when the transport has successfully connected and is ready for use.
*/
private void connectionEstablished(Channel connectedChannel) {
channel = connectedChannel;
connected.set(true);
connectLatch.countDown();
}
/*
* Called when the transport connection failed and an error should be returned.
*/
private void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = cause;
channel = failedChannel;
connected.set(false);
connectLatch.countDown();
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
@ -283,105 +374,66 @@ public class NettyTcpTransport implements NettyTransport {
}
}
protected void configureChannel(final Channel channel) throws Exception {
private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception {
if (isSSL()) {
SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
connectionEstablished(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
connectionFailed(channel, IOExceptionSupport.create(future.cause()));
}
}
});
channel.pipeline().addLast(sslHandler);
}
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (!isSSL()) {
connectionEstablished(channel);
if (getTransportOptions().isTraceBytes()) {
channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
}
addAdditionalHandlers(channel.pipeline());
channel.pipeline().addLast(createChannelHandler());
}
//----- State change handlers and checks ---------------------------------//
// ----- Handle connection events -----------------------------------------//
/**
* Called when the transport has successfully connected and is ready for use.
*/
protected void connectionEstablished(Channel connectedChannel) {
channel = connectedChannel;
connected.set(true);
connectLatch.countDown();
}
protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> {
/**
* Called when the transport connection failed and an error should be returned.
*
* @param failedChannel The Channel instance that failed.
* @param cause An IOException that describes the cause of the failed connection.
*/
protected void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = IOExceptionSupport.create(cause);
channel = failedChannel;
connected.set(false);
connectLatch.countDown();
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
@Override
public void channelRegistered(ChannelHandlerContext context) throws Exception {
channel = context.channel();
}
}
//----- Handle connection events -----------------------------------------//
private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has become active! Channel is {}", context.channel());
// In the Secure case we need to let the handshake complete before we
// trigger the connected event.
if (!isSSL()) {
handleConnected(context.channel());
} else {
SslHandler sslHandler = context.pipeline().get(SslHandler.class);
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
handleConnected(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
handleException(channel, future.cause());
}
}
});
}
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
handleChannelInactive(context.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (pendingFailure != null) {
listener.onTransportError(pendingFailure);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (pendingFailure != null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
pendingFailure = cause;
}
}
handleException(context.channel(), cause);
}
}
// ----- Handle Binary data from connection -------------------------------//
protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {

View File

@ -23,7 +23,7 @@ import java.security.Principal;
import io.netty.buffer.ByteBuf;
/**
*
* Base for all Netty based Transports in this client.
*/
public interface NettyTransport {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -30,12 +30,16 @@ public final class NettyTransportFactory {
}
/**
* Creates an instance of the given Transport and configures it using the
* properties set on the given remote broker URI.
* Creates an instance of the given Transport and configures it using the properties set on
* the given remote broker URI.
*
* @param remoteURI
* The URI used to connect to a remote Peer.
*
* @param remoteURI The URI used to connect to a remote Peer.
* @return a new Transport instance.
* @throws Exception if an error occurs while creating the Transport instance.
*
* @throws Exception
* if an error occurs while creating the Transport instance.
*/
public static NettyTransport createTransport(URI remoteURI) throws Exception {
Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -19,15 +19,16 @@ package org.apache.activemq.transport.amqp.client.transport;
import io.netty.buffer.ByteBuf;
/**
* Listener interface that should be implemented by users of the various
* QpidJMS Transport classes.
* Listener interface that should be implemented by users of the various QpidJMS Transport
* classes.
*/
public interface NettyTransportListener {
/**
* Called when new incoming data has become available.
*
* @param incoming the next incoming packet of data.
* @param incoming
* the next incoming packet of data.
*/
void onData(ByteBuf incoming);
@ -39,7 +40,8 @@ public interface NettyTransportListener {
/**
* Called when an error occurs during normal Transport operations.
*
* @param cause the error that triggered this event.
* @param cause
* the error that triggered this event.
*/
void onTransportError(Throwable cause);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -30,6 +30,7 @@ public class NettyTransportOptions implements Cloneable {
public static final int DEFAULT_SO_TIMEOUT = -1;
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_TCP_PORT = 5672;
public static final boolean DEFAULT_TRACE_BYTES = false;
public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
@ -42,6 +43,7 @@ public class NettyTransportOptions implements Cloneable {
private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
private int defaultTcpPort = DEFAULT_TCP_PORT;
private boolean traceBytes = DEFAULT_TRACE_BYTES;
/**
* @return the currently set send buffer size in bytes.
@ -51,11 +53,14 @@ public class NettyTransportOptions implements Cloneable {
}
/**
* Sets the send buffer size in bytes, the value must be greater than zero
* or an {@link IllegalArgumentException} will be thrown.
* Sets the send buffer size in bytes, the value must be greater than zero or an
* {@link IllegalArgumentException} will be thrown.
*
* @param sendBufferSize the new send buffer size for the TCP Transport.
* @throws IllegalArgumentException if the value given is not in the valid range.
* @param sendBufferSize
* the new send buffer size for the TCP Transport.
*
* @throws IllegalArgumentException
* if the value given is not in the valid range.
*/
public void setSendBufferSize(int sendBufferSize) {
if (sendBufferSize <= 0) {
@ -73,11 +78,14 @@ public class NettyTransportOptions implements Cloneable {
}
/**
* Sets the receive buffer size in bytes, the value must be greater than zero
* or an {@link IllegalArgumentException} will be thrown.
* Sets the receive buffer size in bytes, the value must be greater than zero or an
* {@link IllegalArgumentException} will be thrown.
*
* @param receiveBufferSize the new receive buffer size for the TCP Transport.
* @throws IllegalArgumentException if the value given is not in the valid range.
* @param receiveBufferSize
* the new receive buffer size for the TCP Transport.
*
* @throws IllegalArgumentException
* if the value given is not in the valid range.
*/
public void setReceiveBufferSize(int receiveBufferSize) {
if (receiveBufferSize <= 0) {
@ -95,11 +103,13 @@ public class NettyTransportOptions implements Cloneable {
}
/**
* Sets the traffic class value used by the TCP connection, valid
* range is between 0 and 255.
* Sets the traffic class value used by the TCP connection, valid range is between 0 and 255.
*
* @param trafficClass the new traffic class value.
* @throws IllegalArgumentException if the value given is not in the valid range.
* @param trafficClass
* the new traffic class value.
*
* @throws IllegalArgumentException
* if the value given is not in the valid range.
*/
public void setTrafficClass(int trafficClass) {
if (trafficClass < 0 || trafficClass > 255) {
@ -157,6 +167,27 @@ public class NettyTransportOptions implements Cloneable {
this.defaultTcpPort = defaultTcpPort;
}
/**
* @return true if the transport should enable byte tracing
*/
public boolean isTraceBytes() {
return traceBytes;
}
/**
* Determines if the transport should add a logger for bytes in / out
*
* @param traceBytes
* should the transport log the bytes in and out.
*/
public void setTraceBytes(boolean traceBytes) {
this.traceBytes = traceBytes;
}
public boolean isSSL() {
return false;
}
@Override
public NettyTransportOptions clone() {
return copyOptions(new NettyTransportOptions());

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -21,9 +21,8 @@ import java.util.Collections;
import java.util.List;
/**
* Holds the defined SSL options for connections that operate over a secure
* transport. Options are read from the environment and can be overridden by
* specifying them on the connection URI.
* Holds the defined SSL options for connections that operate over a secure transport. Options
* are read from the environment and can be overridden by specifying them on the connection URI.
*/
public class NettyTransportSslOptions extends NettyTransportOptions {
@ -31,7 +30,7 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
public static final boolean DEFAULT_TRUST_ALL = false;
public static final boolean DEFAULT_VERIFY_HOST = false;
public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"}));
public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[] {"SSLv2Hello", "SSLv3"}));
public static final int DEFAULT_SSL_PORT = 5671;
public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions();
@ -69,7 +68,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
/**
* Sets the location on disk of the key store to use.
*
* @param keyStoreLocation the keyStoreLocation to use to create the key manager.
* @param keyStoreLocation
* the keyStoreLocation to use to create the key manager.
*/
public void setKeyStoreLocation(String keyStoreLocation) {
this.keyStoreLocation = keyStoreLocation;
@ -83,7 +83,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param keyStorePassword the keyStorePassword to set
* @param keyStorePassword
* the keyStorePassword to set
*/
public void setKeyStorePassword(String keyStorePassword) {
this.keyStorePassword = keyStorePassword;
@ -97,7 +98,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param trustStoreLocation the trustStoreLocation to set
* @param trustStoreLocation
* the trustStoreLocation to set
*/
public void setTrustStoreLocation(String trustStoreLocation) {
this.trustStoreLocation = trustStoreLocation;
@ -111,7 +113,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param trustStorePassword the trustStorePassword to set
* @param trustStorePassword
* the trustStorePassword to set
*/
public void setTrustStorePassword(String trustStorePassword) {
this.trustStorePassword = trustStorePassword;
@ -125,7 +128,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param storeType the format that the store files are encoded in.
* @param storeType
* the format that the store files are encoded in.
*/
public void setStoreType(String storeType) {
this.storeType = storeType;
@ -139,7 +143,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param enabledCipherSuites the enabledCipherSuites to set
* @param enabledCipherSuites
* the enabledCipherSuites to set
*/
public void setEnabledCipherSuites(String[] enabledCipherSuites) {
this.enabledCipherSuites = enabledCipherSuites;
@ -153,7 +158,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param disabledCipherSuites the disabledCipherSuites to set
* @param disabledCipherSuites
* the disabledCipherSuites to set
*/
public void setDisabledCipherSuites(String[] disabledCipherSuites) {
this.disabledCipherSuites = disabledCipherSuites;
@ -169,13 +175,15 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
/**
* The protocols to be set as enabled.
*
* @param enabledProtocols the enabled protocols to set, or null if the defaults should be used.
* @param enabledProtocols
* the enabled protocols to set, or null if the defaults should be used.
*/
public void setEnabledProtocols(String[] enabledProtocols) {
this.enabledProtocols = enabledProtocols;
}
/**
*
* @return the protocols to disable or null if none should be
*/
public String[] getDisabledProtocols() {
@ -185,7 +193,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
/**
* The protocols to be disable.
*
* @param disabledProtocols the protocols to disable, or null if none should be.
* @param disabledProtocols
* the protocols to disable, or null if none should be.
*/
public void setDisabledProtocols(String[] disabledProtocols) {
this.disabledProtocols = disabledProtocols;
@ -202,7 +211,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
* The protocol value to use when creating an SSLContext via
* SSLContext.getInstance(protocol).
*
* @param contextProtocol the context protocol to use.
* @param contextProtocol
* the context protocol to use.
*/
public void setContextProtocol(String contextProtocol) {
this.contextProtocol = contextProtocol;
@ -216,7 +226,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param trustAll the trustAll to set
* @param trustAll
* the trustAll to set
*/
public void setTrustAll(boolean trustAll) {
this.trustAll = trustAll;
@ -230,7 +241,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param verifyHost the verifyHost to set
* @param verifyHost
* the verifyHost to set
*/
public void setVerifyHost(boolean verifyHost) {
this.verifyHost = verifyHost;
@ -244,7 +256,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
}
/**
* @param keyAlias the key alias to use
* @param keyAlias
* the key alias to use
*/
public void setKeyAlias(String keyAlias) {
this.keyAlias = keyAlias;
@ -258,6 +271,11 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
this.defaultSslPort = defaultSslPort;
}
@Override
public boolean isSSL() {
return true;
}
@Override
public NettyTransportSslOptions clone() {
return copyOptions(new NettyTransportSslOptions());

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -16,15 +16,6 @@
*/
package org.apache.activemq.transport.amqp.client.transport;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509TrustManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
@ -38,10 +29,21 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509TrustManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.handler.ssl.SslHandler;
/**
* Static class that provides various utility methods used by Transport implementations.
*/
@ -50,13 +52,18 @@ public class NettyTransportSupport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class);
/**
* Creates a Netty SslHandler instance for use in Transports that require
* an SSL encoder / decoder.
* Creates a Netty SslHandler instance for use in Transports that require an SSL encoder /
* decoder.
*
* @param remote
* The URI of the remote peer that the SslHandler will be used against.
* @param options
* The SSL options object to build the SslHandler instance from.
*
* @param remote The URI of the remote peer that the SslHandler will be used against.
* @param options The SSL options object to build the SslHandler instance from.
* @return a new SslHandler that is configured from the given options.
* @throws Exception if an error occurs while creating the SslHandler instance.
*
* @throws Exception
* if an error occurs while creating the SslHandler instance.
*/
public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception {
return new SslHandler(createSslEngine(remote, createSslContext(options), options));
@ -66,9 +73,13 @@ public class NettyTransportSupport {
* Create a new SSLContext using the options specific in the given TransportSslOptions
* instance.
*
* @param options the configured options used to create the SSLContext.
* @param options
* the configured options used to create the SSLContext.
*
* @return a new SSLContext instance.
* @throws Exception if an error occurs while creating the context.
*
* @throws Exception
* if an error occurs while creating the context.
*/
public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception {
try {
@ -91,10 +102,15 @@ public class NettyTransportSupport {
* Create a new SSLEngine instance in client mode from the given SSLContext and
* TransportSslOptions instances.
*
* @param context the SSLContext to use when creating the engine.
* @param options the TransportSslOptions to use to configure the new SSLEngine.
* @param context
* the SSLContext to use when creating the engine.
* @param options
* the TransportSslOptions to use to configure the new SSLEngine.
*
* @return a new SSLEngine instance in client mode.
* @throws Exception if an error occurs while creating the new SSLEngine.
*
* @throws Exception
* if an error occurs while creating the new SSLEngine.
*/
public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception {
return createSslEngine(null, context, options);
@ -104,15 +120,20 @@ public class NettyTransportSupport {
* Create a new SSLEngine instance in client mode from the given SSLContext and
* TransportSslOptions instances.
*
* @param remote the URI of the remote peer that will be used to initialize the engine, may be null if none should.
* @param context the SSLContext to use when creating the engine.
* @param options the TransportSslOptions to use to configure the new SSLEngine.
* @param remote
* the URI of the remote peer that will be used to initialize the engine, may be null
* if none should.
* @param context
* the SSLContext to use when creating the engine.
* @param options
* the TransportSslOptions to use to configure the new SSLEngine.
*
* @return a new SSLEngine instance in client mode.
* @throws Exception if an error occurs while creating the new SSLEngine.
*
* @throws Exception
* if an error occurs while creating the new SSLEngine.
*/
public static SSLEngine createSslEngine(URI remote,
SSLContext context,
NettyTransportSslOptions options) throws Exception {
public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception {
SSLEngine engine = null;
if (remote == null) {
engine = context.createSSLEngine();
@ -185,7 +206,7 @@ public class NettyTransportSupport {
private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception {
if (options.isTrustAll()) {
return new TrustManager[]{createTrustAllTrustManager()};
return new TrustManager[] {createTrustAllTrustManager()};
}
if (options.getTrustStoreLocation() == null) {

View File

@ -18,73 +18,46 @@ package org.apache.activemq.transport.amqp.client.transport;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Transport for communicating over WebSockets
*/
public class NettyWSTransport implements NettyTransport {
public class NettyWSTransport extends NettyTcpTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
private static final int QUIET_PERIOD = 20;
private static final int SHUTDOWN_TIMEOUT = 100;
protected Bootstrap bootstrap;
protected EventLoopGroup group;
protected Channel channel;
protected NettyTransportListener listener;
protected NettyTransportOptions options;
protected final URI remote;
protected boolean secure;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private ChannelPromise handshakeFuture;
private IOException failureCause;
private Throwable pendingFailure;
private static final String AMQP_SUB_PROTOCOL = "amqp";
/**
* Create a new transport instance
*
* @param remoteLocation the URI that defines the remote resource to connect to.
* @param options the transport options used to configure the socket connection.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
this(null, remoteLocation, options);
@ -93,119 +66,15 @@ public class NettyWSTransport implements NettyTransport {
/**
* Create a new transport instance
*
* @param listener the TransportListener that will receive events from this Transport.
* @param remoteLocation the URI that defines the remote resource to connect to.
* @param options the transport options used to configure the socket connection.
* @param listener
* the TransportListener that will receive events from this Transport.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
this.options = options;
this.listener = listener;
this.remote = remoteLocation;
this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss");
}
@Override
public void connect() throws IOException {
if (listener == null) {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
group = new NioEventLoopGroup(1);
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
configureChannel(connectedChannel);
}
});
configureNetty(bootstrap, getTransportOptions());
ChannelFuture future;
try {
future = bootstrap.connect(getRemoteHost(), getRemotePort());
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handleConnected(future.channel());
} else if (future.isCancelled()) {
connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
} else {
connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
}
}
});
future.sync();
// Now wait for WS protocol level handshake completion
handshakeFuture.await();
} catch (InterruptedException ex) {
LOG.debug("Transport connection attempt was interrupted.");
Thread.interrupted();
failureCause = IOExceptionSupport.create(ex);
}
if (failureCause != null) {
// Close out any Netty resources now as they are no longer needed.
if (channel != null) {
channel.close().syncUninterruptibly();
channel = null;
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
group = null;
}
throw failureCause;
} else {
// Connected, allow any held async error to fire now and close the transport.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (pendingFailure != null) {
channel.pipeline().fireExceptionCaught(pendingFailure);
}
}
});
}
}
@Override
public boolean isConnected() {
return connected.get();
}
@Override
public boolean isSSL() {
return secure;
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
if (channel != null) {
channel.close().syncUninterruptibly();
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
}
@Override
public ByteBuf allocateSendBuffer(int size) throws IOException {
checkConnected();
return channel.alloc().ioBuffer(size, size);
super(listener, remoteLocation, options);
}
@Override
@ -222,202 +91,37 @@ public class NettyWSTransport implements NettyTransport {
}
@Override
public NettyTransportListener getTransportListener() {
return listener;
protected ChannelInboundHandlerAdapter createChannelHandler() {
return new NettyWebSocketTransportHandler();
}
@Override
public void setTransportListener(NettyTransportListener listener) {
this.listener = listener;
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(8192));
}
@Override
public NettyTransportOptions getTransportOptions() {
if (options == null) {
if (isSSL()) {
options = NettyTransportSslOptions.INSTANCE;
} else {
options = NettyTransportOptions.INSTANCE;
}
}
return options;
protected void handleConnected(Channel channel) throws Exception {
LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel);
}
@Override
public URI getRemoteLocation() {
return remote;
}
// ----- Handle connection events -----------------------------------------//
@Override
public Principal getLocalPrincipal() {
if (!isSSL()) {
throw new UnsupportedOperationException("Not connected to a secure channel");
}
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
return sslHandler.engine().getSession().getLocalPrincipal();
}
//----- Internal implementation details, can be overridden as needed --//
protected String getRemoteHost() {
return remote.getHost();
}
protected int getRemotePort() {
int port = remote.getPort();
if (port <= 0) {
if (isSSL()) {
port = getSslOptions().getDefaultSslPort();
} else {
port = getTransportOptions().getDefaultTcpPort();
}
}
return port;
}
protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
if (options.getSendBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
}
if (options.getReceiveBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
}
if (options.getTrafficClass() != -1) {
bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
}
}
protected void configureChannel(final Channel channel) throws Exception {
if (isSSL()) {
SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
connectionEstablished(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
connectionFailed(channel, IOExceptionSupport.create(future.cause()));
}
}
});
channel.pipeline().addLast(sslHandler);
}
channel.pipeline().addLast(new HttpClientCodec());
channel.pipeline().addLast(new HttpObjectAggregator(8192));
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (!isSSL()) {
connectionEstablished(channel);
}
}
//----- State change handlers and checks ---------------------------------//
/**
* Called when the transport has successfully connected and is ready for use.
*/
protected void connectionEstablished(Channel connectedChannel) {
LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
channel = connectedChannel;
connected.set(true);
}
/**
* Called when the transport connection failed and an error should be returned.
*
* @param failedChannel The Channel instance that failed.
* @param cause An IOException that describes the cause of the failed connection.
*/
protected void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = IOExceptionSupport.create(cause);
channel = failedChannel;
connected.set(false);
handshakeFuture.setFailure(cause);
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
}
}
//----- Handle connection events -----------------------------------------//
private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {
private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object> {
private final WebSocketClientHandshaker handshaker;
NettyTcpTransportHandler() {
handshaker = WebSocketClientHandshakerFactory.newHandshaker(remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
}
@Override
public void handlerAdded(ChannelHandlerContext context) {
LOG.trace("Handler has become added! Channel is {}", context.channel());
handshakeFuture = context.newPromise();
NettyWebSocketTransportHandler() {
handshaker = WebSocketClientHandshakerFactory.newHandshaker(getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, true,
new DefaultHttpHeaders());
}
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has become active! Channel is {}", context.channel());
handshaker.handshake(context.channel());
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
LOG.trace("Error Stack: ", cause);
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (pendingFailure != null) {
listener.onTransportError(pendingFailure);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (pendingFailure != null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
pendingFailure = cause;
}
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
}
super.channelActive(context);
}
@Override
@ -427,16 +131,17 @@ public class NettyWSTransport implements NettyTransport {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) message);
LOG.info("WebSocket Client connected! {}", ctx.channel());
handshakeFuture.setSuccess();
LOG.trace("WebSocket Client connected! {}", ctx.channel());
// Now trigger super processing as we are really connected.
NettyWSTransport.super.handleConnected(ch);
return;
}
// We shouldn't get this since we handle the handshake previously.
if (message instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) message;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) message;
@ -446,10 +151,11 @@ public class NettyWSTransport implements NettyTransport {
ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
} else if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
listener.onData(binaryFrame.content());
} else if (frame instanceof PongWebSocketFrame) {
LOG.trace("WebSocket Client received pong");
} else if (frame instanceof PingWebSocketFrame) {
LOG.trace("WebSocket Client received ping, response with pong");
ch.write(new PongWebSocketFrame(frame.content()));
} else if (frame instanceof CloseWebSocketFrame) {
LOG.trace("WebSocket Client received closing");
ch.close();

View File

@ -1,202 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.util.EnumSet;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.reactor.Reactor;
/**
* Unmodifiable Connection wrapper used to prevent test code from accidentally
* modifying Connection state.
*/
public class UnmodifiableConnection implements Connection {
private final Connection connection;
public UnmodifiableConnection(Connection connection) {
this.connection = connection;
}
@Override
public EndpointState getLocalState() {
return connection.getLocalState();
}
@Override
public EndpointState getRemoteState() {
return connection.getRemoteState();
}
@Override
public ErrorCondition getCondition() {
return connection.getCondition();
}
@Override
public void setCondition(ErrorCondition condition) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public ErrorCondition getRemoteCondition() {
return connection.getRemoteCondition();
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void open() {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void close() {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public Session session() {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public Session sessionHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
Session head = connection.sessionHead(local, remote);
if (head != null) {
head = new UnmodifiableSession(head);
}
return head;
}
@Override
public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
// TODO - If implemented this method should return an unmodifiable link instance.
return null;
}
@Override
public Delivery getWorkHead() {
// TODO - If implemented this method should return an unmodifiable delivery instance.
return null;
}
@Override
public void setContainer(String container) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void setHostname(String hostname) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public String getHostname() {
return connection.getHostname();
}
@Override
public String getRemoteContainer() {
return connection.getRemoteContainer();
}
@Override
public String getRemoteHostname() {
return connection.getRemoteHostname();
}
@Override
public void setOfferedCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void setDesiredCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public Symbol[] getRemoteOfferedCapabilities() {
return connection.getRemoteOfferedCapabilities();
}
@Override
public Symbol[] getRemoteDesiredCapabilities() {
return connection.getRemoteDesiredCapabilities();
}
@Override
public Map<Symbol, Object> getRemoteProperties() {
return connection.getRemoteProperties();
}
@Override
public void setProperties(Map<Symbol, Object> properties) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public Object getContext() {
return connection.getContext();
}
@Override
public void setContext(Object context) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public void collect(Collector collector) {
throw new UnsupportedOperationException("Cannot alter the Connection");
}
@Override
public String getContainer() {
return connection.getContainer();
}
@Override
public Transport getTransport() {
return new UnmodifiableTransport(connection.getTransport());
}
@Override
public Record attachments() {
return connection.attachments();
}
@Override
public Reactor getReactor() {
return connection.getReactor();
}
}

View File

@ -1,179 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sender;
/**
* Unmodifiable Delivery wrapper used to prevent test code from accidentally
* modifying Delivery state.
*/
public class UnmodifiableDelivery implements Delivery {
private final Delivery delivery;
public UnmodifiableDelivery(Delivery delivery) {
this.delivery = delivery;
}
@Override
public byte[] getTag() {
return delivery.getTag();
}
@Override
public Link getLink() {
if (delivery.getLink() instanceof Sender) {
return new UnmodifiableSender((Sender) delivery.getLink());
} else if (delivery.getLink() instanceof Receiver) {
return new UnmodifiableReceiver((Receiver) delivery.getLink());
} else {
throw new IllegalStateException("Delivery has unknown link type");
}
}
/* waiting Pull Request sent
@Override
public int getDataLength() {
return delivery.getDataLength();
} */
@Override
public int available() {
return delivery.available();
}
@Override
public DeliveryState getLocalState() {
return delivery.getLocalState();
}
@Override
public DeliveryState getRemoteState() {
return delivery.getRemoteState();
}
@Override
public int getMessageFormat() {
return delivery.getMessageFormat();
}
@Override
public void disposition(DeliveryState state) {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public void settle() {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public boolean isSettled() {
return delivery.isSettled();
}
@Override
public boolean remotelySettled() {
return delivery.remotelySettled();
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public Delivery getWorkNext() {
return new UnmodifiableDelivery(delivery.getWorkNext());
}
@Override
public Delivery next() {
return new UnmodifiableDelivery(delivery.next());
}
@Override
public boolean isWritable() {
return delivery.isWritable();
}
@Override
public boolean isReadable() {
return delivery.isReadable();
}
@Override
public void setContext(Object o) {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public Object getContext() {
return delivery.getContext();
}
@Override
public boolean isUpdated() {
return delivery.isUpdated();
}
@Override
public void clear() {
throw new UnsupportedOperationException("Cannot alter the Delivery state");
}
@Override
public boolean isPartial() {
return delivery.isPartial();
}
@Override
public int pending() {
return delivery.pending();
}
@Override
public boolean isBuffered() {
return delivery.isBuffered();
}
@Override
public Record attachments() {
return delivery.attachments();
}
@Override
public DeliveryState getDefaultDeliveryState() {
return delivery.getDefaultDeliveryState();
}
@Override
public void setDefaultDeliveryState(DeliveryState state) {
throw new UnsupportedOperationException("Cannot alter the Delivery");
}
@Override
public void setMessageFormat(int messageFormat) {
throw new UnsupportedOperationException("Cannot alter the Delivery");
}
}

View File

@ -1,306 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.util.EnumSet;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
/**
* Unmodifiable Session wrapper used to prevent test code from accidentally
* modifying Session state.
*/
public class UnmodifiableLink implements Link {
private final Link link;
public UnmodifiableLink(Link link) {
this.link = link;
}
@Override
public EndpointState getLocalState() {
return link.getLocalState();
}
@Override
public EndpointState getRemoteState() {
return link.getRemoteState();
}
@Override
public ErrorCondition getCondition() {
return link.getCondition();
}
@Override
public void setCondition(ErrorCondition condition) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public ErrorCondition getRemoteCondition() {
return link.getRemoteCondition();
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void open() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void close() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setContext(Object o) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Object getContext() {
return link.getContext();
}
@Override
public String getName() {
return link.getName();
}
@Override
public Delivery delivery(byte[] tag) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Delivery delivery(byte[] tag, int offset, int length) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Delivery head() {
return new UnmodifiableDelivery(link.head());
}
@Override
public Delivery current() {
return new UnmodifiableDelivery(link.current());
}
@Override
public boolean advance() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Source getSource() {
// TODO Figure out a simple way to wrap the odd Source types in Proton-J
return link.getSource();
}
@Override
public Target getTarget() {
// TODO Figure out a simple way to wrap the odd Source types in Proton-J
return link.getTarget();
}
@Override
public void setSource(Source address) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setTarget(Target address) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Source getRemoteSource() {
// TODO Figure out a simple way to wrap the odd Source types in Proton-J
return link.getRemoteSource();
}
@Override
public Target getRemoteTarget() {
// TODO Figure out a simple way to wrap the odd Target types in Proton-J
return link.getRemoteTarget();
}
@Override
public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
Link next = link.next(local, remote);
if (next != null) {
if (next instanceof Sender) {
next = new UnmodifiableSender((Sender) next);
} else {
next = new UnmodifiableReceiver((Receiver) next);
}
}
return next;
}
@Override
public int getCredit() {
return link.getCredit();
}
@Override
public int getQueued() {
return link.getQueued();
}
@Override
public int getUnsettled() {
return link.getUnsettled();
}
@Override
public Session getSession() {
return new UnmodifiableSession(link.getSession());
}
@Override
public SenderSettleMode getSenderSettleMode() {
return link.getSenderSettleMode();
}
@Override
public void setSenderSettleMode(SenderSettleMode senderSettleMode) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public SenderSettleMode getRemoteSenderSettleMode() {
return link.getRemoteSenderSettleMode();
}
@Override
public ReceiverSettleMode getReceiverSettleMode() {
return link.getReceiverSettleMode();
}
@Override
public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public ReceiverSettleMode getRemoteReceiverSettleMode() {
return link.getRemoteReceiverSettleMode();
}
@Override
public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int drained() {
return link.drained(); // TODO - Is this a mutating call?
}
@Override
public int getRemoteCredit() {
return link.getRemoteCredit();
}
@Override
public boolean getDrain() {
return link.getDrain();
}
@Override
public void detach() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public boolean detached() {
return link.detached();
}
@Override
public Record attachments() {
return link.attachments();
}
@Override
public Map<Symbol, Object> getProperties() {
return link.getProperties();
}
@Override
public void setProperties(Map<Symbol, Object> properties) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public Map<Symbol, Object> getRemoteProperties() {
return link.getRemoteProperties();
}
@Override
public Symbol[] getDesiredCapabilities() {
return link.getDesiredCapabilities();
}
@Override
public Symbol[] getOfferedCapabilities() {
return link.getOfferedCapabilities();
}
@Override
public Symbol[] getRemoteDesiredCapabilities() {
return link.getRemoteDesiredCapabilities();
}
@Override
public Symbol[] getRemoteOfferedCapabilities() {
return link.getRemoteOfferedCapabilities();
}
@Override
public void setDesiredCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setOfferedCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
/**
* Utility that creates proxy objects for the Proton objects which won't allow any mutating
* operations to be applied so that the test code does not interact with the proton engine
* outside the client serialization thread.
*/
public final class UnmodifiableProxy {
private static ArrayList<String> blacklist = new ArrayList<>();
// These methods are mutating but don't take an arguments so they
// aren't automatically filtered out. We will have to keep an eye
// on proton API in the future and modify this list as it evolves.
static {
blacklist.add("close");
blacklist.add("free");
blacklist.add("open");
blacklist.add("sasl");
blacklist.add("session");
blacklist.add("close_head");
blacklist.add("close_tail");
blacklist.add("outputConsumed");
blacklist.add("process");
blacklist.add("processInput");
blacklist.add("unbind");
blacklist.add("settle");
blacklist.add("clear");
blacklist.add("detach");
blacklist.add("abort");
}
private UnmodifiableProxy() {
}
public static Transport transportProxy(final Transport target) {
Transport wrap = wrap(Transport.class, target);
return wrap;
}
public static Sasl saslProxy(final Sasl target) {
return wrap(Sasl.class, target);
}
public static Connection connectionProxy(final Connection target) {
return wrap(Connection.class, target);
}
public static Session sessionProxy(final Session target) {
return wrap(Session.class, target);
}
public static Delivery deliveryProxy(final Delivery target) {
return wrap(Delivery.class, target);
}
public static Link linkProxy(final Link target) {
return wrap(Link.class, target);
}
public static Receiver receiverProxy(final Receiver target) {
return wrap(Receiver.class, target);
}
public static Sender senderProxy(final Sender target) {
return wrap(Sender.class, target);
}
private static boolean isProtonType(Class<?> clazz) {
String packageName = clazz.getPackage().getName();
if (packageName.startsWith("org.apache.qpid.proton.")) {
return true;
}
return false;
}
private static <T> T wrap(Class<T> type, final Object target) {
return type.cast(java.lang.reflect.Proxy.newProxyInstance(type.getClassLoader(), new Class[] {type}, new InvocationHandler() {
@Override
public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
if ("toString".equals(method.getName()) && method.getParameterTypes().length == 0) {
return "Unmodifiable proxy -> (" + method.invoke(target, objects) + ")";
}
// Don't let methods that mutate be invoked.
if (method.getParameterTypes().length > 0) {
throw new UnsupportedOperationException("Cannot mutate outside the Client work thread");
}
if (blacklist.contains(method.getName())) {
throw new UnsupportedOperationException("Cannot mutate outside the Client work thread");
}
Class<?> returnType = method.getReturnType();
try {
Object result = method.invoke(target, objects);
if (result == null) {
return null;
}
if (returnType.isPrimitive() || returnType.isArray() || Object.class.equals(returnType)) {
// Skip any other checks
} else if (returnType.isAssignableFrom(ByteBuffer.class)) {
// Buffers are modifiable but we can just return null to indicate
// there's nothing there to access.
result = null;
} else if (returnType.isAssignableFrom(Map.class)) {
// Prevent return of modifiable maps
result = Collections.unmodifiableMap((Map<?, ?>) result);
} else if (isProtonType(returnType) && returnType.isInterface()) {
// Can't handle the crazy Source / Target types yet as there's two
// different types for Source and Target the result can't be cast to
// the one people actually want to use.
if (!returnType.getName().equals("org.apache.qpid.proton.amqp.transport.Source")
&& !returnType.getName().equals("org.apache.qpid.proton.amqp.messaging.Source")
&& !returnType.getName().equals("org.apache.qpid.proton.amqp.transport.Target")
&& !returnType.getName().equals("org.apache.qpid.proton.amqp.messaging.Target")) {
result = wrap(returnType, result);
}
}
return result;
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
}));
}
}

View File

@ -1,65 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Receiver;
/**
* Unmodifiable Receiver wrapper used to prevent test code from accidentally
* modifying Receiver state.
*/
public class UnmodifiableReceiver extends UnmodifiableLink implements Receiver {
private final Receiver receiver;
public UnmodifiableReceiver(Receiver receiver) {
super(receiver);
this.receiver = receiver;
}
@Override
public void flow(int credits) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int recv(byte[] bytes, int offset, int size) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int recv(WritableBuffer buffer) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void drain(int credit) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public boolean draining() {
return receiver.draining();
}
@Override
public void setDrain(boolean drain) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -1,51 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Sender;
/**
* Unmodifiable Sender wrapper used to prevent test code from accidentally
* modifying Sender state.
*/
public class UnmodifiableSender extends UnmodifiableLink implements Sender {
public UnmodifiableSender(Sender sender) {
super(sender);
}
@Override
public void offer(int credits) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int send(byte[] bytes, int offset, int length) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int send(ReadableBuffer buffer) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void abort() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -1,198 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.util.EnumSet;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
/**
* Unmodifiable Session wrapper used to prevent test code from accidentally
* modifying Session state.
*/
public class UnmodifiableSession implements Session {
private final Session session;
public UnmodifiableSession(Session session) {
this.session = session;
}
@Override
public EndpointState getLocalState() {
return session.getLocalState();
}
@Override
public EndpointState getRemoteState() {
return session.getRemoteState();
}
@Override
public ErrorCondition getCondition() {
return session.getCondition();
}
@Override
public void setCondition(ErrorCondition condition) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public ErrorCondition getRemoteCondition() {
return session.getRemoteCondition();
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public void open() {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public void close() {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public void setContext(Object o) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public Object getContext() {
return session.getContext();
}
@Override
public Sender sender(String name) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public Receiver receiver(String name) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public Session next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
Session next = session.next(local, remote);
if (next != null) {
next = new UnmodifiableSession(next);
}
return next;
}
@Override
public Connection getConnection() {
return new UnmodifiableConnection(session.getConnection());
}
@Override
public int getIncomingCapacity() {
return session.getIncomingCapacity();
}
@Override
public void setIncomingCapacity(int bytes) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public int getIncomingBytes() {
return session.getIncomingBytes();
}
@Override
public int getOutgoingBytes() {
return session.getOutgoingBytes();
}
@Override
public Record attachments() {
return session.attachments();
}
@Override
public long getOutgoingWindow() {
return session.getOutgoingWindow();
}
@Override
public void setOutgoingWindow(long outgoingWindowSize) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public Symbol[] getDesiredCapabilities() {
return session.getDesiredCapabilities();
}
@Override
public Symbol[] getOfferedCapabilities() {
return session.getOfferedCapabilities();
}
@Override
public Map<Symbol, Object> getProperties() {
return session.getProperties();
}
@Override
public Symbol[] getRemoteDesiredCapabilities() {
return session.getRemoteDesiredCapabilities();
}
@Override
public Symbol[] getRemoteOfferedCapabilities() {
return session.getRemoteOfferedCapabilities();
}
@Override
public Map<Symbol, Object> getRemoteProperties() {
return session.getRemoteProperties();
}
@Override
public void setDesiredCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setOfferedCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setProperties(Map<Symbol, Object> capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -1,274 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.nio.ByteBuffer;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.TransportResult;
/**
* Unmodifiable Transport wrapper used to prevent test code from accidentally
* modifying Transport state.
*/
public class UnmodifiableTransport implements Transport {
private final Transport transport;
public UnmodifiableTransport(Transport transport) {
this.transport = transport;
}
@Override
public void close() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Object getContext() {
return null;
}
@Override
public EndpointState getLocalState() {
return transport.getLocalState();
}
@Override
public ErrorCondition getRemoteCondition() {
return transport.getRemoteCondition();
}
@Override
public EndpointState getRemoteState() {
return transport.getRemoteState();
}
@Override
public void open() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setCondition(ErrorCondition arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setContext(Object arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void bind(Connection arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public int capacity() {
return transport.capacity();
}
@Override
public void close_head() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void close_tail() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public int getChannelMax() {
return transport.getChannelMax();
}
@Override
public ErrorCondition getCondition() {
return transport.getCondition();
}
@Override
public int getIdleTimeout() {
return transport.getIdleTimeout();
}
@Override
public ByteBuffer getInputBuffer() {
return null;
}
@Override
public int getMaxFrameSize() {
return transport.getMaxFrameSize();
}
@Override
public ByteBuffer getOutputBuffer() {
return null;
}
@Override
public int getRemoteChannelMax() {
return transport.getRemoteChannelMax();
}
@Override
public int getRemoteIdleTimeout() {
return transport.getRemoteIdleTimeout();
}
@Override
public int getRemoteMaxFrameSize() {
return transport.getRemoteMaxFrameSize();
}
@Override
public ByteBuffer head() {
return null;
}
@Override
public int input(byte[] arg0, int arg1, int arg2) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public boolean isClosed() {
return transport.isClosed();
}
@Override
public int output(byte[] arg0, int arg1, int arg2) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void outputConsumed() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public int pending() {
return transport.pending();
}
@Override
public void pop(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void process() throws TransportException {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public TransportResult processInput() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Sasl sasl() throws IllegalStateException {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setChannelMax(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setIdleTimeout(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setMaxFrameSize(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Ssl ssl(SslDomain arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Ssl ssl(SslDomain arg0, SslPeerDetails arg1) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public ByteBuffer tail() {
return null;
}
@Override
public long tick(long arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void trace(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void unbind() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Record attachments() {
return transport.attachments();
}
@Override
public long getFramesInput() {
return transport.getFramesInput();
}
@Override
public long getFramesOutput() {
return transport.getFramesOutput();
}
@Override
public void setEmitFlowEventOnSend(boolean emitFlowEventOnSend) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public boolean isEmitFlowEventOnSend() {
return transport.isEmitFlowEventOnSend();
}
}