This commit is contained in:
Justin Bertram 2021-05-24 13:02:08 -05:00
commit 8c2b80f234
2 changed files with 39 additions and 14 deletions

View File

@ -46,6 +46,7 @@ import java.util.function.ToLongFunction;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNullRefException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -3572,22 +3573,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private void createDeadLetterResources() throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
if (addressSettings.isAutoCreateDeadLetterResources() && !getAddress().equals(addressSettings.getDeadLetterAddress())) {
if (addressSettings.getDeadLetterAddress() != null && addressSettings.getDeadLetterAddress().length() != 0) {
SimpleString dlqName = addressSettings.getDeadLetterQueuePrefix().concat(getAddress()).concat(addressSettings.getDeadLetterQueueSuffix());
SimpleString dlqFilter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
server.createQueue(new QueueConfiguration(dlqName).setAddress(addressSettings.getDeadLetterAddress()).setFilterString(dlqFilter).setAutoCreated(true).setAutoCreateAddress(true), true);
}
}
createResources(addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix());
}
private void createExpiryResources() throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
if (addressSettings.isAutoCreateExpiryResources() && !getAddress().equals(addressSettings.getExpiryAddress())) {
if (addressSettings.getExpiryAddress() != null && addressSettings.getExpiryAddress().length() != 0) {
SimpleString expiryQueueName = addressSettings.getExpiryQueuePrefix().concat(getAddress()).concat(addressSettings.getExpiryQueueSuffix());
SimpleString expiryFilter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
server.createQueue(new QueueConfiguration(expiryQueueName).setAddress(addressSettings.getExpiryAddress()).setFilterString(expiryFilter).setAutoCreated(true).setAutoCreateAddress(true), true);
createResources(addressSettings.isAutoCreateExpiryResources(), addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(), addressSettings.getExpiryQueueSuffix());
}
private void createResources(boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
if (isAutoCreate && !getAddress().equals(destinationAddress)) {
if (destinationAddress != null && destinationAddress.length() != 0) {
SimpleString destinationQueueName = prefix.concat(getAddress()).concat(suffix);
SimpleString filter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
try {
server.createQueue(new QueueConfiguration(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true), true);
} catch (ActiveMQQueueExistsException e) {
// ignore
}
}
}
}

View File

@ -43,6 +43,7 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase {
public final SimpleString addressA = new SimpleString("addressA");
public final SimpleString queueA = new SimpleString("queueA");
public final SimpleString expiryAddress = new SimpleString("myExpiry");
public final long EXPIRY_DELAY = 100L;
private ActiveMQServer server;
@ -53,7 +54,7 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase {
server = createServer(false);
// set common address settings needed for all tests; make sure to use getMatch instead of addMatch in invidual tests or these will be overwritten
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(100L));
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY));
server.getConfiguration().setMessageExpiryScanPeriod(50L);
server.start();
@ -193,12 +194,33 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase {
assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000));
}
@Test
public void testConcurrentExpirations() throws Exception {
SimpleString expiryQueueName = getDefaultExpiryQueueName(addressA);
final long COUNT = 5;
for (int i = 0; i < COUNT; i++) {
server.createQueue(new QueueConfiguration(i + "").setAddress(addressA).setRoutingType(RoutingType.MULTICAST));
}
triggerExpiration(false);
Wait.assertTrue(() -> server.locateQueue(expiryQueueName) != null, EXPIRY_DELAY * 2, 20);
Wait.assertEquals(COUNT, () -> server.locateQueue(expiryQueueName).getMessageCount(), EXPIRY_DELAY * 2, 20);
}
private SimpleString getDefaultExpiryQueueName(SimpleString address) {
return AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX.concat(address).concat(AddressSettings.DEFAULT_EXPIRY_QUEUE_SUFFIX);
}
private void triggerExpiration() throws Exception {
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
triggerExpiration(true);
}
private void triggerExpiration(boolean createQueue) throws Exception {
if (createQueue) {
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
}
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession session = addClientSession(sessionFactory.createSession(true, false));