From 766f88c22af032fdcae08c0f525f73d528b4a162 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 21 Feb 2020 17:48:16 -0600 Subject: [PATCH] ARTEMIS-2629 ensure queue auto-delete after expiration --- .../core/postoffice/impl/PostOfficeImpl.java | 6 ++- .../artemis/core/server/impl/QueueImpl.java | 10 +++++ .../core/server/impl/QueueManagerImpl.java | 9 ++--- .../client/AutoDeleteQueueTest.java | 38 ++++++++++++++++++- 4 files changed, 56 insertions(+), 7 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index bbebfae11d..f3f0c5d02b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1735,7 +1735,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public void run() { for (Queue queue : getLocalQueues()) { - if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)) { + if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) { QueueManagerImpl.performAutoDeleteQueue(server, queue); } } @@ -1760,6 +1760,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } } + + private boolean queueWasUsed(Queue queue) { + return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1; + } } private List getLocalQueues() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 53b701ebe1..9ce735d623 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1859,6 +1859,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { acknowledge(ref, AckReason.EXPIRED, consumer); } + // potentially auto-delete this queue if this expired the last message + refCountForConsumers.check(); + if (server != null && server.hasBrokerMessagePlugins()) { final SimpleString expiryAddress = messageExpiryAddress; server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer)); @@ -3366,6 +3369,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { expiryLogger.addExpiry(address, ref); } + // potentially auto-delete this queue if this expired the last message + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + refCountForConsumers.check(); + } + }); } private class ExpiryLogger extends TransactionOperationAbstract { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java index b20b537923..549686f8a6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java @@ -35,7 +35,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag //the queue may already have been deleted and this is a result of that if (queue == null) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" + queueName + ".\""); + ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" + queueName + "\"."); } return; } @@ -52,7 +52,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag long messageCount = queue.getMessageCount(); if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queue.getName() + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount); + ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queue.getName() + "\": consumerCount = " + consumerCount + "; messageCount = " + messageCount); } try { queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED); @@ -65,7 +65,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag SimpleString queueName = queue.getName(); AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString()); if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.info("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete()); + ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + "\": consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete()); } try { @@ -84,8 +84,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag } public static boolean delayCheck(Queue queue) { - long consumerRemovedTimestamp = queue.getConsumerRemovedTimestamp(); - return consumerRemovedTimestamp != -1 && System.currentTimeMillis() - consumerRemovedTimestamp >= queue.getAutoDeleteDelay(); + return System.currentTimeMillis() - queue.getConsumerRemovedTimestamp() >= queue.getAutoDeleteDelay(); } public static boolean consumerCountCheck(Queue queue) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java index 8603e7b166..5bce2cf885 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.tests.integration.client; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -42,13 +44,15 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase { super.setUp(); locator = createInVMNonHALocator(); server = createServer(false); + server.getConfiguration().setAddressQueueScanPeriod(500); + server.getConfiguration().setMessageExpiryScanPeriod(500); server.start(); cf = createSessionFactory(locator); } @Test - public void testAutoDeleteAutoCreatedQueue() throws Exception { + public void testAutoDeleteAutoCreatedQueueOnLastConsumerClose() throws Exception { // auto-delete-queues defaults to true server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true); assertNotNull(server.locateQueue(queueA)); @@ -56,6 +60,30 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase { Wait.assertTrue(() -> server.locateQueue(queueA) == null); } + @Test + public void testAutoDeleteAutoCreatedQueueOnLastMessageRemovedWithoutConsumer() throws Exception { + // auto-delete-queues defaults to true + server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true); + assertNotNull(server.locateQueue(queueA)); + ClientSession session = cf.createSession(); + ClientProducer producer = session.createProducer(addressA); + producer.send(session.createMessage(true)); + Wait.assertEquals(1, server.locateQueue(queueA)::getMessageCount); + server.locateQueue(queueA).deleteAllReferences(); + Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100); + } + + @Test + public void testAutoDeleteAutoCreatedQueueOnLastMessageExpired() throws Exception { + // auto-delete-queues defaults to true + server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true); + assertNotNull(server.locateQueue(queueA)); + ClientSession session = cf.createSession(); + ClientProducer producer = session.createProducer(addressA); + producer.send(session.createMessage(true).setExpiration(System.currentTimeMillis())); + Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100); + } + @Test public void testNegativeAutoDeleteAutoCreatedQueue() throws Exception { server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteQueues(false)); @@ -64,4 +92,12 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase { cf.createSession().createConsumer(queueA).close(); assertNotNull(server.locateQueue(queueA)); } + + @Test + public void testNegativeAutoDeleteAutoCreatedQueue2() throws Exception { + server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings()); + server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true); + assertNotNull(server.locateQueue(queueA)); + assertFalse(Wait.waitFor(() -> server.locateQueue(queueA) == null, 5000, 100)); + } }