From 6b5f78bfc4f53ac32808cb5a5cf947ef649fc4a3 Mon Sep 17 00:00:00 2001 From: iliya Date: Fri, 10 Feb 2023 15:37:38 +0300 Subject: [PATCH] ARTEMIS-4165 Delete messages in case of queue destroy Messages should be acked even while paging. That will allow page transactions or anything else to be cleared accordingly. --- .../artemis/core/server/impl/QueueImpl.java | 10 +---- .../artemis/tests/db/paging/PagingTest.java | 40 +++++++++++++++++-- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 226ffe02ee..e9323f36d7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2294,13 +2294,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { while (iter.hasNext()) { MessageReference ref = iter.next(); - if (ref.isPaged() && queueDestroyed) { - // this means the queue is being removed - // hence paged references are just going away through - // page cleanup - continue; - } - if (filter1 == null || filter1.match(ref.getMessage())) { if (messageAction.actMessage(tx, ref)) { iter.remove(); @@ -2337,7 +2330,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - if (pageIterator != null && !queueDestroyed) { + if (pageIterator != null) { while (pageIterator.hasNext()) { PagedReference reference = pageIterator.next(); pageIterator.remove(); @@ -2362,7 +2355,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (txCount > 0) { tx.commit(); - tx = null; } if (filter != null && !queueDestroyed && pageSubscription != null) { diff --git a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java index 437ac48a21..fa960f06e6 100644 --- a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java +++ b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java @@ -100,6 +100,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; @@ -1738,8 +1739,6 @@ public class PagingTest extends ParameterDBTestBase { producer.close(); session.start(); - long timeout = System.currentTimeMillis() + 30000; - waitBuffer(cons, NUM_MESSAGES / 5); waitBuffer(cons2, NUM_MESSAGES / 5); @@ -1760,8 +1759,6 @@ public class PagingTest extends ParameterDBTestBase { final HashMap recordsType = countJournal(config); - assertNull("The system is acking page records instead of just delete data", recordsType.get((int) JournalRecordIds.ACKNOWLEDGE_CURSOR)); - Pair, List> journalData = loadMessageJournal(config); HashSet deletedQueueReferences = new HashSet<>(); @@ -1822,6 +1819,41 @@ public class PagingTest extends ParameterDBTestBase { server.stop(); } + @Test + public void testDeleteQueue() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + SimpleString queue = new SimpleString("testPurge:" + RandomUtil.randomString()); + server.addAddressInfo(new AddressInfo(queue, RoutingType.ANYCAST)); + QueueImpl purgeQueue = (QueueImpl) server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setMaxConsumers(1).setPurgeOnNoConsumers(false).setAutoCreateAddress(false)); + + ConnectionFactory cf = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + Connection connection = cf.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue jmsQueue = session.createQueue(queue.toString()); + + purgeQueue.getPageSubscription().getPagingStore().startPaging(); + + MessageProducer producer = session.createProducer(jmsQueue); + + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage("hello" + i)); + session.commit(); + } + + Wait.assertEquals(100, purgeQueue::getMessageCount); + + purgeQueue.deleteQueue(false); + + Wait.assertEquals(0, ()->server.getPagingManager().getTransactions().size(), 5_000); + } + private void waitBuffer(ClientConsumerInternal clientBuffer, int bufferSize) { Wait.assertTrue(() -> "expected " + bufferSize + " but got " + clientBuffer.getBufferSize(), () -> clientBuffer.getBufferSize() > bufferSize, 5000, 100); }