From 490bd31c4b07df1e3d8b9636afdab9e9f89e81d6 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Tue, 25 Oct 2016 17:15:43 +0200 Subject: [PATCH] ARTEMIS-820 AMQP: Add frame inspection capability to the test client. --- .../transport/amqp/client/AmqpConnection.java | 31 ++++- .../amqp/client/AmqpFrameValidator.java | 103 ++++++++++++++++ .../amqp/client/AmqpProtocolTracer.java | 116 ++++++++++++++++++ .../transport/amqp/client/AmqpSession.java | 57 ++++++++- .../amqp/AmqpDurableReceiverTest.java | 26 ++++ 5 files changed, 328 insertions(+), 5 deletions(-) create mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java create mode 100644 tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 53fb9f5489..01c60bce14 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -50,6 +50,7 @@ import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.CollectorImpl; +import org.apache.qpid.proton.engine.impl.TransportImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +88,8 @@ public class AmqpConnection extends AmqpAbstractResource implements private List offeredCapabilities = Collections.emptyList(); private Map offeredProperties = Collections.emptyMap(); + private volatile AmqpFrameValidator sentFrameInspector; + private volatile AmqpFrameValidator receivedFrameInspector; private AmqpConnectionListener listener; private SaslAuthenticator authenticator; private String mechanismRestriction; @@ -100,6 +103,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; private long drainTimeout = DEFAULT_DRAIN_TIMEOUT; + private boolean trace; public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, @@ -155,6 +159,7 @@ public class AmqpConnection extends AmqpAbstractResource implements sasl.client(); } authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction); + ((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this)); open(future); pumpToProtonTransport(future); @@ -439,6 +444,30 @@ public class AmqpConnection extends AmqpAbstractResource implements return mechanismRestriction; } + public boolean isTraceFrames() { + return trace; + } + + public void setTraceFrames(boolean trace) { + this.trace = trace; + } + + public AmqpFrameValidator getSentFrameInspector() { + return sentFrameInspector; + } + + public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) { + this.sentFrameInspector = amqpFrameInspector; + } + + public AmqpFrameValidator getReceivedFrameInspector() { + return receivedFrameInspector; + } + + public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) { + this.receivedFrameInspector = amqpFrameInspector; + } + //----- Internal getters used from the child AmqpResource classes --------// ScheduledExecutorService getScheduler() { @@ -706,4 +735,4 @@ public class AmqpConnection extends AmqpAbstractResource implements public String toString() { return "AmqpConnection { " + connectionId + " }"; } -} +} \ No newline at end of file diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java new file mode 100644 index 0000000000..47961108c4 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.transport.Attach; +import org.apache.qpid.proton.amqp.transport.Begin; +import org.apache.qpid.proton.amqp.transport.Close; +import org.apache.qpid.proton.amqp.transport.Detach; +import org.apache.qpid.proton.amqp.transport.Disposition; +import org.apache.qpid.proton.amqp.transport.End; +import org.apache.qpid.proton.amqp.transport.Flow; +import org.apache.qpid.proton.amqp.transport.Open; +import org.apache.qpid.proton.amqp.transport.Transfer; + +/** + * Abstract base for a validation hook that is used in tests to check + * the state of a remote resource after a variety of lifecycle events. + */ +public class AmqpFrameValidator { + + private boolean valid = true; + private String errorMessage; + + public void inspectOpen(Open open, Binary encoded) { + + } + + public void inspectBegin(Begin begin, Binary encoded) { + + } + + public void inspectAttach(Attach attach, Binary encoded) { + + } + + public void inspectFlow(Flow flow, Binary encoded) { + + } + + public void inspectTransfer(Transfer transfer, Binary encoded) { + + } + + public void inspectDisposition(Disposition disposition, Binary encoded) { + + } + + public void inspectDetach(Detach detach, Binary encoded) { + + } + + public void inspectEnd(End end, Binary encoded) { + + } + + public void inspectClose(Close close, Binary encoded) { + + } + + public boolean isValid() { + return valid; + } + + protected void setValid(boolean valid) { + this.valid = valid; + } + + public String getErrorMessage() { + return errorMessage; + } + + protected void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + protected void markAsInvalid(String errorMessage) { + if (valid) { + setValid(false); + setErrorMessage(errorMessage); + } + } + + public void assertValid() { + if (!isValid()) { + throw new AssertionError(errorMessage); + } + } +} \ No newline at end of file diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java new file mode 100644 index 0000000000..68fcd856d1 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.transport.Attach; +import org.apache.qpid.proton.amqp.transport.Begin; +import org.apache.qpid.proton.amqp.transport.Close; +import org.apache.qpid.proton.amqp.transport.Detach; +import org.apache.qpid.proton.amqp.transport.Disposition; +import org.apache.qpid.proton.amqp.transport.End; +import org.apache.qpid.proton.amqp.transport.Flow; +import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler; +import org.apache.qpid.proton.amqp.transport.Open; +import org.apache.qpid.proton.amqp.transport.Transfer; +import org.apache.qpid.proton.engine.impl.ProtocolTracer; +import org.apache.qpid.proton.framing.TransportFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tracer used to spy on AMQP traffic + */ +public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler { + + private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES"); + + private final AmqpConnection connection; + + public AmqpProtocolTracer(AmqpConnection connection) { + this.connection = connection; + } + + @Override + public void receivedFrame(TransportFrame transportFrame) { + if (connection.isTraceFrames()) { + TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody()); + } + + AmqpFrameValidator inspector = connection.getReceivedFrameInspector(); + if (inspector != null) { + transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector); + } + } + + @Override + public void sentFrame(TransportFrame transportFrame) { + if (connection.isTraceFrames()) { + TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody()); + } + + AmqpFrameValidator inspector = connection.getSentFrameInspector(); + if (inspector != null) { + transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector); + } + } + + @Override + public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) { + context.inspectOpen(open, payload); + } + + @Override + public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) { + context.inspectBegin(begin, payload); + } + + @Override + public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) { + context.inspectAttach(attach, payload); + } + + @Override + public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) { + context.inspectFlow(flow, payload); + } + + @Override + public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) { + context.inspectTransfer(transfer, payload); + } + + @Override + public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) { + context.inspectDisposition(disposition, payload); + } + + @Override + public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) { + context.inspectDetach(detach, payload); + } + + @Override + public void handleEnd(End end, Binary payload, AmqpFrameValidator context) { + context.inspectEnd(end, payload); + } + + @Override + public void handleClose(Close close, Binary payload, AmqpFrameValidator context) { + context.inspectClose(close, payload); + } +} \ No newline at end of file diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 936d4ef87c..fc3fdf71a3 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,7 +16,9 @@ */ package org.apache.activemq.transport.amqp.client; +import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.AsyncResult; @@ -38,6 +40,7 @@ public class AmqpSession extends AmqpAbstractResource { private final AmqpConnection connection; private final String sessionId; private final AmqpTransactionContext txContext; + private final AtomicBoolean closed = new AtomicBoolean(); /** * Create a new session instance. @@ -51,6 +54,40 @@ public class AmqpSession extends AmqpAbstractResource { this.txContext = new AmqpTransactionContext(this); } + /** + * Close the receiver, a closed receiver will throw exceptions if any further send + * calls are made. + * + * @throws IOException if an error occurs while closing the receiver. + */ + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + close(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + } + } + + /** + * Create an anonymous sender. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender() throws Exception { + return createSender(null, false); + } + /** * Create a sender instance using the given address * @@ -101,9 +138,21 @@ public class AmqpSession extends AmqpAbstractResource { * @throws Exception if an error occurs while creating the receiver. */ public AmqpSender createSender(Target target) throws Exception { + return createSender(target, getNextSenderId()); + } + + /** + * Create a sender instance using the given Target + * + * @param target the caller created and configured Traget used to create the sender link. + * @param senderId the sender ID to assign to the newly created Sender. + * @return a newly created sender that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpSender createSender(Target target, String senderId) throws Exception { checkClosed(); - final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId()); + final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId); final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { @@ -222,7 +271,7 @@ public class AmqpSession extends AmqpAbstractResource { checkClosed(); final ClientFuture request = new ClientFuture(); - final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); connection.getScheduler().execute(new Runnable() { @@ -465,4 +514,4 @@ public class AmqpSession extends AmqpAbstractResource { throw new IllegalStateException("Session is already closed"); } } -} +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java index e0c6b6cbad..86a35a2f74 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java @@ -26,14 +26,17 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpFrameValidator; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transport.Detach; import org.apache.qpid.proton.engine.Receiver; import org.junit.Test; import org.slf4j.Logger; @@ -90,6 +93,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { connection.setContainerId(getContainerID()); connection.connect(); + connection.setReceivedFrameInspector(new AmqpFrameValidator() { + + @Override + public void inspectDetach(Detach detach, Binary encoded) { + if (detach.getClosed()) { + markAsInvalid("Remote should have detached but closed instead."); + } + } + }); + + connection.setSentFrameInspector(new AmqpFrameValidator() { + + @Override + public void inspectDetach(Detach detach, Binary encoded) { + if (detach.getClosed()) { + markAsInvalid("Client should have detached but closed instead."); + } + } + }); + AmqpSession session = connection.createSession(); AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName()); @@ -99,6 +122,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { assertEquals(getTopicName(), lookupSubscription()); + connection.getSentFrameInspector().assertValid(); + connection.getReceivedFrameInspector().assertValid(); + connection.close(); }