Honor the sender settle mode from the client and ensure we always set
receiver mode to FIRST to reflect we don't support SECOND.  Adds tests
coverage and needed test client changes for this.
This commit is contained in:
Timothy Bish 2017-04-21 10:51:35 -04:00
parent 1fd2450544
commit 381a1ae206
7 changed files with 337 additions and 14 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp.protocol;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.fusesource.hawtbuf.Buffer;
@ -47,6 +48,12 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
public AmqpAbstractReceiver(AmqpSession session, Receiver endpoint) {
super(session, endpoint);
this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
// We don't support second so enforce it as First and let remote decide what to do
this.endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
// Match what the sender mode is
this.endpoint.setSenderSettleMode(endpoint.getRemoteSenderSettleMode());
}
@Override

View File

@ -52,6 +52,7 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
@ -77,8 +78,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer();
private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
private final LinkedList<Delivery> dispatchedInTx = new LinkedList<Delivery>();
private final LinkedList<MessageDispatch> outbound = new LinkedList<>();
private final LinkedList<Delivery> dispatchedInTx = new LinkedList<>();
private final ConsumerInfo consumerInfo;
private AbstractSubscription subscription;
@ -106,8 +107,14 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
super(session, endpoint);
// We don't support second so enforce it as First and let remote decide what to do
this.endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
// Match what the sender mode is
this.endpoint.setSenderSettleMode(endpoint.getRemoteSenderSettleMode());
this.consumerInfo = consumerInfo;
this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
this.presettle = getEndpoint().getSenderSettleMode() == SenderSettleMode.SETTLED;
}
@Override

View File

@ -65,12 +65,15 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
private final AtomicBoolean closed = new AtomicBoolean();
private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<AmqpMessage>();
private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<>();
private final AmqpSession session;
private final String address;
private final String receiverId;
private final Source userSpecifiedSource;
private final SenderSettleMode userSpecifiedSenderSettlementMode;
private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
private String subscriptionName;
private String selector;
@ -91,6 +94,24 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* The unique ID assigned to this receiver.
*/
public AmqpReceiver(AmqpSession session, String address, String receiverId) {
this(session, address, receiverId, null, null);
}
/**
* Create a new receiver instance.
*
* @param session
* The parent session that created the receiver.
* @param address
* The address that this receiver should listen on.
* @param receiverId
* The unique ID assigned to this receiver.
* @param senderMode
* The {@link SenderSettleMode} to use on open.
* @param receiverMode
* The {@link ReceiverSettleMode} to use on open.
*/
public AmqpReceiver(AmqpSession session, String address, String receiverId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
if (address != null && address.isEmpty()) {
throw new IllegalArgumentException("Address cannot be empty.");
@ -100,6 +121,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
this.session = session;
this.address = address;
this.receiverId = receiverId;
this.userSpecifiedSenderSettlementMode = senderMode;
this.userSpecifiedReceiverSettlementMode = receiverMode;
}
/**
@ -122,6 +145,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
this.userSpecifiedSource = source;
this.address = source.getAddress();
this.receiverId = receiverId;
this.userSpecifiedSenderSettlementMode = null;
this.userSpecifiedReceiverSettlementMode = null;
}
/**
@ -715,12 +740,25 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
Receiver receiver = session.getEndpoint().receiver(receiverName);
receiver.setSource(source);
receiver.setTarget(target);
if (userSpecifiedSenderSettlementMode != null) {
receiver.setSenderSettleMode(userSpecifiedSenderSettlementMode);
if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) {
setPresettle(true);
}
} else {
if (isPresettle()) {
receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
}
}
if (userSpecifiedReceiverSettlementMode != null) {
receiver.setReceiverSettleMode(userSpecifiedReceiverSettlementMode);
} else {
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
}
setEndpoint(receiver);
@ -788,7 +826,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
protected void configureSource(Source source) {
Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
Map<Symbol, DescribedType> filters = new HashMap<>();
Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};

View File

@ -66,7 +66,10 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
private final AmqpSession session;
private final String address;
private final String senderId;
private final Target userSpecifiedTarget;
private final SenderSettleMode userSpecifiedSenderSettlementMode;
private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
private boolean presettle;
private long sendTimeout = DEFAULT_SEND_TIMEOUT;
@ -90,6 +93,24 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
* The unique ID assigned to this sender.
*/
public AmqpSender(AmqpSession session, String address, String senderId) {
this(session, address, senderId, null, null);
}
/**
* Create a new sender instance.
*
* @param session
* The parent session that created the session.
* @param address
* The address that this sender produces to.
* @param senderId
* The unique ID assigned to this sender.
* @param senderMode
* The {@link SenderSettleMode} to use on open.
* @param receiverMode
* The {@link ReceiverSettleMode} to use on open.
*/
public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
if (address != null && address.isEmpty()) {
throw new IllegalArgumentException("Address cannot be empty.");
@ -99,6 +120,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
this.address = address;
this.senderId = senderId;
this.userSpecifiedTarget = null;
this.userSpecifiedSenderSettlementMode = senderMode;
this.userSpecifiedReceiverSettlementMode = receiverMode;
}
/**
@ -121,6 +144,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
this.userSpecifiedTarget = target;
this.address = target.getAddress();
this.senderId = senderId;
this.userSpecifiedSenderSettlementMode = null;
this.userSpecifiedReceiverSettlementMode = null;
}
/**
@ -302,12 +327,25 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
Sender sender = session.getEndpoint().sender(senderName);
sender.setSource(source);
sender.setTarget(target);
if (userSpecifiedSenderSettlementMode != null) {
sender.setSenderSettleMode(userSpecifiedSenderSettlementMode);
if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) {
presettle = true;
}
} else {
if (presettle) {
sender.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
}
}
if (userSpecifiedReceiverSettlementMode != null) {
sender.setReceiverSettleMode(userSpecifiedReceiverSettlementMode);
} else {
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
}
sender.setDesiredCapabilities(desiredCapabilities);
sender.setOfferedCapabilities(offeredCapabilities);

View File

@ -28,6 +28,8 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Session;
@ -138,6 +140,42 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return createSender(address, presettle, null, null, null);
}
/**
* Create a sender instance using the given address
*
* @param address
* the address to which the sender will produce its messages.
* @param senderSettlementMode
* controls the settlement mode used by the created Sender
* @param receiverSettlementMode
* controls the desired settlement mode used by the remote Receiver
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception {
checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
sender.setStateInspector(getStateInspector());
sender.open(request);
pumpToProtonTransport(request);
}
});
request.sync();
return sender;
}
/**
* Create a sender instance using the given address
*
@ -349,6 +387,42 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return receiver;
}
/**
* Create a receiver instance using the given address
*
* @param address
* the address to which the receiver will subscribe for its messages.
* @param senderSettlementMode
* controls the desired settlement mode used by the remote Sender
* @param receiverSettlementMode
* controls the settlement mode used by the created Receiver
*
* @return a newly created receiver that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createReceiver(String address, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId(), senderMode, receiverMode);
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport(request);
}
});
request.sync();
return receiver;
}
/**
* Create a receiver instance using the given Source
*

View File

@ -47,6 +47,8 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
@ -124,6 +126,79 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testSenderSettlementModeSettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
}
@Test(timeout = 60000)
public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED);
}
@Test(timeout = 60000)
public void testSenderSettlementModeMixedIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED);
}
public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST);
assertEquals(1, brokerService.getAdminView().getQueues().length);
assertNotNull(getProxyToQueue(getTestName()));
assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
assertEquals(settleMode, receiver.getEndpoint().getRemoteSenderSettleMode());
receiver.close();
assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
connection.close();
}
@Test(timeout = 60000)
public void testReceiverSettlementModeSetToFirst() throws Exception {
doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST);
}
@Test(timeout = 60000)
public void testReceiverSettlementModeSetToSecond() throws Exception {
doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND);
}
/*
* The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that
* it always drops that back to FIRST to let the client know. The client will need to
* check and react accordingly.
*/
private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
AmqpReceiver receiver = session.createReceiver(
"queue://" + getTestName(), SenderSettleMode.MIXED, modeToUse);
assertEquals(1, brokerService.getAdminView().getQueues().length);
assertNotNull(getProxyToQueue(getTestName()));
assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
assertEquals(ReceiverSettleMode.FIRST, receiver.getEndpoint().getRemoteReceiverSettleMode());
receiver.close();
assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
connection.close();
}
@Test(timeout = 60000)
public void testCreateQueueReceiverWithJMSSelector() throws Exception {
AmqpClient client = createAmqpClient();

View File

@ -37,6 +37,8 @@ import org.apache.activemq.transport.amqp.client.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Test;
@ -49,6 +51,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testCreateQueueSender() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
@ -84,6 +87,87 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testSenderSettlementModeSettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
}
@Test(timeout = 60000)
public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED);
}
@Test(timeout = 60000)
public void testSenderSettlementModeMixedIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED);
}
public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception {
AmqpClient client = createAmqpClient();
client.setTraceFrames(true);
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
AmqpSender sender = session.createSender(
"queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST);
assertEquals(1, brokerService.getAdminView().getQueues().length);
assertNotNull(getProxyToQueue(getTestName()));
assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
assertEquals(settleMode, sender.getEndpoint().getRemoteSenderSettleMode());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(0, brokerService.getAdminView().getQueueProducers().length);
connection.close();
}
@Test(timeout = 60000)
public void testReceiverSettlementModeSetToFirst() throws Exception {
doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST);
}
@Test(timeout = 60000)
public void testReceiverSettlementModeSetToSecond() throws Exception {
doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND);
}
/*
* The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that
* it always drops that back to FIRST to let the client know. The client will need to
* check and react accordingly.
*/
private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
AmqpSender sender = session.createSender(
"queue://" + getTestName(), SenderSettleMode.UNSETTLED, modeToUse);
assertEquals(1, brokerService.getAdminView().getQueues().length);
assertNotNull(getProxyToQueue(getTestName()));
assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
assertEquals(ReceiverSettleMode.FIRST, sender.getEndpoint().getRemoteReceiverSettleMode());
sender.close();
assertEquals(0, brokerService.getAdminView().getQueueProducers().length);
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageToQueue() throws Exception {
AmqpClient client = createAmqpClient();