This closes #982

This commit is contained in:
Clebert Suconic 2017-02-01 09:49:48 -05:00
commit 57038ff47e
8 changed files with 55 additions and 39 deletions

View File

@ -1558,4 +1558,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT)
void invalidMessageCounterPeriod(long value);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName);
}

View File

@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.ReferenceCounter;
public interface AutoCreatedQueueManager extends ReferenceCounter {
public interface QueueManager extends ReferenceCounter {
SimpleString getQueueName();
}

View File

@ -2472,8 +2472,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else if (queue.isAutoCreated()) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName()));
} else {
queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName()));
}
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());

View File

@ -153,7 +153,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
.maxConsumers(queueBindingInfo.getMaxConsumers())
.routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
if (queueBindingInfo.getQueueStatusEncodings() != null) {
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {

View File

@ -851,13 +851,7 @@ public class QueueImpl implements Queue {
refCountForConsumers.decrement();
}
if (noConsumers.decrementAndGet() == 0 && purgeOnNoConsumers) {
try {
deleteQueue();
} catch (Exception e) {
logger.error("Error deleting queue on no consumers. " + this.toString(), e);
}
}
noConsumers.decrementAndGet();
}
}

View File

@ -19,12 +19,12 @@ package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager;
import org.apache.activemq.artemis.core.server.QueueManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
public class QueueManagerImpl implements QueueManager {
private final SimpleString queueName;
@ -39,7 +39,7 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
long consumerCount = queue.getConsumerCount();
long messageCount = queue.getMessageCount();
if (((queue.isAutoCreated() && settings.isAutoDeleteQueues()) || queue.isPurgeOnNoConsumers()) && queue.getMessageCount() == 0) {
if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
}
@ -49,13 +49,22 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
}
} else if (queue.isPurgeOnNoConsumers()) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
}
try {
queue.deleteAllReferences();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
}
}
}
};
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
this.server = server;
this.queueName = queueName;
}

View File

@ -224,37 +224,30 @@ public class AddressingTest extends ActiveMQTestBase {
@Test
public void testPurgeOnNoConsumersTrue() throws Exception {
SimpleString address = new SimpleString("test.address");
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message
boolean purgeOnNoConsumers = true;
Queue q1 = server.createQueue(address, RoutingType.MULTICAST, queueName, null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true);
server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, 1, true, true);
assertNotNull(server.locateQueue(queueName));
ClientSession session = sessionFactory.createSession();
session.start();
ClientConsumer consumer1 = session.createConsumer(q1.getName());
consumer1.close();
assertFalse(server.queueQuery(queueName).isExists());
ClientProducer producer = session.createProducer(address);
producer.send(session.createMessage(true));
session.createConsumer(queueName).close();
assertNotNull(server.locateQueue(queueName));
assertEquals(0, server.locateQueue(queueName).getMessageCount());
}
@Test
public void testPurgeOnNoConsumersFalse() throws Exception {
SimpleString address = new SimpleString("test.address");
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message
boolean purgeOnNoConsumers = false;
Queue q1 = server.createQueue(address,RoutingType.MULTICAST, queueName, null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true);
server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, 1, false, true);
assertNotNull(server.locateQueue(queueName));
ClientSession session = sessionFactory.createSession();
session.start();
ClientConsumer consumer1 = session.createConsumer(q1.getName());
consumer1.close();
assertTrue(server.queueQuery(queueName).isExists());
ClientProducer producer = session.createProducer(address);
producer.send(session.createMessage(true));
session.createConsumer(queueName).close();
assertNotNull(server.locateQueue(queueName));
assertEquals(1, server.locateQueue(queueName).getMessageCount());
}
@Test

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -61,7 +62,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
receiver2.close();
//check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
}
}, 1000);
connection.close();
}
@ -117,7 +123,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
//check its been deleted
connection.close();
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
}
}, 1000);
}
@Test(timeout = 60000)
@ -144,7 +155,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
receiver2.close();
//check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null;
}
}, 1000);
connection.close();
}