diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 118ef69433..276bd9e3ec 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -46,6 +46,7 @@ import java.util.function.ToLongFunction; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNullRefException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; @@ -3572,22 +3573,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private void createDeadLetterResources() throws Exception { AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString()); - if (addressSettings.isAutoCreateDeadLetterResources() && !getAddress().equals(addressSettings.getDeadLetterAddress())) { - if (addressSettings.getDeadLetterAddress() != null && addressSettings.getDeadLetterAddress().length() != 0) { - SimpleString dlqName = addressSettings.getDeadLetterQueuePrefix().concat(getAddress()).concat(addressSettings.getDeadLetterQueueSuffix()); - SimpleString dlqFilter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress())); - server.createQueue(new QueueConfiguration(dlqName).setAddress(addressSettings.getDeadLetterAddress()).setFilterString(dlqFilter).setAutoCreated(true).setAutoCreateAddress(true), true); - } - } + createResources(addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix()); } private void createExpiryResources() throws Exception { AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString()); - if (addressSettings.isAutoCreateExpiryResources() && !getAddress().equals(addressSettings.getExpiryAddress())) { - if (addressSettings.getExpiryAddress() != null && addressSettings.getExpiryAddress().length() != 0) { - SimpleString expiryQueueName = addressSettings.getExpiryQueuePrefix().concat(getAddress()).concat(addressSettings.getExpiryQueueSuffix()); - SimpleString expiryFilter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress())); - server.createQueue(new QueueConfiguration(expiryQueueName).setAddress(addressSettings.getExpiryAddress()).setFilterString(expiryFilter).setAutoCreated(true).setAutoCreateAddress(true), true); + createResources(addressSettings.isAutoCreateExpiryResources(), addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(), addressSettings.getExpiryQueueSuffix()); + } + + private void createResources(boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception { + if (isAutoCreate && !getAddress().equals(destinationAddress)) { + if (destinationAddress != null && destinationAddress.length() != 0) { + SimpleString destinationQueueName = prefix.concat(getAddress()).concat(suffix); + SimpleString filter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress())); + try { + server.createQueue(new QueueConfiguration(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true), true); + } catch (ActiveMQQueueExistsException e) { + // ignore + } } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateExpiryResourcesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateExpiryResourcesTest.java index c3ac2813b5..7d46785c2a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateExpiryResourcesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateExpiryResourcesTest.java @@ -43,6 +43,7 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase { public final SimpleString addressA = new SimpleString("addressA"); public final SimpleString queueA = new SimpleString("queueA"); public final SimpleString expiryAddress = new SimpleString("myExpiry"); + public final long EXPIRY_DELAY = 100L; private ActiveMQServer server; @@ -53,7 +54,7 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase { server = createServer(false); // set common address settings needed for all tests; make sure to use getMatch instead of addMatch in invidual tests or these will be overwritten - server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(100L)); + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY)); server.getConfiguration().setMessageExpiryScanPeriod(50L); server.start(); @@ -193,12 +194,33 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase { assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000)); } + @Test + public void testConcurrentExpirations() throws Exception { + SimpleString expiryQueueName = getDefaultExpiryQueueName(addressA); + final long COUNT = 5; + + for (int i = 0; i < COUNT; i++) { + server.createQueue(new QueueConfiguration(i + "").setAddress(addressA).setRoutingType(RoutingType.MULTICAST)); + } + + triggerExpiration(false); + + Wait.assertTrue(() -> server.locateQueue(expiryQueueName) != null, EXPIRY_DELAY * 2, 20); + Wait.assertEquals(COUNT, () -> server.locateQueue(expiryQueueName).getMessageCount(), EXPIRY_DELAY * 2, 20); + } + private SimpleString getDefaultExpiryQueueName(SimpleString address) { return AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX.concat(address).concat(AddressSettings.DEFAULT_EXPIRY_QUEUE_SUFFIX); } private void triggerExpiration() throws Exception { - server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST)); + triggerExpiration(true); + } + + private void triggerExpiration(boolean createQueue) throws Exception { + if (createQueue) { + server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST)); + } ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory sessionFactory = createSessionFactory(locator); ClientSession session = addClientSession(sessionFactory.createSession(true, false));