This closes #865

This commit is contained in:
Clebert Suconic 2016-10-25 14:15:28 -04:00
commit 2dfd14421f
5 changed files with 328 additions and 5 deletions

View File

@ -50,6 +50,7 @@ import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,6 +88,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private List<Symbol> offeredCapabilities = Collections.emptyList();
private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
private volatile AmqpFrameValidator sentFrameInspector;
private volatile AmqpFrameValidator receivedFrameInspector;
private AmqpConnectionListener listener;
private SaslAuthenticator authenticator;
private String mechanismRestriction;
@ -100,6 +103,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
private boolean trace;
public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
String username,
@ -155,6 +159,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
sasl.client();
}
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this));
open(future);
pumpToProtonTransport(future);
@ -439,6 +444,30 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
return mechanismRestriction;
}
public boolean isTraceFrames() {
return trace;
}
public void setTraceFrames(boolean trace) {
this.trace = trace;
}
public AmqpFrameValidator getSentFrameInspector() {
return sentFrameInspector;
}
public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) {
this.sentFrameInspector = amqpFrameInspector;
}
public AmqpFrameValidator getReceivedFrameInspector() {
return receivedFrameInspector;
}
public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) {
this.receivedFrameInspector = amqpFrameInspector;
}
//----- Internal getters used from the child AmqpResource classes --------//
ScheduledExecutorService getScheduler() {
@ -706,4 +735,4 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
public String toString() {
return "AmqpConnection { " + connectionId + " }";
}
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.amqp.transport.Disposition;
import org.apache.qpid.proton.amqp.transport.End;
import org.apache.qpid.proton.amqp.transport.Flow;
import org.apache.qpid.proton.amqp.transport.Open;
import org.apache.qpid.proton.amqp.transport.Transfer;
/**
* Abstract base for a validation hook that is used in tests to check
* the state of a remote resource after a variety of lifecycle events.
*/
public class AmqpFrameValidator {
private boolean valid = true;
private String errorMessage;
public void inspectOpen(Open open, Binary encoded) {
}
public void inspectBegin(Begin begin, Binary encoded) {
}
public void inspectAttach(Attach attach, Binary encoded) {
}
public void inspectFlow(Flow flow, Binary encoded) {
}
public void inspectTransfer(Transfer transfer, Binary encoded) {
}
public void inspectDisposition(Disposition disposition, Binary encoded) {
}
public void inspectDetach(Detach detach, Binary encoded) {
}
public void inspectEnd(End end, Binary encoded) {
}
public void inspectClose(Close close, Binary encoded) {
}
public boolean isValid() {
return valid;
}
protected void setValid(boolean valid) {
this.valid = valid;
}
public String getErrorMessage() {
return errorMessage;
}
protected void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
protected void markAsInvalid(String errorMessage) {
if (valid) {
setValid(false);
setErrorMessage(errorMessage);
}
}
public void assertValid() {
if (!isValid()) {
throw new AssertionError(errorMessage);
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.amqp.transport.Disposition;
import org.apache.qpid.proton.amqp.transport.End;
import org.apache.qpid.proton.amqp.transport.Flow;
import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler;
import org.apache.qpid.proton.amqp.transport.Open;
import org.apache.qpid.proton.amqp.transport.Transfer;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.framing.TransportFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tracer used to spy on AMQP traffic
*/
public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler<AmqpFrameValidator> {
private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES");
private final AmqpConnection connection;
public AmqpProtocolTracer(AmqpConnection connection) {
this.connection = connection;
}
@Override
public void receivedFrame(TransportFrame transportFrame) {
if (connection.isTraceFrames()) {
TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody());
}
AmqpFrameValidator inspector = connection.getReceivedFrameInspector();
if (inspector != null) {
transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
}
}
@Override
public void sentFrame(TransportFrame transportFrame) {
if (connection.isTraceFrames()) {
TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody());
}
AmqpFrameValidator inspector = connection.getSentFrameInspector();
if (inspector != null) {
transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
}
}
@Override
public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) {
context.inspectOpen(open, payload);
}
@Override
public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) {
context.inspectBegin(begin, payload);
}
@Override
public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) {
context.inspectAttach(attach, payload);
}
@Override
public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) {
context.inspectFlow(flow, payload);
}
@Override
public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) {
context.inspectTransfer(transfer, payload);
}
@Override
public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) {
context.inspectDisposition(disposition, payload);
}
@Override
public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) {
context.inspectDetach(detach, payload);
}
@Override
public void handleEnd(End end, Binary payload, AmqpFrameValidator context) {
context.inspectEnd(end, payload);
}
@Override
public void handleClose(Close close, Binary payload, AmqpFrameValidator context) {
context.inspectClose(close, payload);
}
}

View File

@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -16,7 +16,9 @@
*/
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
@ -38,6 +40,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
private final AmqpConnection connection;
private final String sessionId;
private final AmqpTransactionContext txContext;
private final AtomicBoolean closed = new AtomicBoolean();
/**
* Create a new session instance.
@ -51,6 +54,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
this.txContext = new AmqpTransactionContext(this);
}
/**
* Close the receiver, a closed receiver will throw exceptions if any further send
* calls are made.
*
* @throws IOException if an error occurs while closing the receiver.
*/
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
final ClientFuture request = new ClientFuture();
getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
close(request);
pumpToProtonTransport(request);
}
});
request.sync();
}
}
/**
* Create an anonymous sender.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender() throws Exception {
return createSender(null, false);
}
/**
* Create a sender instance using the given address
*
@ -101,9 +138,21 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpSender createSender(Target target) throws Exception {
return createSender(target, getNextSenderId());
}
/**
* Create a sender instance using the given Target
*
* @param target the caller created and configured Traget used to create the sender link.
* @param senderId the sender ID to assign to the newly created Sender.
* @return a newly created sender that is ready for use.
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpSender createSender(Target target, String senderId) throws Exception {
checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
@ -222,7 +271,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
checkClosed();
final ClientFuture request = new ClientFuture();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
connection.getScheduler().execute(new Runnable() {
@ -465,4 +514,4 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
throw new IllegalStateException("Session is already closed");
}
}
}
}

View File

@ -26,14 +26,17 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test;
import org.slf4j.Logger;
@ -90,6 +93,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
connection.setContainerId(getContainerID());
connection.connect();
connection.setReceivedFrameInspector(new AmqpFrameValidator() {
@Override
public void inspectDetach(Detach detach, Binary encoded) {
if (detach.getClosed()) {
markAsInvalid("Remote should have detached but closed instead.");
}
}
});
connection.setSentFrameInspector(new AmqpFrameValidator() {
@Override
public void inspectDetach(Detach detach, Binary encoded) {
if (detach.getClosed()) {
markAsInvalid("Client should have detached but closed instead.");
}
}
});
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
@ -99,6 +122,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
assertEquals(getTopicName(), lookupSubscription());
connection.getSentFrameInspector().assertValid();
connection.getReceivedFrameInspector().assertValid();
connection.close();
}