ARTEMIS-820 AMQP: Add frame inspection capability to the test client.
This commit is contained in:
parent
1ac69fd173
commit
490bd31c4b
|
@ -50,6 +50,7 @@ import org.apache.qpid.proton.engine.Event.Type;
|
|||
import org.apache.qpid.proton.engine.Sasl;
|
||||
import org.apache.qpid.proton.engine.Transport;
|
||||
import org.apache.qpid.proton.engine.impl.CollectorImpl;
|
||||
import org.apache.qpid.proton.engine.impl.TransportImpl;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -87,6 +88,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
private List<Symbol> offeredCapabilities = Collections.emptyList();
|
||||
private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
|
||||
|
||||
private volatile AmqpFrameValidator sentFrameInspector;
|
||||
private volatile AmqpFrameValidator receivedFrameInspector;
|
||||
private AmqpConnectionListener listener;
|
||||
private SaslAuthenticator authenticator;
|
||||
private String mechanismRestriction;
|
||||
|
@ -100,6 +103,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
|
||||
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
|
||||
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
|
||||
private boolean trace;
|
||||
|
||||
public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
|
||||
String username,
|
||||
|
@ -155,6 +159,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
sasl.client();
|
||||
}
|
||||
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
|
||||
((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this));
|
||||
open(future);
|
||||
|
||||
pumpToProtonTransport(future);
|
||||
|
@ -439,6 +444,30 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
return mechanismRestriction;
|
||||
}
|
||||
|
||||
public boolean isTraceFrames() {
|
||||
return trace;
|
||||
}
|
||||
|
||||
public void setTraceFrames(boolean trace) {
|
||||
this.trace = trace;
|
||||
}
|
||||
|
||||
public AmqpFrameValidator getSentFrameInspector() {
|
||||
return sentFrameInspector;
|
||||
}
|
||||
|
||||
public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) {
|
||||
this.sentFrameInspector = amqpFrameInspector;
|
||||
}
|
||||
|
||||
public AmqpFrameValidator getReceivedFrameInspector() {
|
||||
return receivedFrameInspector;
|
||||
}
|
||||
|
||||
public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) {
|
||||
this.receivedFrameInspector = amqpFrameInspector;
|
||||
}
|
||||
|
||||
//----- Internal getters used from the child AmqpResource classes --------//
|
||||
|
||||
ScheduledExecutorService getScheduler() {
|
||||
|
@ -706,4 +735,4 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
public String toString() {
|
||||
return "AmqpConnection { " + connectionId + " }";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.client;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.transport.Attach;
|
||||
import org.apache.qpid.proton.amqp.transport.Begin;
|
||||
import org.apache.qpid.proton.amqp.transport.Close;
|
||||
import org.apache.qpid.proton.amqp.transport.Detach;
|
||||
import org.apache.qpid.proton.amqp.transport.Disposition;
|
||||
import org.apache.qpid.proton.amqp.transport.End;
|
||||
import org.apache.qpid.proton.amqp.transport.Flow;
|
||||
import org.apache.qpid.proton.amqp.transport.Open;
|
||||
import org.apache.qpid.proton.amqp.transport.Transfer;
|
||||
|
||||
/**
|
||||
* Abstract base for a validation hook that is used in tests to check
|
||||
* the state of a remote resource after a variety of lifecycle events.
|
||||
*/
|
||||
public class AmqpFrameValidator {
|
||||
|
||||
private boolean valid = true;
|
||||
private String errorMessage;
|
||||
|
||||
public void inspectOpen(Open open, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectBegin(Begin begin, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectAttach(Attach attach, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectFlow(Flow flow, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectTransfer(Transfer transfer, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectDisposition(Disposition disposition, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectDetach(Detach detach, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectEnd(End end, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public void inspectClose(Close close, Binary encoded) {
|
||||
|
||||
}
|
||||
|
||||
public boolean isValid() {
|
||||
return valid;
|
||||
}
|
||||
|
||||
protected void setValid(boolean valid) {
|
||||
this.valid = valid;
|
||||
}
|
||||
|
||||
public String getErrorMessage() {
|
||||
return errorMessage;
|
||||
}
|
||||
|
||||
protected void setErrorMessage(String errorMessage) {
|
||||
this.errorMessage = errorMessage;
|
||||
}
|
||||
|
||||
protected void markAsInvalid(String errorMessage) {
|
||||
if (valid) {
|
||||
setValid(false);
|
||||
setErrorMessage(errorMessage);
|
||||
}
|
||||
}
|
||||
|
||||
public void assertValid() {
|
||||
if (!isValid()) {
|
||||
throw new AssertionError(errorMessage);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.client;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.transport.Attach;
|
||||
import org.apache.qpid.proton.amqp.transport.Begin;
|
||||
import org.apache.qpid.proton.amqp.transport.Close;
|
||||
import org.apache.qpid.proton.amqp.transport.Detach;
|
||||
import org.apache.qpid.proton.amqp.transport.Disposition;
|
||||
import org.apache.qpid.proton.amqp.transport.End;
|
||||
import org.apache.qpid.proton.amqp.transport.Flow;
|
||||
import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler;
|
||||
import org.apache.qpid.proton.amqp.transport.Open;
|
||||
import org.apache.qpid.proton.amqp.transport.Transfer;
|
||||
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
|
||||
import org.apache.qpid.proton.framing.TransportFrame;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Tracer used to spy on AMQP traffic
|
||||
*/
|
||||
public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler<AmqpFrameValidator> {
|
||||
|
||||
private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES");
|
||||
|
||||
private final AmqpConnection connection;
|
||||
|
||||
public AmqpProtocolTracer(AmqpConnection connection) {
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receivedFrame(TransportFrame transportFrame) {
|
||||
if (connection.isTraceFrames()) {
|
||||
TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody());
|
||||
}
|
||||
|
||||
AmqpFrameValidator inspector = connection.getReceivedFrameInspector();
|
||||
if (inspector != null) {
|
||||
transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sentFrame(TransportFrame transportFrame) {
|
||||
if (connection.isTraceFrames()) {
|
||||
TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody());
|
||||
}
|
||||
|
||||
AmqpFrameValidator inspector = connection.getSentFrameInspector();
|
||||
if (inspector != null) {
|
||||
transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectOpen(open, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectBegin(begin, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectAttach(attach, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectFlow(flow, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectTransfer(transfer, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectDisposition(disposition, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectDetach(detach, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleEnd(End end, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectEnd(end, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleClose(Close close, Binary payload, AmqpFrameValidator context) {
|
||||
context.inspectClose(close, payload);
|
||||
}
|
||||
}
|
|
@ -6,7 +6,7 @@
|
|||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
@ -16,7 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.amqp.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
|
||||
|
@ -38,6 +40,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
private final AmqpConnection connection;
|
||||
private final String sessionId;
|
||||
private final AmqpTransactionContext txContext;
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
|
||||
/**
|
||||
* Create a new session instance.
|
||||
|
@ -51,6 +54,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
this.txContext = new AmqpTransactionContext(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the receiver, a closed receiver will throw exceptions if any further send
|
||||
* calls are made.
|
||||
*
|
||||
* @throws IOException if an error occurs while closing the receiver.
|
||||
*/
|
||||
public void close() throws IOException {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
final ClientFuture request = new ClientFuture();
|
||||
getScheduler().execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
checkClosed();
|
||||
close(request);
|
||||
pumpToProtonTransport(request);
|
||||
}
|
||||
});
|
||||
|
||||
request.sync();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an anonymous sender.
|
||||
*
|
||||
* @return a newly created sender that is ready for use.
|
||||
*
|
||||
* @throws Exception if an error occurs while creating the sender.
|
||||
*/
|
||||
public AmqpSender createSender() throws Exception {
|
||||
return createSender(null, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a sender instance using the given address
|
||||
*
|
||||
|
@ -101,9 +138,21 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
* @throws Exception if an error occurs while creating the receiver.
|
||||
*/
|
||||
public AmqpSender createSender(Target target) throws Exception {
|
||||
return createSender(target, getNextSenderId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a sender instance using the given Target
|
||||
*
|
||||
* @param target the caller created and configured Traget used to create the sender link.
|
||||
* @param senderId the sender ID to assign to the newly created Sender.
|
||||
* @return a newly created sender that is ready for use.
|
||||
* @throws Exception if an error occurs while creating the receiver.
|
||||
*/
|
||||
public AmqpSender createSender(Target target, String senderId) throws Exception {
|
||||
checkClosed();
|
||||
|
||||
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
|
||||
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId);
|
||||
final ClientFuture request = new ClientFuture();
|
||||
|
||||
connection.getScheduler().execute(new Runnable() {
|
||||
|
@ -222,7 +271,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
checkClosed();
|
||||
|
||||
final ClientFuture request = new ClientFuture();
|
||||
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
|
||||
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
|
||||
|
||||
connection.getScheduler().execute(new Runnable() {
|
||||
|
||||
|
@ -465,4 +514,4 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
throw new IllegalStateException("Session is already closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,14 +26,17 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.DescribedType;
|
||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
|
||||
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
||||
import org.apache.qpid.proton.amqp.transport.Detach;
|
||||
import org.apache.qpid.proton.engine.Receiver;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -90,6 +93,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
connection.setContainerId(getContainerID());
|
||||
connection.connect();
|
||||
|
||||
connection.setReceivedFrameInspector(new AmqpFrameValidator() {
|
||||
|
||||
@Override
|
||||
public void inspectDetach(Detach detach, Binary encoded) {
|
||||
if (detach.getClosed()) {
|
||||
markAsInvalid("Remote should have detached but closed instead.");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
connection.setSentFrameInspector(new AmqpFrameValidator() {
|
||||
|
||||
@Override
|
||||
public void inspectDetach(Detach detach, Binary encoded) {
|
||||
if (detach.getClosed()) {
|
||||
markAsInvalid("Client should have detached but closed instead.");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
|
||||
|
||||
|
@ -99,6 +122,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
|
||||
assertEquals(getTopicName(), lookupSubscription());
|
||||
|
||||
connection.getSentFrameInspector().assertValid();
|
||||
connection.getReceivedFrameInspector().assertValid();
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue