Allow the AMQP test client to also be configure to trace frames

The test client can allow for quick tracing of the frame traffic via a
call to setTraceFrames on the client or connection instance before
connection to the remote.  This allows for tests to easily switch on /
off tracing.  The log4j.properties is also updated to output frame
tracing with the URI option is put on the AMQP transport or the client
value is enabled.
This commit is contained in:
Timothy Bish 2016-09-08 16:19:37 -04:00
parent a038655605
commit 84cd815500
4 changed files with 56 additions and 3 deletions

View File

@ -21,8 +21,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,6 +40,7 @@ public class AmqpClient {
private final URI remoteURI;
private String authzid;
private String mechanismRestriction;
private boolean traceFrames;
private AmqpValidator stateInspector = new AmqpValidator();
private List<Symbol> offeredCapabilities = Collections.emptyList();
@ -103,6 +104,7 @@ public class AmqpClient {
connection.setOfferedCapabilities(getOfferedCapabilities());
connection.setOfferedProperties(getOfferedProperties());
connection.setStateInspector(getStateInspector());
connection.setTraceFrames(isTraceFrames());
return connection;
}
@ -218,6 +220,24 @@ public class AmqpClient {
this.stateInspector = stateInspector;
}
/**
* @return the traceFrames setting for the client, true indicates frame tracing is on.
*/
public boolean isTraceFrames() {
return traceFrames;
}
/**
* Controls whether connections created from this client object will log AMQP
* frames to a trace level logger or not.
*
* @param traceFrames
* configure the trace frames option for the client created connections.
*/
public void setTraceFrames(boolean traceFrames) {
this.traceFrames = traceFrames;
}
@Override
public String toString() {
return "AmqpClient: " + getRemoteURI().getHost() + ":" + getRemoteURI().getPort();

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -49,6 +49,9 @@ import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -59,6 +62,7 @@ import io.netty.util.ReferenceCountUtil;
public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".FRAMES");
private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
@ -101,6 +105,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
private boolean trace;
public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) {
setEndpoint(Connection.Factory.create());
@ -154,6 +159,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
sasl.client();
}
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
updateTracer();
open(future);
pumpToProtonTransport(future);
@ -434,6 +440,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
return mechanismRestriction;
}
public boolean isTraceFrames() {
return trace;
}
public void setTraceFrames(boolean trace) {
this.trace = trace;
}
//----- Internal getters used from the child AmqpResource classes --------//
ScheduledExecutorService getScheduler() {
@ -698,6 +712,22 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
return containerId;
}
private void updateTracer() {
if (isTraceFrames()) {
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
TRACE_FRAMES.trace("{} | RECV: {}", getRemoteURI(), transportFrame.getBody());
}
@Override
public void sentFrame(TransportFrame transportFrame) {
TRACE_FRAMES.trace("{} | SENT: {}", this, transportFrame.getBody());
}
});
}
}
@Override
public String toString() {
return "AmqpConnection { " + connectionId + " }";

View File

@ -39,6 +39,8 @@ public class AmqpAnonymousSenderTest extends AmqpClientTestSupport {
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
AmqpClient client = createAmqpClient();
client.setTraceFrames(false);
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();

View File

@ -21,7 +21,8 @@
log4j.rootLogger=WARN, console, file
log4j.logger.org.apache.activemq=INFO
log4j.logger.org.apache.activemq.transport.amqp=DEBUG
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
log4j.logger.org.apache.activemq.transport.amqp.client.FRAMES=TRACE
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=TRACE
log4j.logger.org.fusesource=INFO
# Configure various level of detail for Qpid JMS logs.