mirror of https://github.com/apache/activemq.git
Allow a unit test to inspect AMQP frames as part of the test.
(cherry picked from commit 24a79413c6
)
This commit is contained in:
parent
2ea6b00eef
commit
72b642c2d3
|
@ -49,9 +49,7 @@ import org.apache.qpid.proton.engine.Event.Type;
|
|||
import org.apache.qpid.proton.engine.Sasl;
|
||||
import org.apache.qpid.proton.engine.Transport;
|
||||
import org.apache.qpid.proton.engine.impl.CollectorImpl;
|
||||
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
|
||||
import org.apache.qpid.proton.engine.impl.TransportImpl;
|
||||
import org.apache.qpid.proton.framing.TransportFrame;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -62,7 +60,6 @@ import io.netty.util.ReferenceCountUtil;
|
|||
public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
|
||||
private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".FRAMES");
|
||||
|
||||
private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
|
||||
|
||||
|
@ -92,6 +89,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
private List<Symbol> offeredCapabilities = Collections.emptyList();
|
||||
private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
|
||||
|
||||
private volatile AmqpFrameValidator sentFrameInspector;
|
||||
private volatile AmqpFrameValidator receivedFrameInspector;
|
||||
private AmqpConnectionListener listener;
|
||||
private SaslAuthenticator authenticator;
|
||||
private String mechanismRestriction;
|
||||
|
@ -159,7 +158,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
sasl.client();
|
||||
}
|
||||
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
|
||||
updateTracer();
|
||||
((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this));
|
||||
open(future);
|
||||
|
||||
pumpToProtonTransport(future);
|
||||
|
@ -454,6 +453,22 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
this.trace = trace;
|
||||
}
|
||||
|
||||
public AmqpFrameValidator getSentFrameInspector() {
|
||||
return sentFrameInspector;
|
||||
}
|
||||
|
||||
public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) {
|
||||
this.sentFrameInspector = amqpFrameInspector;
|
||||
}
|
||||
|
||||
public AmqpFrameValidator getReceivedFrameInspector() {
|
||||
return receivedFrameInspector;
|
||||
}
|
||||
|
||||
public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) {
|
||||
this.receivedFrameInspector = amqpFrameInspector;
|
||||
}
|
||||
|
||||
//----- Internal getters used from the child AmqpResource classes --------//
|
||||
|
||||
ScheduledExecutorService getScheduler() {
|
||||
|
@ -718,22 +733,6 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
return containerId;
|
||||
}
|
||||
|
||||
private void updateTracer() {
|
||||
if (isTraceFrames()) {
|
||||
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
|
||||
@Override
|
||||
public void receivedFrame(TransportFrame transportFrame) {
|
||||
TRACE_FRAMES.trace("{} | RECV: {}", getRemoteURI(), transportFrame.getBody());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sentFrame(TransportFrame transportFrame) {
|
||||
TRACE_FRAMES.trace("{} | SENT: {}", this, transportFrame.getBody());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AmqpConnection { " + connectionId + " }";
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.client;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.transport.Attach;
|
||||
import org.apache.qpid.proton.amqp.transport.Begin;
|
||||
import org.apache.qpid.proton.amqp.transport.Close;
|
||||
import org.apache.qpid.proton.amqp.transport.Detach;
|
||||
import org.apache.qpid.proton.amqp.transport.Disposition;
|
||||
import org.apache.qpid.proton.amqp.transport.End;
|
||||
import org.apache.qpid.proton.amqp.transport.Flow;
|
||||
import org.apache.qpid.proton.amqp.transport.Open;
|
||||
import org.apache.qpid.proton.amqp.transport.Transfer;
|
||||
|
||||
/**
|
||||
* Abstract base for a validation hook that is used in tests to check
|
||||
* the state of a remote resource after a variety of lifecycle events.
|
||||
*/
|
||||
public class AmqpFrameValidator {
|
||||
|
||||
private boolean valid = true;
|
||||
private String errorMessage;
|
||||
|
||||
public void inspectOpen(Open open, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectBegin(Begin begin, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectAttach(Attach attach, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectFlow(Flow flow, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectTransfer(Transfer transfer, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectDisposition(Disposition disposition, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectDetach(Detach detach, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectEnd(End end, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectClose(Close close, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public boolean isValid() {
|
||||
return valid;
|
||||
}
|
||||
|
||||
protected void setValid(boolean valid) {
|
||||
this.valid = valid;
|
||||
}
|
||||
|
||||
public String getErrorMessage() {
|
||||
return errorMessage;
|
||||
}
|
||||
|
||||
protected void setErrorMessage(String errorMessage) {
|
||||
this.errorMessage = errorMessage;
|
||||
}
|
||||
|
||||
protected void markAsInvalid(String errorMessage) {
|
||||
if (valid) {
|
||||
setValid(false);
|
||||
setErrorMessage(errorMessage);
|
||||
}
|
||||
}
|
||||
|
||||
public void assertValid() {
|
||||
if (!isValid()) {
|
||||
throw new AssertionError(errorMessage);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.client;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.transport.Attach;
|
||||
import org.apache.qpid.proton.amqp.transport.Begin;
|
||||
import org.apache.qpid.proton.amqp.transport.Close;
|
||||
import org.apache.qpid.proton.amqp.transport.Detach;
|
||||
import org.apache.qpid.proton.amqp.transport.Disposition;
|
||||
import org.apache.qpid.proton.amqp.transport.End;
|
||||
import org.apache.qpid.proton.amqp.transport.Flow;
|
||||
import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler;
|
||||
import org.apache.qpid.proton.amqp.transport.Open;
|
||||
import org.apache.qpid.proton.amqp.transport.Transfer;
|
||||
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
|
||||
import org.apache.qpid.proton.framing.TransportFrame;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Tracer used to spy on AMQP traffic
|
||||
*/
|
||||
public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler<AmqpFrameValidator> {
|
||||
|
||||
private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES");
|
||||
|
||||
private final AmqpConnection connection;
|
||||
|
||||
public AmqpProtocolTracer(AmqpConnection connection) {
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receivedFrame(TransportFrame transportFrame) {
|
||||
if (connection.isTraceFrames()) {
|
||||
TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody());
|
||||
}
|
||||
|
||||
AmqpFrameValidator inspector = connection.getReceivedFrameInspector();
|
||||
if (inspector != null) {
|
||||
transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sentFrame(TransportFrame transportFrame) {
|
||||
if (connection.isTraceFrames()) {
|
||||
TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody());
|
||||
}
|
||||
|
||||
AmqpFrameValidator inspector = connection.getSentFrameInspector();
|
||||
if (inspector != null) {
|
||||
transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectOpen(open, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectBegin(begin, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectAttach(attach, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectFlow(flow, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectTransfer(transfer, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectDisposition(disposition, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectDetach(detach, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleEnd(End end, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectEnd(end, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleClose(Close close, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectClose(close, payload);
|
||||
}
|
||||
}
|
|
@ -143,16 +143,32 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
* Create a sender instance using the given Target
|
||||
*
|
||||
* @param target
|
||||
* the caller created and configured Traget used to create the sender link.
|
||||
* the caller created and configured Target used to create the sender link.
|
||||
*
|
||||
* @return a newly created sender that is ready for use.
|
||||
*
|
||||
* @throws Exception if an error occurs while creating the receiver.
|
||||
*/
|
||||
public AmqpSender createSender(Target target) throws Exception {
|
||||
return createSender(target, getNextSenderId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a sender instance using the given Target
|
||||
*
|
||||
* @param target
|
||||
* the caller created and configured Target used to create the sender link.
|
||||
* @param sender
|
||||
* the sender ID to assign to the newly created Sender.
|
||||
*
|
||||
* @return a newly created sender that is ready for use.
|
||||
*
|
||||
* @throws Exception if an error occurs while creating the receiver.
|
||||
*/
|
||||
public AmqpSender createSender(Target target, String senderId) throws Exception {
|
||||
checkClosed();
|
||||
|
||||
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
|
||||
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId);
|
||||
final ClientFuture request = new ClientFuture();
|
||||
|
||||
connection.getScheduler().execute(new Runnable() {
|
||||
|
@ -274,6 +290,22 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
* @throws Exception if an error occurs while creating the receiver.
|
||||
*/
|
||||
public AmqpReceiver createReceiver(Source source) throws Exception {
|
||||
return createReceiver(source, getNextReceiverId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a receiver instance using the given Source
|
||||
*
|
||||
* @param source
|
||||
* the caller created and configured Source used to create the receiver link.
|
||||
* @param receivedId
|
||||
* the ID value to assign to the newly created receiver
|
||||
*
|
||||
* @return a newly created receiver that is ready for use.
|
||||
*
|
||||
* @throws Exception if an error occurs while creating the receiver.
|
||||
*/
|
||||
public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception {
|
||||
checkClosed();
|
||||
|
||||
final ClientFuture request = new ClientFuture();
|
||||
|
|
|
@ -29,12 +29,15 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
|||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.DescribedType;
|
||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
|
||||
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
||||
import org.apache.qpid.proton.amqp.transport.Detach;
|
||||
import org.apache.qpid.proton.engine.Receiver;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -81,6 +84,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
connection.setReceivedFrameInspector(new AmqpFrameValidator() {
|
||||
|
||||
@Override
|
||||
public void inspectDetach(Detach detach, Binary encoded) {
|
||||
if (detach.getClosed()) {
|
||||
markAsInvalid("Remote should have detached but closed instead.");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
connection.setSentFrameInspector(new AmqpFrameValidator() {
|
||||
|
||||
@Override
|
||||
public void inspectDetach(Detach detach, Binary encoded) {
|
||||
if (detach.getClosed()) {
|
||||
markAsInvalid("Client should have detached but closed instead.");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
|
||||
|
||||
|
@ -94,6 +117,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
||||
connection.getSentFrameInspector().assertValid();
|
||||
connection.getReceivedFrameInspector().assertValid();
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue