ARTEMIS-3525 Empty Auto Created queues should be removed on restart
This commit is contained in:
parent
e86acc977b
commit
2383aa0125
|
@ -83,6 +83,11 @@ public interface AddressManager {
|
||||||
|
|
||||||
void scanAddresses(MirrorController mirrorController) throws Exception;
|
void scanAddresses(MirrorController mirrorController) throws Exception;
|
||||||
|
|
||||||
|
boolean checkAutoRemoveAddress(SimpleString address,
|
||||||
|
AddressInfo addressInfo,
|
||||||
|
AddressSettings settings,
|
||||||
|
boolean ignoreDelay) throws Exception;
|
||||||
|
|
||||||
boolean checkAutoRemoveAddress(SimpleString address,
|
boolean checkAutoRemoveAddress(SimpleString address,
|
||||||
AddressInfo addressInfo,
|
AddressInfo addressInfo,
|
||||||
AddressSettings settings) throws Exception;
|
AddressSettings settings) throws Exception;
|
||||||
|
|
|
@ -1779,6 +1779,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void startAddressQueueScanner() {
|
public synchronized void startAddressQueueScanner() {
|
||||||
|
reapAddresses(true); // we need to check for empty auto-created queues before the acceptors are on
|
||||||
|
// empty auto-created queues and addresses should be removed right away
|
||||||
if (addressQueueReaperPeriod > 0) {
|
if (addressQueueReaperPeriod > 0) {
|
||||||
if (addressQueueReaperRunnable != null)
|
if (addressQueueReaperRunnable != null)
|
||||||
addressQueueReaperRunnable.stop();
|
addressQueueReaperRunnable.stop();
|
||||||
|
@ -1845,7 +1847,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
reapAddresses();
|
reapAddresses(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1855,13 +1857,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
|
|
||||||
/** To be used by the AddressQueueReaper.
|
/** To be used by the AddressQueueReaper.
|
||||||
* It is also exposed for tests through PostOfficeTestAccessor */
|
* It is also exposed for tests through PostOfficeTestAccessor */
|
||||||
void reapAddresses() {
|
void reapAddresses(boolean initialCheck) {
|
||||||
getLocalQueues().forEach(queue -> {
|
getLocalQueues().forEach(queue -> {
|
||||||
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) {
|
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && (initialCheck || QueueManagerImpl.delayCheck(queue)) && QueueManagerImpl.messageCountCheck(queue) && (initialCheck || queueWasUsed(queue))) {
|
||||||
if (queue.isSwept()) {
|
if (initialCheck || queue.isSwept()) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
|
if (initialCheck) {
|
||||||
|
logger.debug("Removing queue " + queue.getName() + " during the reload check");
|
||||||
|
} else {
|
||||||
logger.debug("Removing queue " + queue.getName() + " after it being swept twice on reaping process");
|
logger.debug("Removing queue " + queue.getName() + " after it being swept twice on reaping process");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
QueueManagerImpl.performAutoDeleteQueue(server, queue);
|
QueueManagerImpl.performAutoDeleteQueue(server, queue);
|
||||||
} else {
|
} else {
|
||||||
queue.setSwept(true);
|
queue.setSwept(true);
|
||||||
|
@ -1878,8 +1884,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
|
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (addressManager.checkAutoRemoveAddress(address, addressInfo, settings)) {
|
if (addressManager.checkAutoRemoveAddress(address, addressInfo, settings, initialCheck)) {
|
||||||
if (addressInfo.isSwept()) {
|
if (initialCheck || addressInfo.isSwept()) {
|
||||||
|
|
||||||
server.autoRemoveAddressInfo(address, null);
|
server.autoRemoveAddressInfo(address, null);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -369,7 +369,14 @@ public class SimpleAddressManager implements AddressManager {
|
||||||
public boolean checkAutoRemoveAddress(SimpleString address,
|
public boolean checkAutoRemoveAddress(SimpleString address,
|
||||||
AddressInfo addressInfo,
|
AddressInfo addressInfo,
|
||||||
AddressSettings settings) throws Exception {
|
AddressSettings settings) throws Exception {
|
||||||
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay());
|
return checkAutoRemoveAddress(address, addressInfo, settings, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAutoRemoveAddress(SimpleString address,
|
||||||
|
AddressInfo addressInfo,
|
||||||
|
AddressSettings settings, boolean ignoreDelay) throws Exception {
|
||||||
|
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(address) && (ignoreDelay || addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.activemq.artemis.core.postoffice.impl;
|
||||||
public class PostOfficeTestAccessor {
|
public class PostOfficeTestAccessor {
|
||||||
|
|
||||||
public static void reapAddresses(PostOfficeImpl postOffice) {
|
public static void reapAddresses(PostOfficeImpl postOffice) {
|
||||||
postOffice.reapAddresses();
|
postOffice.reapAddresses(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||||
|
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||||
import org.apache.activemq.artemis.tests.util.Wait;
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -327,6 +329,95 @@ public class AutoCreateTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanupAfterRebootOpenWire() throws Exception {
|
||||||
|
testCleanupAfterReboot("OPENWIRE", false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanupAfterRebootCore() throws Exception {
|
||||||
|
// there is no need to duplicate the test between usedelay and not.
|
||||||
|
// doing it in one of the protocols should be enough
|
||||||
|
testCleanupAfterReboot("CORE", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanupAfterRebootAMQP() throws Exception {
|
||||||
|
testCleanupAfterReboot("AMQP", false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCleanupAfterReboot(String protocol, boolean useDelay) throws Exception {
|
||||||
|
|
||||||
|
if (useDelay) {
|
||||||
|
// setting up a delay, to make things a bit more challenging
|
||||||
|
server.getAddressSettingsRepository().addMatch(getName(), new AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddressesDelay(TimeUnit.DAYS.toMillis(1)).setAutoDeleteQueuesDelay(TimeUnit.DAYS.toMillis(1)));
|
||||||
|
}
|
||||||
|
|
||||||
|
AssertionLoggerHandler.startCapture();
|
||||||
|
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
|
||||||
|
server.start();
|
||||||
|
String QUEUE_NAME = getName();
|
||||||
|
|
||||||
|
ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
|
||||||
|
try (Connection connection = cf.createConnection()) {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue(QUEUE_NAME);
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
connection.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
AddressInfo info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
|
||||||
|
Assert.assertNotNull(info);
|
||||||
|
Assert.assertTrue(info.isAutoCreated());
|
||||||
|
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113"));
|
||||||
|
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
|
||||||
|
|
||||||
|
AssertionLoggerHandler.clear();
|
||||||
|
|
||||||
|
String randomString = "random " + RandomUtil.randomString();
|
||||||
|
|
||||||
|
try (Connection connection = cf.createConnection()) {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue(QUEUE_NAME);
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.send(session.createTextMessage(randomString));
|
||||||
|
}
|
||||||
|
|
||||||
|
info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
|
||||||
|
Assert.assertNotNull(info);
|
||||||
|
Assert.assertTrue(info.isAutoCreated());
|
||||||
|
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); // this time around the queue had messages, it has to exist
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
|
||||||
|
|
||||||
|
info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
|
||||||
|
Assert.assertNotNull(info);
|
||||||
|
Assert.assertTrue(info.isAutoCreated());
|
||||||
|
|
||||||
|
{ // just a namespace
|
||||||
|
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME);
|
||||||
|
Wait.assertEquals(1, serverQueue::getMessageCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
try (Connection connection = cf.createConnection()) {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
connection.start();
|
||||||
|
Queue queue = session.createQueue(QUEUE_NAME);
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
TextMessage message = (TextMessage)consumer.receive(5000);
|
||||||
|
Assert.assertEquals(randomString, message.getText());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue