Allow a unit test to inspect AMQP frames as part of the test.
This commit is contained in:
Timothy Bish 2016-10-24 15:39:01 -04:00
parent 5660753097
commit 24a79413c6
5 changed files with 298 additions and 22 deletions

View File

@ -49,9 +49,7 @@ import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -62,7 +60,6 @@ import io.netty.util.ReferenceCountUtil;
public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".FRAMES");
private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
@ -92,6 +89,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private List<Symbol> offeredCapabilities = Collections.emptyList();
private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
private volatile AmqpFrameValidator sentFrameInspector;
private volatile AmqpFrameValidator receivedFrameInspector;
private AmqpConnectionListener listener;
private SaslAuthenticator authenticator;
private String mechanismRestriction;
@ -159,7 +158,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
sasl.client();
}
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
updateTracer();
((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this));
open(future);
pumpToProtonTransport(future);
@ -454,6 +453,22 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.trace = trace;
}
public AmqpFrameValidator getSentFrameInspector() {
return sentFrameInspector;
}
public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) {
this.sentFrameInspector = amqpFrameInspector;
}
public AmqpFrameValidator getReceivedFrameInspector() {
return receivedFrameInspector;
}
public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) {
this.receivedFrameInspector = amqpFrameInspector;
}
//----- Internal getters used from the child AmqpResource classes --------//
ScheduledExecutorService getScheduler() {
@ -718,22 +733,6 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
return containerId;
}
private void updateTracer() {
if (isTraceFrames()) {
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
TRACE_FRAMES.trace("{} | RECV: {}", getRemoteURI(), transportFrame.getBody());
}
@Override
public void sentFrame(TransportFrame transportFrame) {
TRACE_FRAMES.trace("{} | SENT: {}", this, transportFrame.getBody());
}
});
}
}
@Override
public String toString() {
return "AmqpConnection { " + connectionId + " }";

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.amqp.transport.Disposition;
import org.apache.qpid.proton.amqp.transport.End;
import org.apache.qpid.proton.amqp.transport.Flow;
import org.apache.qpid.proton.amqp.transport.Open;
import org.apache.qpid.proton.amqp.transport.Transfer;
/**
* Abstract base for a validation hook that is used in tests to check
* the state of a remote resource after a variety of lifecycle events.
*/
public class AmqpFrameValidator {
private boolean valid = true;
private String errorMessage;
public void inspectOpen(Open open, Binary encoded) {
}
public void inspectBegin(Begin begin, Binary encoded) {
}
public void inspectAttach(Attach attach, Binary encoded) {
}
public void inspectFlow(Flow flow, Binary encoded) {
}
public void inspectTransfer(Transfer transfer, Binary encoded) {
}
public void inspectDisposition(Disposition disposition, Binary encoded) {
}
public void inspectDetach(Detach detach, Binary encoded) {
}
public void inspectEnd(End end, Binary encoded) {
}
public void inspectClose(Close close, Binary encoded) {
}
public boolean isValid() {
return valid;
}
protected void setValid(boolean valid) {
this.valid = valid;
}
public String getErrorMessage() {
return errorMessage;
}
protected void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
protected void markAsInvalid(String errorMessage) {
if (valid) {
setValid(false);
setErrorMessage(errorMessage);
}
}
public void assertValid() {
if (!isValid()) {
throw new AssertionError(errorMessage);
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.amqp.transport.Disposition;
import org.apache.qpid.proton.amqp.transport.End;
import org.apache.qpid.proton.amqp.transport.Flow;
import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler;
import org.apache.qpid.proton.amqp.transport.Open;
import org.apache.qpid.proton.amqp.transport.Transfer;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.framing.TransportFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tracer used to spy on AMQP traffic
*/
public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler<AmqpFrameValidator> {
private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES");
private final AmqpConnection connection;
public AmqpProtocolTracer(AmqpConnection connection) {
this.connection = connection;
}
@Override
public void receivedFrame(TransportFrame transportFrame) {
if (connection.isTraceFrames()) {
TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody());
}
AmqpFrameValidator inspector = connection.getReceivedFrameInspector();
if (inspector != null) {
transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
}
}
@Override
public void sentFrame(TransportFrame transportFrame) {
if (connection.isTraceFrames()) {
TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody());
}
AmqpFrameValidator inspector = connection.getSentFrameInspector();
if (inspector != null) {
transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
}
}
@Override
public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) {
context.inspectOpen(open, payload);
}
@Override
public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) {
context.inspectBegin(begin, payload);
}
@Override
public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) {
context.inspectAttach(attach, payload);
}
@Override
public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) {
context.inspectFlow(flow, payload);
}
@Override
public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) {
context.inspectTransfer(transfer, payload);
}
@Override
public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) {
context.inspectDisposition(disposition, payload);
}
@Override
public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) {
context.inspectDetach(detach, payload);
}
@Override
public void handleEnd(End end, Binary payload, AmqpFrameValidator context) {
context.inspectEnd(end, payload);
}
@Override
public void handleClose(Close close, Binary payload, AmqpFrameValidator context) {
context.inspectClose(close, payload);
}
}

View File

@ -143,16 +143,32 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* Create a sender instance using the given Target
*
* @param target
* the caller created and configured Traget used to create the sender link.
* the caller created and configured Target used to create the sender link.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpSender createSender(Target target) throws Exception {
return createSender(target, getNextSenderId());
}
/**
* Create a sender instance using the given Target
*
* @param target
* the caller created and configured Target used to create the sender link.
* @param sender
* the sender ID to assign to the newly created Sender.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpSender createSender(Target target, String senderId) throws Exception {
checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
@ -274,6 +290,22 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createReceiver(Source source) throws Exception {
return createReceiver(source, getNextReceiverId());
}
/**
* Create a receiver instance using the given Source
*
* @param source
* the caller created and configured Source used to create the receiver link.
* @param receivedId
* the ID value to assign to the newly created receiver
*
* @return a newly created receiver that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();

View File

@ -29,12 +29,15 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test;
@ -81,6 +84,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
connection.setContainerId(getTestName());
connection.connect();
connection.setReceivedFrameInspector(new AmqpFrameValidator() {
@Override
public void inspectDetach(Detach detach, Binary encoded) {
if (detach.getClosed()) {
markAsInvalid("Remote should have detached but closed instead.");
}
}
});
connection.setSentFrameInspector(new AmqpFrameValidator() {
@Override
public void inspectDetach(Detach detach, Binary encoded) {
if (detach.getClosed()) {
markAsInvalid("Client should have detached but closed instead.");
}
}
});
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
@ -94,6 +117,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
connection.getSentFrameInspector().assertValid();
connection.getReceivedFrameInspector().assertValid();
connection.close();
}