ARTEMIS-1127 Match remote Sender and Receiver settle modes

On link attach we currently default out SenderSettleMode to MIXED which
while legal doesn't truly reflect what the client asked for. We instead
now update the link to reflect the mode requested by the client

Also add some tests to ensure that we always return the
ReceiverSettleMode as FIRST since we don't support SECOND.
This commit is contained in:
Timothy Bish 2017-04-21 14:22:09 -04:00
parent 310f953c79
commit 9eed28e0aa
7 changed files with 396 additions and 22 deletions

View File

@ -31,6 +31,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
@ -81,6 +82,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
super.initialise();
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
// Match the settlement mode of the remote instead of relying on the default of MIXED.
receiver.setSenderSettleMode(receiver.getRemoteSenderSettleMode());
// We don't currently support SECOND so enforce that the answer is anlways FIRST
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
if (target != null) {
if (target.getDynamic()) {
// if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and

View File

@ -22,8 +22,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
@ -63,12 +61,16 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
/**
* TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
*/
@ -155,6 +157,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
String selector = null;
final Map<Symbol, Object> supportedFilters = new HashMap<>();
// Match the settlement mode of the remote instead of relying on the default of MIXED.
sender.setSenderSettleMode(sender.getRemoteSenderSettleMode());
// We don't currently support SECOND so enforce that the answer is anlways FIRST
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
if (source != null) {
// We look for message selectors on every receiver, while in other cases we might only
// consume the filter depending on the subscription type.
@ -570,7 +578,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
Modified modification = (Modified) remoteState;
if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
message.rejectConsumer(((Consumer) brokerConsumer).sequentialID());
message.rejectConsumer(brokerConsumer.sequentialID());
}
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -70,7 +70,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
private final AmqpSession session;
private final String address;
private final String receiverId;
private final Source userSpecifiedSource;
private final SenderSettleMode userSpecifiedSenderSettlementMode;
private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
private String subscriptionName;
private String selector;
@ -83,11 +86,32 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
/**
* Create a new receiver instance.
*
* @param session The parent session that created the receiver.
* @param address The address that this receiver should listen on.
* @param receiverId The unique ID assigned to this receiver.
* @param session
* The parent session that created the receiver.
* @param address
* The address that this receiver should listen on.
* @param receiverId
* The unique ID assigned to this receiver.
*/
public AmqpReceiver(AmqpSession session, String address, String receiverId) {
this(session, address, receiverId, null, null);
}
/**
* Create a new receiver instance.
*
* @param session
* The parent session that created the receiver.
* @param address
* The address that this receiver should listen on.
* @param receiverId
* The unique ID assigned to this receiver.
* @param senderMode
* The {@link SenderSettleMode} to use on open.
* @param receiverMode
* The {@link ReceiverSettleMode} to use on open.
*/
public AmqpReceiver(AmqpSession session, String address, String receiverId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
if (address != null && address.isEmpty()) {
throw new IllegalArgumentException("Address cannot be empty.");
@ -97,6 +121,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
this.session = session;
this.address = address;
this.receiverId = receiverId;
this.userSpecifiedSenderSettlementMode = senderMode;
this.userSpecifiedReceiverSettlementMode = receiverMode;
}
/**
@ -113,9 +139,11 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
this.session = session;
this.userSpecifiedSource = source;
this.address = source.getAddress();
this.receiverId = receiverId;
this.userSpecifiedSource = source;
this.userSpecifiedSenderSettlementMode = null;
this.userSpecifiedReceiverSettlementMode = null;
}
/**
@ -687,12 +715,25 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
Receiver receiver = session.getEndpoint().receiver(receiverName);
receiver.setSource(source);
receiver.setTarget(target);
if (userSpecifiedSenderSettlementMode != null) {
receiver.setSenderSettleMode(userSpecifiedSenderSettlementMode);
if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) {
setPresettle(true);
}
} else {
if (isPresettle()) {
receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
}
}
if (userSpecifiedReceiverSettlementMode != null) {
receiver.setReceiverSettleMode(userSpecifiedReceiverSettlementMode);
} else {
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
}
setEndpoint(receiver);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -66,7 +66,10 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
private final AmqpSession session;
private final String address;
private final String senderId;
private final Target userSpecifiedTarget;
private final SenderSettleMode userSpecifiedSenderSettlementMode;
private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
private boolean presettle;
private long sendTimeout = DEFAULT_SEND_TIMEOUT;
@ -81,11 +84,32 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
/**
* Create a new sender instance.
*
* @param session The parent session that created the session.
* @param address The address that this sender produces to.
* @param senderId The unique ID assigned to this sender.
* @param session
* The parent session that created the session.
* @param address
* The address that this sender produces to.
* @param senderId
* The unique ID assigned to this sender.
*/
public AmqpSender(AmqpSession session, String address, String senderId) {
this(session, address, senderId, null, null);
}
/**
* Create a new sender instance.
*
* @param session
* The parent session that created the session.
* @param address
* The address that this sender produces to.
* @param senderId
* The unique ID assigned to this sender.
* @param senderMode
* The {@link SenderSettleMode} to use on open.
* @param receiverMode
* The {@link ReceiverSettleMode} to use on open.
*/
public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
if (address != null && address.isEmpty()) {
throw new IllegalArgumentException("Address cannot be empty.");
@ -95,6 +119,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
this.address = address;
this.senderId = senderId;
this.userSpecifiedTarget = null;
this.userSpecifiedSenderSettlementMode = senderMode;
this.userSpecifiedReceiverSettlementMode = receiverMode;
}
/**
@ -111,9 +137,11 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
this.session = session;
this.userSpecifiedTarget = target;
this.address = target.getAddress();
this.senderId = senderId;
this.userSpecifiedTarget = target;
this.userSpecifiedSenderSettlementMode = null;
this.userSpecifiedReceiverSettlementMode = null;
}
/**
@ -289,12 +317,25 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
Sender sender = session.getEndpoint().sender(senderName);
sender.setSource(source);
sender.setTarget(target);
if (userSpecifiedSenderSettlementMode != null) {
sender.setSenderSettleMode(userSpecifiedSenderSettlementMode);
if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) {
presettle = true;
}
} else {
if (presettle) {
sender.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
}
}
if (userSpecifiedReceiverSettlementMode != null) {
sender.setReceiverSettleMode(userSpecifiedReceiverSettlementMode);
} else {
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
}
sender.setDesiredCapabilities(desiredCapabilities);
sender.setOfferedCapabilities(offeredCapabilities);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -28,6 +28,8 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Session;
@ -173,6 +175,42 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return sender;
}
/**
* Create a sender instance using the given address
*
* @param address
* the address to which the sender will produce its messages.
* @param senderSettlementMode
* controls the settlement mode used by the created Sender
* @param receiverSettlementMode
* controls the desired settlement mode used by the remote Receiver
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception {
checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
sender.setStateInspector(getStateInspector());
sender.open(request);
pumpToProtonTransport(request);
}
});
request.sync();
return sender;
}
/**
* Create a sender instance using the given Target
*
@ -309,6 +347,42 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return receiver;
}
/**
* Create a receiver instance using the given address
*
* @param address
* the address to which the receiver will subscribe for its messages.
* @param senderSettlementMode
* controls the desired settlement mode used by the remote Sender
* @param receiverSettlementMode
* controls the settlement mode used by the created Receiver
*
* @return a newly created receiver that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createReceiver(String address, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId(), senderMode, receiverMode);
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport(request);
}
});
request.sync();
return receiver;
}
/**
* Create a receiver instance using the given Source
*

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.junit.Test;
/**
* Test various behaviors of AMQP receivers with the broker.
*/
public class AmqpReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSenderSettlementModeSettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
}
@Test(timeout = 60000)
public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED);
}
@Test(timeout = 60000)
public void testSenderSettlementModeMixedIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED);
}
public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST);
Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
assertEquals(0, queueView.getMessageCount());
assertEquals(1, server.getTotalConsumerCount());
assertEquals(settleMode, receiver.getEndpoint().getRemoteSenderSettleMode());
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testReceiverSettlementModeSetToFirst() throws Exception {
doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST);
}
@Test(timeout = 60000)
public void testReceiverSettlementModeSetToSecond() throws Exception {
doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND);
}
/*
* The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that it
* always drops that back to FIRST to let the client know. The client will need to check and
* react accordingly.
*/
private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), SenderSettleMode.MIXED, modeToUse);
Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
assertEquals(0, queueView.getMessageCount());
assertEquals(1, server.getTotalConsumerCount());
assertEquals(ReceiverSettleMode.FIRST, receiver.getEndpoint().getRemoteReceiverSettleMode());
receiver.close();
connection.close();
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.junit.Test;
/**
* Test broker behavior when creating AMQP senders
*/
public class AmqpSenderTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSenderSettlementModeSettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
}
@Test(timeout = 60000)
public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED);
}
@Test(timeout = 60000)
public void testSenderSettlementModeMixedIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED);
}
public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST);
Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
assertEquals(0, queueView.getMessageCount());
assertEquals(settleMode, sender.getEndpoint().getRemoteSenderSettleMode());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
sender.close();
connection.close();
}
@Test(timeout = 60000)
public void testReceiverSettlementModeSetToFirst() throws Exception {
doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST);
}
@Test(timeout = 60000)
public void testReceiverSettlementModeSetToSecond() throws Exception {
doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND);
}
/*
* The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that it
* always drops that back to FIRST to let the client know. The client will need to check and
* react accordingly.
*/
private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName(), SenderSettleMode.UNSETTLED, modeToUse);
Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
assertEquals(0, queueView.getMessageCount());
assertEquals(ReceiverSettleMode.FIRST, sender.getEndpoint().getRemoteReceiverSettleMode());
sender.close();
connection.close();
}
}