diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 76279c5e1a..adb7acc00b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -482,6 +482,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); if (preSettle) { + // Presettled means the client implicitly accepts any delivery we send it. + sessionSPI.ack(null, brokerConsumer, message); delivery.settle(); } else { sender.advance(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java new file mode 100644 index 0000000000..657aff702d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testPresettledReceiverAndNonPresettledReceiverOnSameQueue() throws Exception { + final int MSG_COUNT = 2; + sendMessages(getTestName(), MSG_COUNT); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName(), null, false, true); + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + + final Queue queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getMessageCount()); + + receiver1.flow(1); + receiver2.flow(1); + + AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS); + + assertNotNull(message1); + assertNotNull(message2); + + // Receiver 1 is presettled so messages are not accepted. + assertTrue(message1.getWrappedDelivery().remotelySettled()); + + // Receiver 2 is not presettled so it needs to accept. + message2.accept(); + + receiver1.close(); + receiver2.close(); + + System.out.println("Message Count after all consumed: " + queueView.getMessageCount()); + + // Should be nothing left on the Queue + AmqpReceiver receiver3 = session.createReceiver(getTestName()); + receiver3.flow(1); + + AmqpMessage received = receiver3.receive(5, TimeUnit.SECONDS); + if (received != null) { + System.out.println("Message read: " + received.getMessageId()); + } + assertNull(received); + + assertEquals(0, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testPresettledReceiverReadsAllMessages() throws Exception { + final int MSG_COUNT = 100; + sendMessages(getTestName(), MSG_COUNT); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); + + final Queue queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getMessageCount()); + + receiver.flow(MSG_COUNT); + for (int i = 0; i < MSG_COUNT; ++i) { + assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); + } + receiver.close(); + + System.out.println("Message Count after all consumed: " + queueView.getMessageCount()); + + // Open a new receiver and see if any message are left on the Queue + receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + if (received != null) { + System.out.println("Message read: " + received.getMessageId()); + } + assertNull(received); + + assertEquals(0, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testPresettledReceiverReadsAllMessagesInWhenReadInBatches() throws Exception { + final int MSG_COUNT = 100; + sendMessages(getTestName(), MSG_COUNT); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); + + final Queue queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getMessageCount()); + + // Consume all 100 but do so in batches by flowing only limited credit. + + receiver.flow(20); + // consume less that flow + for (int j = 0; j < 10; j++) { + assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); + } + + // flow more and consume all + receiver.flow(10); + for (int j = 0; j < 20; j++) { + assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); + } + + // remainder + receiver.flow(70); + for (int j = 0; j < 70; j++) { + assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); + } + + receiver.close(); + + System.out.println("Message Count after all consumed: " + queueView.getMessageCount()); + + // Open a new receiver and see if any message are left on the Queue + receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + if (received != null) { + System.out.println("Message read: " + received.getMessageId()); + } + assertNull(received); + + assertEquals(0, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testPresettledReceiverWithinBoundsOfActiveTXWithCommit() throws Exception { + doTestPresettledReceiverWithinBoundsOfActiveTX(true); + } + + @Test(timeout = 60000) + public void testPresettledReceiverWithinBoundsOfActiveTXWithRollback() throws Exception { + doTestPresettledReceiverWithinBoundsOfActiveTX(false); + } + + private void doTestPresettledReceiverWithinBoundsOfActiveTX(boolean commit) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + final Queue queue = getProxyToQueue(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + assertEquals(1, queue.getMessageCount()); + + AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); + + session.begin(); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + assertTrue(received.getWrappedDelivery().remotelySettled()); + + if (commit) { + session.commit(); + } else { + session.rollback(); + } + + assertEquals(0, queue.getMessageCount()); + + sender.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testPresettledReceiverWithinBoundsOfActiveTXWithSendAndRollback() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + final Queue queue = getProxyToQueue(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + assertEquals(1, queue.getMessageCount()); + + AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); + + session.begin(); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + assertTrue(received.getWrappedDelivery().remotelySettled()); + + message = new AmqpMessage(); + message.setText("Test-Message - Rolled Back"); + sender.send(message); + + session.rollback(); + + assertEquals(0, queue.getMessageCount()); + + sender.close(); + connection.close(); + } + + public void sendMessages(String destinationName, int count) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + sender.send(message); + } + } finally { + connection.close(); + } + } +}