diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index b52ed24c78..c365b7d326 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1558,4 +1558,8 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT) void invalidMessageCounterPeriod(long value); + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT) + void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java similarity index 93% rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java rename to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java index 5af5c0e7da..a847757563 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.server; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.utils.ReferenceCounter; -public interface AutoCreatedQueueManager extends ReferenceCounter { +public interface QueueManager extends ReferenceCounter { SimpleString getQueueName(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 5c392ccfc9..75751007c1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2472,8 +2472,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (transientQueue) { queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); - } else if (queue.isAutoCreated()) { - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName())); + } else { + queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName())); } final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 65de0c96bc..a8f2d858bc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -153,7 +153,7 @@ public class PostOfficeJournalLoader implements JournalLoader { .maxConsumers(queueBindingInfo.getMaxConsumers()) .routingType(RoutingType.getType(queueBindingInfo.getRoutingType())); final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName())); + queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName())); if (queueBindingInfo.getQueueStatusEncodings() != null) { for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 6834bb46f6..8242760726 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -851,13 +851,7 @@ public class QueueImpl implements Queue { refCountForConsumers.decrement(); } - if (noConsumers.decrementAndGet() == 0 && purgeOnNoConsumers) { - try { - deleteQueue(); - } catch (Exception e) { - logger.error("Error deleting queue on no consumers. " + this.toString(), e); - } - } + noConsumers.decrementAndGet(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java similarity index 78% rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java rename to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java index fd89a945b0..692eba7987 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java @@ -19,12 +19,12 @@ package org.apache.activemq.artemis.core.server.impl; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager; +import org.apache.activemq.artemis.core.server.QueueManager; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.ReferenceCounterUtil; -public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager { +public class QueueManagerImpl implements QueueManager { private final SimpleString queueName; @@ -39,7 +39,7 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager { long consumerCount = queue.getConsumerCount(); long messageCount = queue.getMessageCount(); - if (((queue.isAutoCreated() && settings.isAutoDeleteQueues()) || queue.isPurgeOnNoConsumers()) && queue.getMessageCount() == 0) { + if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues()); } @@ -49,13 +49,22 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager { } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); } + } else if (queue.isPurgeOnNoConsumers()) { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount); + } + try { + queue.deleteAllReferences(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName); + } } } }; private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable); - public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) { + public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) { this.server = server; this.queueName = queueName; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java index 98957e263d..0eb5f3254f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java @@ -224,37 +224,30 @@ public class AddressingTest extends ActiveMQTestBase { @Test public void testPurgeOnNoConsumersTrue() throws Exception { - SimpleString address = new SimpleString("test.address"); SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); - // For each address, create 2 Queues with the same address, assert both queues receive message - boolean purgeOnNoConsumers = true; - Queue q1 = server.createQueue(address, RoutingType.MULTICAST, queueName, null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true); - + server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, 1, true, true); + assertNotNull(server.locateQueue(queueName)); ClientSession session = sessionFactory.createSession(); - session.start(); - - ClientConsumer consumer1 = session.createConsumer(q1.getName()); - consumer1.close(); - - assertFalse(server.queueQuery(queueName).isExists()); + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(true)); + session.createConsumer(queueName).close(); + assertNotNull(server.locateQueue(queueName)); + assertEquals(0, server.locateQueue(queueName).getMessageCount()); } @Test public void testPurgeOnNoConsumersFalse() throws Exception { SimpleString address = new SimpleString("test.address"); SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); - // For each address, create 2 Queues with the same address, assert both queues receive message - boolean purgeOnNoConsumers = false; - Queue q1 = server.createQueue(address,RoutingType.MULTICAST, queueName, null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true); - + server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, 1, false, true); + assertNotNull(server.locateQueue(queueName)); ClientSession session = sessionFactory.createSession(); - session.start(); - - ClientConsumer consumer1 = session.createConsumer(q1.getName()); - consumer1.close(); - - assertTrue(server.queueQuery(queueName).isExists()); + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(true)); + session.createConsumer(queueName).close(); + assertNotNull(server.locateQueue(queueName)); + assertEquals(1, server.locateQueue(queueName).getMessageCount()); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java index 9406295c9c..2823983343 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -61,7 +62,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); receiver2.close(); //check its been deleted - assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; + } + }, 1000); connection.close(); } @@ -117,7 +123,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); //check its been deleted connection.close(); - assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; + } + }, 1000); } @Test(timeout = 60000) @@ -144,7 +155,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); receiver2.close(); //check its been deleted - assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null; + } + }, 1000); connection.close(); }