diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 5931afea99..7e44e7948a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -326,7 +326,8 @@ public class AMQPSessionCallback implements SessionCallback { public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception { recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts); + ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);; + ((ServerConsumer) brokerConsumer).getQueue().forceDelivery(); } finally { resetContext(); } @@ -560,7 +561,6 @@ public class AMQPSessionCallback implements SessionCallback { Transaction tx = protonSPI.getTransaction(txid); tx.rollback(); protonSPI.removeTransaction(txid); - } public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java index d92fa0feec..f206654654 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java @@ -44,6 +44,10 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { receiver1.flow(1); AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + + assertNotNull("did not receive message first time", message); assertEquals("MessageID:0", message.getMessageId()); @@ -51,12 +55,11 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { assertNotNull(protonMessage); assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + receiver2.flow(1); message.release(); - // Read the message again and validate its state - AmqpReceiver receiver2 = session.createReceiver(getTestName()); - receiver2.flow(1); + // Read the message again and validate its state message = receiver2.receive(10, TimeUnit.SECONDS); assertNotNull("did not receive message again", message); assertEquals("MessageID:0", message.getMessageId()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 39daee4727..1308c37b3b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -110,7 +110,6 @@ public class ProtonTest extends ProtonTestBase { private static final String amqpConnectionUri = "amqp://localhost:5672"; private static final String tcpAmqpConnectionUri = "tcp://localhost:5672"; - private static final String brokerName = "my-broker"; private static final long maxSizeBytes = 1 * 1024 * 1024; @@ -472,7 +471,7 @@ public class ProtonTest extends ProtonTestBase { session.close(); Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); //because tx commit is executed async on broker, we use a timed wait. - assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10)); } @Test @@ -548,7 +547,7 @@ public class ProtonTest extends ProtonTestBase { session.rollback(); Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); //because tx rollback is executed async on broker, we use a timed wait. - assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10)); } @@ -1855,4 +1854,32 @@ public class ProtonTest extends ProtonTestBase { return count; } } + + @Test + public void testReleaseDisposition() throws Exception { + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection connection = client.connect(); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(address); + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + AmqpReceiver receiver = session.createReceiver(address); + receiver.flow(10); + + AmqpMessage m1 = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(m1); + m1.release(); + + //receiver.flow(10); + AmqpMessage m2 = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(m2); + m2.accept(); + } finally { + connection.close(); + } + } }