This commit is contained in:
Clebert Suconic 2020-03-05 21:35:36 -05:00
commit 32829d6542
4 changed files with 56 additions and 7 deletions

View File

@ -1735,7 +1735,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public void run() {
for (Queue queue : getLocalQueues()) {
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)) {
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) {
QueueManagerImpl.performAutoDeleteQueue(server, queue);
}
}
@ -1760,6 +1760,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
}
private boolean queueWasUsed(Queue queue) {
return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1;
}
}
private List<Queue> getLocalQueues() {

View File

@ -1859,6 +1859,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
acknowledge(ref, AckReason.EXPIRED, consumer);
}
// potentially auto-delete this queue if this expired the last message
refCountForConsumers.check();
if (server != null && server.hasBrokerMessagePlugins()) {
final SimpleString expiryAddress = messageExpiryAddress;
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer));
@ -3366,6 +3369,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
expiryLogger.addExpiry(address, ref);
}
// potentially auto-delete this queue if this expired the last message
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
refCountForConsumers.check();
}
});
}
private class ExpiryLogger extends TransactionOperationAbstract {

View File

@ -35,7 +35,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
//the queue may already have been deleted and this is a result of that
if (queue == null) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" + queueName + ".\"");
ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" + queueName + "\".");
}
return;
}
@ -52,7 +52,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
long messageCount = queue.getMessageCount();
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queue.getName() + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queue.getName() + "\": consumerCount = " + consumerCount + "; messageCount = " + messageCount);
}
try {
queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
@ -65,7 +65,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
SimpleString queueName = queue.getName();
AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.info("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete());
ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + "\": consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete());
}
try {
@ -84,8 +84,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
}
public static boolean delayCheck(Queue queue) {
long consumerRemovedTimestamp = queue.getConsumerRemovedTimestamp();
return consumerRemovedTimestamp != -1 && System.currentTimeMillis() - consumerRemovedTimestamp >= queue.getAutoDeleteDelay();
return System.currentTimeMillis() - queue.getConsumerRemovedTimestamp() >= queue.getAutoDeleteDelay();
}
public static boolean consumerCountCheck(Queue queue) {

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.tests.integration.client;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -42,13 +44,15 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
super.setUp();
locator = createInVMNonHALocator();
server = createServer(false);
server.getConfiguration().setAddressQueueScanPeriod(500);
server.getConfiguration().setMessageExpiryScanPeriod(500);
server.start();
cf = createSessionFactory(locator);
}
@Test
public void testAutoDeleteAutoCreatedQueue() throws Exception {
public void testAutoDeleteAutoCreatedQueueOnLastConsumerClose() throws Exception {
// auto-delete-queues defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA));
@ -56,6 +60,30 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
Wait.assertTrue(() -> server.locateQueue(queueA) == null);
}
@Test
public void testAutoDeleteAutoCreatedQueueOnLastMessageRemovedWithoutConsumer() throws Exception {
// auto-delete-queues defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA));
ClientSession session = cf.createSession();
ClientProducer producer = session.createProducer(addressA);
producer.send(session.createMessage(true));
Wait.assertEquals(1, server.locateQueue(queueA)::getMessageCount);
server.locateQueue(queueA).deleteAllReferences();
Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100);
}
@Test
public void testAutoDeleteAutoCreatedQueueOnLastMessageExpired() throws Exception {
// auto-delete-queues defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA));
ClientSession session = cf.createSession();
ClientProducer producer = session.createProducer(addressA);
producer.send(session.createMessage(true).setExpiration(System.currentTimeMillis()));
Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100);
}
@Test
public void testNegativeAutoDeleteAutoCreatedQueue() throws Exception {
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteQueues(false));
@ -64,4 +92,12 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
cf.createSession().createConsumer(queueA).close();
assertNotNull(server.locateQueue(queueA));
}
@Test
public void testNegativeAutoDeleteAutoCreatedQueue2() throws Exception {
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings());
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA));
assertFalse(Wait.waitFor(() -> server.locateQueue(queueA) == null, 5000, 100));
}
}