AMQ-6673 Add some fixes and improvements to the AMQP test client

Adds some thread safety fixes and Netty usage fixes to the transport as
well as adding a traceBytes option to trace the bytes sent / received
during testing.
This commit is contained in:
Timothy Bish 2017-05-11 15:01:16 -04:00
parent fddbac2b8a
commit 154ff81eee
3 changed files with 41 additions and 9 deletions

View File

@ -42,6 +42,7 @@ import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@ -53,7 +54,6 @@ public class NettyTcpTransport implements NettyTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
private static final int QUIET_PERIOD = 20;
private static final int SHUTDOWN_TIMEOUT = 100;
protected Bootstrap bootstrap;
@ -66,7 +66,7 @@ public class NettyTcpTransport implements NettyTransport {
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final CountDownLatch connectLatch = new CountDownLatch(1);
private IOException failureCause;
private volatile IOException failureCause;
/**
* Create a new transport instance
@ -163,7 +163,10 @@ public class NettyTcpTransport implements NettyTransport {
channel = null;
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
LOG.trace("Channel group shutdown failed to complete in allotted time");
}
group = null;
}
@ -196,11 +199,17 @@ public class NettyTcpTransport implements NettyTransport {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
try {
if (channel != null) {
channel.close().syncUninterruptibly();
}
} finally {
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
LOG.trace("Channel group shutdown failed to complete in allotted time");
}
}
}
}
}
@ -371,6 +380,10 @@ public class NettyTcpTransport implements NettyTransport {
channel.pipeline().addLast(sslHandler);
}
if (getTransportOptions().isTraceBytes()) {
channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
}
addAdditionalHandlers(channel.pipeline());
channel.pipeline().addLast(createChannelHandler());

View File

@ -30,6 +30,7 @@ public class NettyTransportOptions implements Cloneable {
public static final int DEFAULT_SO_TIMEOUT = -1;
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_TCP_PORT = 5672;
public static final boolean DEFAULT_TRACE_BYTES = false;
public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
@ -42,6 +43,7 @@ public class NettyTransportOptions implements Cloneable {
private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
private int defaultTcpPort = DEFAULT_TCP_PORT;
private boolean traceBytes = DEFAULT_TRACE_BYTES;
/**
* @return the currently set send buffer size in bytes.
@ -163,6 +165,23 @@ public class NettyTransportOptions implements Cloneable {
this.defaultTcpPort = defaultTcpPort;
}
/**
* @return true if the transport should enable byte tracing
*/
public boolean isTraceBytes() {
return traceBytes;
}
/**
* Determines if the transport should add a logger for bytes in / out
*
* @param traceBytes
* should the transport log the bytes in and out.
*/
public void setTraceBytes(boolean traceBytes) {
this.traceBytes = traceBytes;
}
public boolean isSSL() {
return false;
}

View File

@ -141,7 +141,7 @@ public class NettyWSTransport extends NettyTcpTransport {
if (message instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) message;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
}