diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 98f728d3cf..58f31ea889 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1484,9 +1484,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return iterQueue(flushLimit, filter1, new QueueIterateAction() { @Override public void actMessage(Transaction tx, MessageReference ref) throws Exception { + actMessage(tx, ref, true); + } + + @Override + public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception { incDelivering(ref); acknowledge(tx, ref, ackReason); - refRemoved(ref); + if (fromMessageReferences) { + refRemoved(ref); + } } }); } @@ -1558,7 +1565,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (filter1 == null || filter1.match(reference.getMessage())) { count++; txCount++; - messageAction.actMessage(tx, reference); + messageAction.actMessage(tx, reference, false); } else { addTail(reference, false); } @@ -2764,7 +2771,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { ref.acknowledge(tx, AckReason.KILLED); } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); - move(tx, deadLetterAddress,null, ref, false, AckReason.KILLED); + move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED); } } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name); @@ -3191,6 +3198,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { abstract class QueueIterateAction { public abstract void actMessage(Transaction tx, MessageReference ref) throws Exception; + + public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception { + actMessage(tx, ref); + } } /* For external use we need to use a synchronized version since the list is not thread safe */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 9478690db4..8bd5d0370c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.management; import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY; import java.lang.reflect.Field; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -36,6 +37,7 @@ import javax.json.JsonObject; import javax.management.Notification; import javax.management.openmbean.CompositeData; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.Message; @@ -57,6 +59,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; +import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; @@ -946,8 +949,8 @@ public class QueueControlTest extends ManagementTestBase { session.createAddress(myTopic, RoutingType.MULTICAST, false); DivertConfiguration divert = new DivertConfiguration().setName("local-divert") - .setRoutingName("some-name").setAddress(myTopic.toString()) - .setForwardingAddress(forwardingAddress.toString()).setExclusive(false); + .setRoutingName("some-name").setAddress(myTopic.toString()) + .setForwardingAddress(forwardingAddress.toString()).setExclusive(false); server.deployDivert(divert); // Send message to topic. @@ -1505,6 +1508,63 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testRemoveAllWithPagingMode() throws Exception { + + final int MESSAGE_SIZE = 1024 * 3; // 3k + + // reset maxSize for Paging mode + Field maxSizField = PagingManagerImpl.class.getDeclaredField("maxSize"); + maxSizField.setAccessible(true); + maxSizField.setLong(server.getPagingManager(), 10240); + clearDataRecreateServerDirs(); + + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queueName = RandomUtil.randomSimpleString(); + + session.createQueue(address, RoutingType.MULTICAST, queueName, null, durable); + + Queue queue = server.locateQueue(queueName); + Assert.assertEquals(false, queue.getPageSubscription().isPaging()); + + ClientProducer producer = session.createProducer(address); + + byte[] body = new byte[MESSAGE_SIZE]; + + ByteBuffer bb = ByteBuffer.wrap(body); + + for (int j = 1; j <= MESSAGE_SIZE; j++) { + bb.put(getSamplebyte(j)); + } + + final int numberOfMessages = 8000; + ClientMessage message; + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producer.send(message); + } + + Assert.assertEquals(true, queue.getPageSubscription().isPaging()); + + QueueControl queueControl = createManagementControl(address, queueName); + assertMessageMetrics(queueControl, numberOfMessages, durable); + int removedMatchedMessagesCount = queueControl.removeAllMessages(); + Assert.assertEquals(numberOfMessages, removedMatchedMessagesCount); + assertMessageMetrics(queueControl, 0, durable); + + Field queueMemoprySizeField = QueueImpl.class.getDeclaredField("queueMemorySize"); + queueMemoprySizeField.setAccessible(true); + AtomicInteger queueMemorySize = (AtomicInteger) queueMemoprySizeField.get(queue); + Assert.assertEquals(0, queueMemorySize.get()); + + session.deleteQueue(queueName); + } + @Test public void testRemoveMessagesWithEmptyFilter() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); @@ -2491,8 +2551,8 @@ public class QueueControlTest extends ManagementTestBase { } protected void assertMetrics(final QueueControl queueControl, long messageCount, boolean durable, - Supplier count, Supplier size, - SupplierdurableCount, Supplier durableSize) throws Exception { + Supplier count, Supplier size, + Supplier durableCount, Supplier durableSize) throws Exception { //make sure count stat equals message count Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == messageCount, 3, 100));