From 24a79413c66013818795360cf3ecc4489d1e4d5f Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 24 Oct 2016 15:39:01 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6479 Allow a unit test to inspect AMQP frames as part of the test. --- .../transport/amqp/client/AmqpConnection.java | 39 +++--- .../amqp/client/AmqpFrameValidator.java | 103 ++++++++++++++++ .../amqp/client/AmqpProtocolTracer.java | 116 ++++++++++++++++++ .../transport/amqp/client/AmqpSession.java | 36 +++++- .../amqp/interop/AmqpDurableReceiverTest.java | 26 ++++ 5 files changed, 298 insertions(+), 22 deletions(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 33280447d5..c6c994daaf 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -49,9 +49,7 @@ import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.CollectorImpl; -import org.apache.qpid.proton.engine.impl.ProtocolTracer; import org.apache.qpid.proton.engine.impl.TransportImpl; -import org.apache.qpid.proton.framing.TransportFrame; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +60,6 @@ import io.netty.util.ReferenceCountUtil; public class AmqpConnection extends AmqpAbstractResource implements NettyTransportListener { private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); - private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".FRAMES"); private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult(); @@ -92,6 +89,8 @@ public class AmqpConnection extends AmqpAbstractResource implements private List offeredCapabilities = Collections.emptyList(); private Map offeredProperties = Collections.emptyMap(); + private volatile AmqpFrameValidator sentFrameInspector; + private volatile AmqpFrameValidator receivedFrameInspector; private AmqpConnectionListener listener; private SaslAuthenticator authenticator; private String mechanismRestriction; @@ -159,7 +158,7 @@ public class AmqpConnection extends AmqpAbstractResource implements sasl.client(); } authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction); - updateTracer(); + ((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this)); open(future); pumpToProtonTransport(future); @@ -454,6 +453,22 @@ public class AmqpConnection extends AmqpAbstractResource implements this.trace = trace; } + public AmqpFrameValidator getSentFrameInspector() { + return sentFrameInspector; + } + + public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) { + this.sentFrameInspector = amqpFrameInspector; + } + + public AmqpFrameValidator getReceivedFrameInspector() { + return receivedFrameInspector; + } + + public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) { + this.receivedFrameInspector = amqpFrameInspector; + } + //----- Internal getters used from the child AmqpResource classes --------// ScheduledExecutorService getScheduler() { @@ -718,22 +733,6 @@ public class AmqpConnection extends AmqpAbstractResource implements return containerId; } - private void updateTracer() { - if (isTraceFrames()) { - ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() { - @Override - public void receivedFrame(TransportFrame transportFrame) { - TRACE_FRAMES.trace("{} | RECV: {}", getRemoteURI(), transportFrame.getBody()); - } - - @Override - public void sentFrame(TransportFrame transportFrame) { - TRACE_FRAMES.trace("{} | SENT: {}", this, transportFrame.getBody()); - } - }); - } - } - @Override public String toString() { return "AmqpConnection { " + connectionId + " }"; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java new file mode 100644 index 0000000000..7588572dc9 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.transport.Attach; +import org.apache.qpid.proton.amqp.transport.Begin; +import org.apache.qpid.proton.amqp.transport.Close; +import org.apache.qpid.proton.amqp.transport.Detach; +import org.apache.qpid.proton.amqp.transport.Disposition; +import org.apache.qpid.proton.amqp.transport.End; +import org.apache.qpid.proton.amqp.transport.Flow; +import org.apache.qpid.proton.amqp.transport.Open; +import org.apache.qpid.proton.amqp.transport.Transfer; + +/** + * Abstract base for a validation hook that is used in tests to check + * the state of a remote resource after a variety of lifecycle events. + */ +public class AmqpFrameValidator { + + private boolean valid = true; + private String errorMessage; + + public void inspectOpen(Open open, Binary encoded) { + + } + + public void inspectBegin(Begin begin, Binary encoded) { + + } + + public void inspectAttach(Attach attach, Binary encoded) { + + } + + public void inspectFlow(Flow flow, Binary encoded) { + + } + + public void inspectTransfer(Transfer transfer, Binary encoded) { + + } + + public void inspectDisposition(Disposition disposition, Binary encoded) { + + } + + public void inspectDetach(Detach detach, Binary encoded) { + + } + + public void inspectEnd(End end, Binary encoded) { + + } + + public void inspectClose(Close close, Binary encoded) { + + } + + public boolean isValid() { + return valid; + } + + protected void setValid(boolean valid) { + this.valid = valid; + } + + public String getErrorMessage() { + return errorMessage; + } + + protected void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + protected void markAsInvalid(String errorMessage) { + if (valid) { + setValid(false); + setErrorMessage(errorMessage); + } + } + + public void assertValid() { + if (!isValid()) { + throw new AssertionError(errorMessage); + } + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java new file mode 100644 index 0000000000..efda4d216e --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.transport.Attach; +import org.apache.qpid.proton.amqp.transport.Begin; +import org.apache.qpid.proton.amqp.transport.Close; +import org.apache.qpid.proton.amqp.transport.Detach; +import org.apache.qpid.proton.amqp.transport.Disposition; +import org.apache.qpid.proton.amqp.transport.End; +import org.apache.qpid.proton.amqp.transport.Flow; +import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler; +import org.apache.qpid.proton.amqp.transport.Open; +import org.apache.qpid.proton.amqp.transport.Transfer; +import org.apache.qpid.proton.engine.impl.ProtocolTracer; +import org.apache.qpid.proton.framing.TransportFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tracer used to spy on AMQP traffic + */ +public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler { + + private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES"); + + private final AmqpConnection connection; + + public AmqpProtocolTracer(AmqpConnection connection) { + this.connection = connection; + } + + @Override + public void receivedFrame(TransportFrame transportFrame) { + if (connection.isTraceFrames()) { + TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody()); + } + + AmqpFrameValidator inspector = connection.getReceivedFrameInspector(); + if (inspector != null) { + transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector); + } + } + + @Override + public void sentFrame(TransportFrame transportFrame) { + if (connection.isTraceFrames()) { + TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody()); + } + + AmqpFrameValidator inspector = connection.getSentFrameInspector(); + if (inspector != null) { + transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector); + } + } + + @Override + public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) { + context.inspectOpen(open, payload); + } + + @Override + public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) { + context.inspectBegin(begin, payload); + } + + @Override + public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) { + context.inspectAttach(attach, payload); + } + + @Override + public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) { + context.inspectFlow(flow, payload); + } + + @Override + public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) { + context.inspectTransfer(transfer, payload); + } + + @Override + public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) { + context.inspectDisposition(disposition, payload); + } + + @Override + public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) { + context.inspectDetach(detach, payload); + } + + @Override + public void handleEnd(End end, Binary payload, AmqpFrameValidator context) { + context.inspectEnd(end, payload); + } + + @Override + public void handleClose(Close close, Binary payload, AmqpFrameValidator context) { + context.inspectClose(close, payload); + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 7cb745cb3f..b239daed1c 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -143,16 +143,32 @@ public class AmqpSession extends AmqpAbstractResource { * Create a sender instance using the given Target * * @param target - * the caller created and configured Traget used to create the sender link. + * the caller created and configured Target used to create the sender link. * * @return a newly created sender that is ready for use. * * @throws Exception if an error occurs while creating the receiver. */ public AmqpSender createSender(Target target) throws Exception { + return createSender(target, getNextSenderId()); + } + + /** + * Create a sender instance using the given Target + * + * @param target + * the caller created and configured Target used to create the sender link. + * @param sender + * the sender ID to assign to the newly created Sender. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpSender createSender(Target target, String senderId) throws Exception { checkClosed(); - final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId()); + final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId); final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { @@ -274,6 +290,22 @@ public class AmqpSession extends AmqpAbstractResource { * @throws Exception if an error occurs while creating the receiver. */ public AmqpReceiver createReceiver(Source source) throws Exception { + return createReceiver(source, getNextReceiverId()); + } + + /** + * Create a receiver instance using the given Source + * + * @param source + * the caller created and configured Source used to create the receiver link. + * @param receivedId + * the ID value to assign to the newly created receiver + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception { checkClosed(); final ClientFuture request = new ClientFuture(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java index 3db3301cea..c5636db1fe 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java @@ -29,12 +29,15 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpFrameValidator; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transport.Detach; import org.apache.qpid.proton.engine.Receiver; import org.junit.Test; @@ -81,6 +84,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { connection.setContainerId(getTestName()); connection.connect(); + connection.setReceivedFrameInspector(new AmqpFrameValidator() { + + @Override + public void inspectDetach(Detach detach, Binary encoded) { + if (detach.getClosed()) { + markAsInvalid("Remote should have detached but closed instead."); + } + } + }); + + connection.setSentFrameInspector(new AmqpFrameValidator() { + + @Override + public void inspectDetach(Detach detach, Binary encoded) { + if (detach.getClosed()) { + markAsInvalid("Client should have detached but closed instead."); + } + } + }); + AmqpSession session = connection.createSession(); AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName()); @@ -94,6 +117,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { assertEquals(0, brokerView.getDurableTopicSubscribers().length); assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length); + connection.getSentFrameInspector().assertValid(); + connection.getReceivedFrameInspector().assertValid(); + connection.close(); }