diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 26064828a4..57d7307ee5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -31,6 +31,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.jboss.logging.Logger; @@ -81,6 +82,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements super.initialise(); org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); + // Match the settlement mode of the remote instead of relying on the default of MIXED. + receiver.setSenderSettleMode(receiver.getRemoteSenderSettleMode()); + + // We don't currently support SECOND so enforce that the answer is anlways FIRST + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + if (target != null) { if (target.getDynamic()) { // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 4d8bf53225..f8fa473720 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -22,8 +22,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.Message; @@ -63,12 +61,16 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Sender; import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + /** * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links */ @@ -155,6 +157,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr String selector = null; final Map supportedFilters = new HashMap<>(); + // Match the settlement mode of the remote instead of relying on the default of MIXED. + sender.setSenderSettleMode(sender.getRemoteSenderSettleMode()); + + // We don't currently support SECOND so enforce that the answer is anlways FIRST + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + if (source != null) { // We look for message selectors on every receiver, while in other cases we might only // consume the filter depending on the subscription type. @@ -570,7 +578,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr Modified modification = (Modified) remoteState; if (Boolean.TRUE.equals(modification.getUndeliverableHere())) { - message.rejectConsumer(((Consumer) brokerConsumer).sequentialID()); + message.rejectConsumer(brokerConsumer.sequentialID()); } if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 414f93311a..9de2fcefe8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -70,7 +70,10 @@ public class AmqpReceiver extends AmqpAbstractResource { private final AmqpSession session; private final String address; private final String receiverId; + private final Source userSpecifiedSource; + private final SenderSettleMode userSpecifiedSenderSettlementMode; + private final ReceiverSettleMode userSpecifiedReceiverSettlementMode; private String subscriptionName; private String selector; @@ -83,11 +86,32 @@ public class AmqpReceiver extends AmqpAbstractResource { /** * Create a new receiver instance. * - * @param session The parent session that created the receiver. - * @param address The address that this receiver should listen on. - * @param receiverId The unique ID assigned to this receiver. + * @param session + * The parent session that created the receiver. + * @param address + * The address that this receiver should listen on. + * @param receiverId + * The unique ID assigned to this receiver. */ public AmqpReceiver(AmqpSession session, String address, String receiverId) { + this(session, address, receiverId, null, null); + } + + /** + * Create a new receiver instance. + * + * @param session + * The parent session that created the receiver. + * @param address + * The address that this receiver should listen on. + * @param receiverId + * The unique ID assigned to this receiver. + * @param senderMode + * The {@link SenderSettleMode} to use on open. + * @param receiverMode + * The {@link ReceiverSettleMode} to use on open. + */ + public AmqpReceiver(AmqpSession session, String address, String receiverId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) { if (address != null && address.isEmpty()) { throw new IllegalArgumentException("Address cannot be empty."); @@ -97,6 +121,8 @@ public class AmqpReceiver extends AmqpAbstractResource { this.session = session; this.address = address; this.receiverId = receiverId; + this.userSpecifiedSenderSettlementMode = senderMode; + this.userSpecifiedReceiverSettlementMode = receiverMode; } /** @@ -113,9 +139,11 @@ public class AmqpReceiver extends AmqpAbstractResource { } this.session = session; - this.userSpecifiedSource = source; this.address = source.getAddress(); this.receiverId = receiverId; + this.userSpecifiedSource = source; + this.userSpecifiedSenderSettlementMode = null; + this.userSpecifiedReceiverSettlementMode = null; } /** @@ -687,12 +715,25 @@ public class AmqpReceiver extends AmqpAbstractResource { Receiver receiver = session.getEndpoint().receiver(receiverName); receiver.setSource(source); receiver.setTarget(target); - if (isPresettle()) { - receiver.setSenderSettleMode(SenderSettleMode.SETTLED); + + if (userSpecifiedSenderSettlementMode != null) { + receiver.setSenderSettleMode(userSpecifiedSenderSettlementMode); + if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) { + setPresettle(true); + } } else { - receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + if (isPresettle()) { + receiver.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + } + + if (userSpecifiedReceiverSettlementMode != null) { + receiver.setReceiverSettleMode(userSpecifiedReceiverSettlementMode); + } else { + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); } - receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); setEndpoint(receiver); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 0a41ce60d1..c9bc0d6ba3 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -66,7 +66,10 @@ public class AmqpSender extends AmqpAbstractResource { private final AmqpSession session; private final String address; private final String senderId; + private final Target userSpecifiedTarget; + private final SenderSettleMode userSpecifiedSenderSettlementMode; + private final ReceiverSettleMode userSpecifiedReceiverSettlementMode; private boolean presettle; private long sendTimeout = DEFAULT_SEND_TIMEOUT; @@ -81,11 +84,32 @@ public class AmqpSender extends AmqpAbstractResource { /** * Create a new sender instance. * - * @param session The parent session that created the session. - * @param address The address that this sender produces to. - * @param senderId The unique ID assigned to this sender. + * @param session + * The parent session that created the session. + * @param address + * The address that this sender produces to. + * @param senderId + * The unique ID assigned to this sender. */ public AmqpSender(AmqpSession session, String address, String senderId) { + this(session, address, senderId, null, null); + } + + /** + * Create a new sender instance. + * + * @param session + * The parent session that created the session. + * @param address + * The address that this sender produces to. + * @param senderId + * The unique ID assigned to this sender. + * @param senderMode + * The {@link SenderSettleMode} to use on open. + * @param receiverMode + * The {@link ReceiverSettleMode} to use on open. + */ + public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) { if (address != null && address.isEmpty()) { throw new IllegalArgumentException("Address cannot be empty."); @@ -95,6 +119,8 @@ public class AmqpSender extends AmqpAbstractResource { this.address = address; this.senderId = senderId; this.userSpecifiedTarget = null; + this.userSpecifiedSenderSettlementMode = senderMode; + this.userSpecifiedReceiverSettlementMode = receiverMode; } /** @@ -111,9 +137,11 @@ public class AmqpSender extends AmqpAbstractResource { } this.session = session; - this.userSpecifiedTarget = target; this.address = target.getAddress(); this.senderId = senderId; + this.userSpecifiedTarget = target; + this.userSpecifiedSenderSettlementMode = null; + this.userSpecifiedReceiverSettlementMode = null; } /** @@ -289,12 +317,25 @@ public class AmqpSender extends AmqpAbstractResource { Sender sender = session.getEndpoint().sender(senderName); sender.setSource(source); sender.setTarget(target); - if (presettle) { - sender.setSenderSettleMode(SenderSettleMode.SETTLED); + + if (userSpecifiedSenderSettlementMode != null) { + sender.setSenderSettleMode(userSpecifiedSenderSettlementMode); + if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) { + presettle = true; + } } else { - sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + if (presettle) { + sender.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + } + + if (userSpecifiedReceiverSettlementMode != null) { + sender.setReceiverSettleMode(userSpecifiedReceiverSettlementMode); + } else { + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); } - sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); sender.setDesiredCapabilities(desiredCapabilities); sender.setOfferedCapabilities(offeredCapabilities); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 9589a7eaca..677b354ff4 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -28,6 +28,8 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; @@ -173,6 +175,42 @@ public class AmqpSession extends AmqpAbstractResource { return sender; } + /** + * Create a sender instance using the given address + * + * @param address + * the address to which the sender will produce its messages. + * @param senderSettlementMode + * controls the settlement mode used by the created Sender + * @param receiverSettlementMode + * controls the desired settlement mode used by the remote Receiver + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception { + checkClosed(); + + final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + sender.setStateInspector(getStateInspector()); + sender.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return sender; + } + /** * Create a sender instance using the given Target * @@ -309,6 +347,42 @@ public class AmqpSession extends AmqpAbstractResource { return receiver; } + /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param senderSettlementMode + * controls the desired settlement mode used by the remote Sender + * @param receiverSettlementMode + * controls the settlement mode used by the created Receiver + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(String address, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId(), senderMode, receiverMode); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + /** * Create a receiver instance using the given Source * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java new file mode 100644 index 0000000000..b47ad505eb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.junit.Test; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testSenderSettlementModeSettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeUnsettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeMixedIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED); + } + + public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST); + + Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + assertEquals(1, server.getTotalConsumerCount()); + + assertEquals(settleMode, receiver.getEndpoint().getRemoteSenderSettleMode()); + + receiver.close(); + + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToFirst() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToSecond() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND); + } + + /* + * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that it + * always drops that back to FIRST to let the client know. The client will need to check and + * react accordingly. + */ + private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), SenderSettleMode.MIXED, modeToUse); + + Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + assertEquals(1, server.getTotalConsumerCount()); + + assertEquals(ReceiverSettleMode.FIRST, receiver.getEndpoint().getRemoteReceiverSettleMode()); + + receiver.close(); + connection.close(); + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java new file mode 100644 index 0000000000..7b8cbefeea --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.junit.Test; + +/** + * Test broker behavior when creating AMQP senders + */ +public class AmqpSenderTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testSenderSettlementModeSettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeUnsettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeMixedIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED); + } + + public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception { + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST); + + Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + + assertEquals(settleMode, sender.getEndpoint().getRemoteSenderSettleMode()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + sender.close(); + + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToFirst() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToSecond() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND); + } + + /* + * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that it + * always drops that back to FIRST to let the client know. The client will need to check and + * react accordingly. + */ + private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName(), SenderSettleMode.UNSETTLED, modeToUse); + + Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + + assertEquals(ReceiverSettleMode.FIRST, sender.getEndpoint().getRemoteReceiverSettleMode()); + + sender.close(); + + connection.close(); + } +} \ No newline at end of file