ARTEMIS-922 implement purge semantics
This commit is contained in:
parent
35415510df
commit
1752814197
|
@ -1558,4 +1558,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
@Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void invalidMessageCounterPeriod(long value);
|
void invalidMessageCounterPeriod(long value);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.ERROR)
|
||||||
|
@Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.server;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
||||||
|
|
||||||
public interface AutoCreatedQueueManager extends ReferenceCounter {
|
public interface QueueManager extends ReferenceCounter {
|
||||||
|
|
||||||
SimpleString getQueueName();
|
SimpleString getQueueName();
|
||||||
}
|
}
|
|
@ -2472,8 +2472,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
if (transientQueue) {
|
if (transientQueue) {
|
||||||
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
|
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
|
||||||
} else if (queue.isAutoCreated()) {
|
} else {
|
||||||
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName()));
|
queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
|
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
|
||||||
|
|
|
@ -153,7 +153,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
||||||
.maxConsumers(queueBindingInfo.getMaxConsumers())
|
.maxConsumers(queueBindingInfo.getMaxConsumers())
|
||||||
.routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
|
.routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
|
||||||
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
|
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
|
||||||
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
|
queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
|
||||||
|
|
||||||
if (queueBindingInfo.getQueueStatusEncodings() != null) {
|
if (queueBindingInfo.getQueueStatusEncodings() != null) {
|
||||||
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
|
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
|
||||||
|
|
|
@ -851,13 +851,7 @@ public class QueueImpl implements Queue {
|
||||||
refCountForConsumers.decrement();
|
refCountForConsumers.decrement();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (noConsumers.decrementAndGet() == 0 && purgeOnNoConsumers) {
|
noConsumers.decrementAndGet();
|
||||||
try {
|
|
||||||
deleteQueue();
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Error deleting queue on no consumers. " + this.toString(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,12 +19,12 @@ package org.apache.activemq.artemis.core.server.impl;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager;
|
import org.apache.activemq.artemis.core.server.QueueManager;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
|
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
|
||||||
|
|
||||||
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
|
public class QueueManagerImpl implements QueueManager {
|
||||||
|
|
||||||
private final SimpleString queueName;
|
private final SimpleString queueName;
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
|
||||||
long consumerCount = queue.getConsumerCount();
|
long consumerCount = queue.getConsumerCount();
|
||||||
long messageCount = queue.getMessageCount();
|
long messageCount = queue.getMessageCount();
|
||||||
|
|
||||||
if (((queue.isAutoCreated() && settings.isAutoDeleteQueues()) || queue.isPurgeOnNoConsumers()) && queue.getMessageCount() == 0) {
|
if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0) {
|
||||||
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
|
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
|
||||||
ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
|
ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
|
||||||
}
|
}
|
||||||
|
@ -49,13 +49,22 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
|
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
|
||||||
}
|
}
|
||||||
|
} else if (queue.isPurgeOnNoConsumers()) {
|
||||||
|
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
|
||||||
|
ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
queue.deleteAllReferences();
|
||||||
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
|
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
|
||||||
|
|
||||||
public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
|
public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.queueName = queueName;
|
this.queueName = queueName;
|
||||||
}
|
}
|
|
@ -224,37 +224,30 @@ public class AddressingTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPurgeOnNoConsumersTrue() throws Exception {
|
public void testPurgeOnNoConsumersTrue() throws Exception {
|
||||||
|
|
||||||
SimpleString address = new SimpleString("test.address");
|
SimpleString address = new SimpleString("test.address");
|
||||||
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
|
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
|
||||||
// For each address, create 2 Queues with the same address, assert both queues receive message
|
server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, 1, true, true);
|
||||||
boolean purgeOnNoConsumers = true;
|
assertNotNull(server.locateQueue(queueName));
|
||||||
Queue q1 = server.createQueue(address, RoutingType.MULTICAST, queueName, null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true);
|
|
||||||
|
|
||||||
ClientSession session = sessionFactory.createSession();
|
ClientSession session = sessionFactory.createSession();
|
||||||
session.start();
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
producer.send(session.createMessage(true));
|
||||||
ClientConsumer consumer1 = session.createConsumer(q1.getName());
|
session.createConsumer(queueName).close();
|
||||||
consumer1.close();
|
assertNotNull(server.locateQueue(queueName));
|
||||||
|
assertEquals(0, server.locateQueue(queueName).getMessageCount());
|
||||||
assertFalse(server.queueQuery(queueName).isExists());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPurgeOnNoConsumersFalse() throws Exception {
|
public void testPurgeOnNoConsumersFalse() throws Exception {
|
||||||
SimpleString address = new SimpleString("test.address");
|
SimpleString address = new SimpleString("test.address");
|
||||||
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
|
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
|
||||||
// For each address, create 2 Queues with the same address, assert both queues receive message
|
server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, 1, false, true);
|
||||||
boolean purgeOnNoConsumers = false;
|
assertNotNull(server.locateQueue(queueName));
|
||||||
Queue q1 = server.createQueue(address,RoutingType.MULTICAST, queueName, null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true);
|
|
||||||
|
|
||||||
ClientSession session = sessionFactory.createSession();
|
ClientSession session = sessionFactory.createSession();
|
||||||
session.start();
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
producer.send(session.createMessage(true));
|
||||||
ClientConsumer consumer1 = session.createConsumer(q1.getName());
|
session.createConsumer(queueName).close();
|
||||||
consumer1.close();
|
assertNotNull(server.locateQueue(queueName));
|
||||||
|
assertEquals(1, server.locateQueue(queueName).getMessageCount());
|
||||||
assertTrue(server.queueQuery(queueName).isExists());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
|
@ -61,7 +62,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
|
||||||
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
|
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
|
||||||
receiver2.close();
|
receiver2.close();
|
||||||
//check its been deleted
|
//check its been deleted
|
||||||
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisfied() throws Exception {
|
||||||
|
return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
|
||||||
|
}
|
||||||
|
}, 1000);
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +123,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
|
||||||
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
|
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
|
||||||
//check its been deleted
|
//check its been deleted
|
||||||
connection.close();
|
connection.close();
|
||||||
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisfied() throws Exception {
|
||||||
|
return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
|
||||||
|
}
|
||||||
|
}, 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
|
@ -144,7 +155,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
|
||||||
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
|
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
|
||||||
receiver2.close();
|
receiver2.close();
|
||||||
//check its been deleted
|
//check its been deleted
|
||||||
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisfied() throws Exception {
|
||||||
|
return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null;
|
||||||
|
}
|
||||||
|
}, 1000);
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue