diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2379d5e648..923eadf699 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3919,10 +3919,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return; } - if (sorted) { - addSorted(refs, false); - } else { - addHead(refs, false); + + // if the queue is non-destructive then any ack is ignored so no need to add messages back onto the queue + if (!isNonDestructive()) { + if (sorted) { + addSorted(refs, false); + } else { + addHead(refs, false); + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java index 8b54045140..7d96e287c4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; - import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -329,6 +328,30 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { } + @Test + public void testMessageCount() throws Exception { + sendMessage(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME); + + QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(NON_DESTRUCTIVE_QUEUE_NAME)); + assertEquals("Ensure Message count", 1, queueBinding.getQueue().getMessageCount()); + + //Consume Once + receive(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME); + assertEquals("Ensure Message count", 1, queueBinding.getQueue().getMessageCount()); + + sendMessage(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME); + assertEquals("Ensure Message count", 2, queueBinding.getQueue().getMessageCount()); + + //Consume Again as should be non-destructive + receive(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME); + assertEquals("Ensure Message count", 2, queueBinding.getQueue().getMessageCount()); + + QueueControl control = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + NON_DESTRUCTIVE_QUEUE_NAME); + control.removeAllMessages(); + + assertEquals("Message count after clearing queue via queue control should be 0", 0, queueBinding.getQueue().getMessageCount()); + } + private void receive(ConnectionSupplier consumerConnectionSupplier, String queueName, int i) throws JMSException { try (Connection consumerConnection = consumerConnectionSupplier.createConnection()) {