From 9eed28e0aad9683b894f03f731e053b5c4639bc6 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 21 Apr 2017 14:22:09 -0400 Subject: [PATCH] ARTEMIS-1127 Match remote Sender and Receiver settle modes On link attach we currently default out SenderSettleMode to MIXED which while legal doesn't truly reflect what the client asked for. We instead now update the link to reflect the mode requested by the client Also add some tests to ensure that we always return the ReceiverSettleMode as FIRST since we don't support SECOND. --- .../proton/ProtonServerReceiverContext.java | 7 ++ .../proton/ProtonServerSenderContext.java | 14 ++- .../transport/amqp/client/AmqpReceiver.java | 59 ++++++++-- .../transport/amqp/client/AmqpSender.java | 59 ++++++++-- .../transport/amqp/client/AmqpSession.java | 76 ++++++++++++- .../integration/amqp/AmqpReceiverTest.java | 99 +++++++++++++++++ .../integration/amqp/AmqpSenderTest.java | 104 ++++++++++++++++++ 7 files changed, 396 insertions(+), 22 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 26064828a4..57d7307ee5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -31,6 +31,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.jboss.logging.Logger; @@ -81,6 +82,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements super.initialise(); org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); + // Match the settlement mode of the remote instead of relying on the default of MIXED. + receiver.setSenderSettleMode(receiver.getRemoteSenderSettleMode()); + + // We don't currently support SECOND so enforce that the answer is anlways FIRST + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + if (target != null) { if (target.getDynamic()) { // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 4d8bf53225..f8fa473720 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -22,8 +22,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.Message; @@ -63,12 +61,16 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Sender; import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + /** * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links */ @@ -155,6 +157,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr String selector = null; final Map supportedFilters = new HashMap<>(); + // Match the settlement mode of the remote instead of relying on the default of MIXED. + sender.setSenderSettleMode(sender.getRemoteSenderSettleMode()); + + // We don't currently support SECOND so enforce that the answer is anlways FIRST + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + if (source != null) { // We look for message selectors on every receiver, while in other cases we might only // consume the filter depending on the subscription type. @@ -570,7 +578,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr Modified modification = (Modified) remoteState; if (Boolean.TRUE.equals(modification.getUndeliverableHere())) { - message.rejectConsumer(((Consumer) brokerConsumer).sequentialID()); + message.rejectConsumer(brokerConsumer.sequentialID()); } if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 414f93311a..9de2fcefe8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -70,7 +70,10 @@ public class AmqpReceiver extends AmqpAbstractResource { private final AmqpSession session; private final String address; private final String receiverId; + private final Source userSpecifiedSource; + private final SenderSettleMode userSpecifiedSenderSettlementMode; + private final ReceiverSettleMode userSpecifiedReceiverSettlementMode; private String subscriptionName; private String selector; @@ -83,11 +86,32 @@ public class AmqpReceiver extends AmqpAbstractResource { /** * Create a new receiver instance. * - * @param session The parent session that created the receiver. - * @param address The address that this receiver should listen on. - * @param receiverId The unique ID assigned to this receiver. + * @param session + * The parent session that created the receiver. + * @param address + * The address that this receiver should listen on. + * @param receiverId + * The unique ID assigned to this receiver. */ public AmqpReceiver(AmqpSession session, String address, String receiverId) { + this(session, address, receiverId, null, null); + } + + /** + * Create a new receiver instance. + * + * @param session + * The parent session that created the receiver. + * @param address + * The address that this receiver should listen on. + * @param receiverId + * The unique ID assigned to this receiver. + * @param senderMode + * The {@link SenderSettleMode} to use on open. + * @param receiverMode + * The {@link ReceiverSettleMode} to use on open. + */ + public AmqpReceiver(AmqpSession session, String address, String receiverId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) { if (address != null && address.isEmpty()) { throw new IllegalArgumentException("Address cannot be empty."); @@ -97,6 +121,8 @@ public class AmqpReceiver extends AmqpAbstractResource { this.session = session; this.address = address; this.receiverId = receiverId; + this.userSpecifiedSenderSettlementMode = senderMode; + this.userSpecifiedReceiverSettlementMode = receiverMode; } /** @@ -113,9 +139,11 @@ public class AmqpReceiver extends AmqpAbstractResource { } this.session = session; - this.userSpecifiedSource = source; this.address = source.getAddress(); this.receiverId = receiverId; + this.userSpecifiedSource = source; + this.userSpecifiedSenderSettlementMode = null; + this.userSpecifiedReceiverSettlementMode = null; } /** @@ -687,12 +715,25 @@ public class AmqpReceiver extends AmqpAbstractResource { Receiver receiver = session.getEndpoint().receiver(receiverName); receiver.setSource(source); receiver.setTarget(target); - if (isPresettle()) { - receiver.setSenderSettleMode(SenderSettleMode.SETTLED); + + if (userSpecifiedSenderSettlementMode != null) { + receiver.setSenderSettleMode(userSpecifiedSenderSettlementMode); + if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) { + setPresettle(true); + } } else { - receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + if (isPresettle()) { + receiver.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + } + + if (userSpecifiedReceiverSettlementMode != null) { + receiver.setReceiverSettleMode(userSpecifiedReceiverSettlementMode); + } else { + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); } - receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); setEndpoint(receiver); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 0a41ce60d1..c9bc0d6ba3 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -66,7 +66,10 @@ public class AmqpSender extends AmqpAbstractResource { private final AmqpSession session; private final String address; private final String senderId; + private final Target userSpecifiedTarget; + private final SenderSettleMode userSpecifiedSenderSettlementMode; + private final ReceiverSettleMode userSpecifiedReceiverSettlementMode; private boolean presettle; private long sendTimeout = DEFAULT_SEND_TIMEOUT; @@ -81,11 +84,32 @@ public class AmqpSender extends AmqpAbstractResource { /** * Create a new sender instance. * - * @param session The parent session that created the session. - * @param address The address that this sender produces to. - * @param senderId The unique ID assigned to this sender. + * @param session + * The parent session that created the session. + * @param address + * The address that this sender produces to. + * @param senderId + * The unique ID assigned to this sender. */ public AmqpSender(AmqpSession session, String address, String senderId) { + this(session, address, senderId, null, null); + } + + /** + * Create a new sender instance. + * + * @param session + * The parent session that created the session. + * @param address + * The address that this sender produces to. + * @param senderId + * The unique ID assigned to this sender. + * @param senderMode + * The {@link SenderSettleMode} to use on open. + * @param receiverMode + * The {@link ReceiverSettleMode} to use on open. + */ + public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) { if (address != null && address.isEmpty()) { throw new IllegalArgumentException("Address cannot be empty."); @@ -95,6 +119,8 @@ public class AmqpSender extends AmqpAbstractResource { this.address = address; this.senderId = senderId; this.userSpecifiedTarget = null; + this.userSpecifiedSenderSettlementMode = senderMode; + this.userSpecifiedReceiverSettlementMode = receiverMode; } /** @@ -111,9 +137,11 @@ public class AmqpSender extends AmqpAbstractResource { } this.session = session; - this.userSpecifiedTarget = target; this.address = target.getAddress(); this.senderId = senderId; + this.userSpecifiedTarget = target; + this.userSpecifiedSenderSettlementMode = null; + this.userSpecifiedReceiverSettlementMode = null; } /** @@ -289,12 +317,25 @@ public class AmqpSender extends AmqpAbstractResource { Sender sender = session.getEndpoint().sender(senderName); sender.setSource(source); sender.setTarget(target); - if (presettle) { - sender.setSenderSettleMode(SenderSettleMode.SETTLED); + + if (userSpecifiedSenderSettlementMode != null) { + sender.setSenderSettleMode(userSpecifiedSenderSettlementMode); + if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) { + presettle = true; + } } else { - sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + if (presettle) { + sender.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + } + + if (userSpecifiedReceiverSettlementMode != null) { + sender.setReceiverSettleMode(userSpecifiedReceiverSettlementMode); + } else { + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); } - sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); sender.setDesiredCapabilities(desiredCapabilities); sender.setOfferedCapabilities(offeredCapabilities); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 9589a7eaca..677b354ff4 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -28,6 +28,8 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; @@ -173,6 +175,42 @@ public class AmqpSession extends AmqpAbstractResource { return sender; } + /** + * Create a sender instance using the given address + * + * @param address + * the address to which the sender will produce its messages. + * @param senderSettlementMode + * controls the settlement mode used by the created Sender + * @param receiverSettlementMode + * controls the desired settlement mode used by the remote Receiver + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception { + checkClosed(); + + final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + sender.setStateInspector(getStateInspector()); + sender.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return sender; + } + /** * Create a sender instance using the given Target * @@ -309,6 +347,42 @@ public class AmqpSession extends AmqpAbstractResource { return receiver; } + /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param senderSettlementMode + * controls the desired settlement mode used by the remote Sender + * @param receiverSettlementMode + * controls the settlement mode used by the created Receiver + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(String address, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId(), senderMode, receiverMode); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + /** * Create a receiver instance using the given Source * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java new file mode 100644 index 0000000000..b47ad505eb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.junit.Test; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testSenderSettlementModeSettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeUnsettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeMixedIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED); + } + + public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST); + + Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + assertEquals(1, server.getTotalConsumerCount()); + + assertEquals(settleMode, receiver.getEndpoint().getRemoteSenderSettleMode()); + + receiver.close(); + + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToFirst() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToSecond() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND); + } + + /* + * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that it + * always drops that back to FIRST to let the client know. The client will need to check and + * react accordingly. + */ + private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), SenderSettleMode.MIXED, modeToUse); + + Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + assertEquals(1, server.getTotalConsumerCount()); + + assertEquals(ReceiverSettleMode.FIRST, receiver.getEndpoint().getRemoteReceiverSettleMode()); + + receiver.close(); + connection.close(); + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java new file mode 100644 index 0000000000..7b8cbefeea --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.junit.Test; + +/** + * Test broker behavior when creating AMQP senders + */ +public class AmqpSenderTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testSenderSettlementModeSettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeUnsettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeMixedIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED); + } + + public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception { + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST); + + Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + + assertEquals(settleMode, sender.getEndpoint().getRemoteSenderSettleMode()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + sender.close(); + + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToFirst() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToSecond() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND); + } + + /* + * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that it + * always drops that back to FIRST to let the client know. The client will need to check and + * react accordingly. + */ + private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName(), SenderSettleMode.UNSETTLED, modeToUse); + + Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + + assertEquals(ReceiverSettleMode.FIRST, sender.getEndpoint().getRemoteReceiverSettleMode()); + + sender.close(); + + connection.close(); + } +} \ No newline at end of file