ARTEMIS-3159 errors during concurrent expiration with auto-creation
This commit is contained in:
parent
69cd877b96
commit
e543aa3bd5
|
@ -46,6 +46,7 @@ import java.util.function.ToLongFunction;
|
|||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNullRefException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
|
@ -3572,22 +3573,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
private void createDeadLetterResources() throws Exception {
|
||||
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
|
||||
if (addressSettings.isAutoCreateDeadLetterResources() && !getAddress().equals(addressSettings.getDeadLetterAddress())) {
|
||||
if (addressSettings.getDeadLetterAddress() != null && addressSettings.getDeadLetterAddress().length() != 0) {
|
||||
SimpleString dlqName = addressSettings.getDeadLetterQueuePrefix().concat(getAddress()).concat(addressSettings.getDeadLetterQueueSuffix());
|
||||
SimpleString dlqFilter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
|
||||
server.createQueue(new QueueConfiguration(dlqName).setAddress(addressSettings.getDeadLetterAddress()).setFilterString(dlqFilter).setAutoCreated(true).setAutoCreateAddress(true), true);
|
||||
}
|
||||
}
|
||||
createResources(addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix());
|
||||
}
|
||||
|
||||
private void createExpiryResources() throws Exception {
|
||||
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
|
||||
if (addressSettings.isAutoCreateExpiryResources() && !getAddress().equals(addressSettings.getExpiryAddress())) {
|
||||
if (addressSettings.getExpiryAddress() != null && addressSettings.getExpiryAddress().length() != 0) {
|
||||
SimpleString expiryQueueName = addressSettings.getExpiryQueuePrefix().concat(getAddress()).concat(addressSettings.getExpiryQueueSuffix());
|
||||
SimpleString expiryFilter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
|
||||
server.createQueue(new QueueConfiguration(expiryQueueName).setAddress(addressSettings.getExpiryAddress()).setFilterString(expiryFilter).setAutoCreated(true).setAutoCreateAddress(true), true);
|
||||
createResources(addressSettings.isAutoCreateExpiryResources(), addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(), addressSettings.getExpiryQueueSuffix());
|
||||
}
|
||||
|
||||
private void createResources(boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
|
||||
if (isAutoCreate && !getAddress().equals(destinationAddress)) {
|
||||
if (destinationAddress != null && destinationAddress.length() != 0) {
|
||||
SimpleString destinationQueueName = prefix.concat(getAddress()).concat(suffix);
|
||||
SimpleString filter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
|
||||
try {
|
||||
server.createQueue(new QueueConfiguration(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true), true);
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase {
|
|||
public final SimpleString addressA = new SimpleString("addressA");
|
||||
public final SimpleString queueA = new SimpleString("queueA");
|
||||
public final SimpleString expiryAddress = new SimpleString("myExpiry");
|
||||
public final long EXPIRY_DELAY = 100L;
|
||||
|
||||
private ActiveMQServer server;
|
||||
|
||||
|
@ -53,7 +54,7 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase {
|
|||
server = createServer(false);
|
||||
|
||||
// set common address settings needed for all tests; make sure to use getMatch instead of addMatch in invidual tests or these will be overwritten
|
||||
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(100L));
|
||||
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY));
|
||||
server.getConfiguration().setMessageExpiryScanPeriod(50L);
|
||||
|
||||
server.start();
|
||||
|
@ -193,12 +194,33 @@ public class AutoCreateExpiryResourcesTest extends ActiveMQTestBase {
|
|||
assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConcurrentExpirations() throws Exception {
|
||||
SimpleString expiryQueueName = getDefaultExpiryQueueName(addressA);
|
||||
final long COUNT = 5;
|
||||
|
||||
for (int i = 0; i < COUNT; i++) {
|
||||
server.createQueue(new QueueConfiguration(i + "").setAddress(addressA).setRoutingType(RoutingType.MULTICAST));
|
||||
}
|
||||
|
||||
triggerExpiration(false);
|
||||
|
||||
Wait.assertTrue(() -> server.locateQueue(expiryQueueName) != null, EXPIRY_DELAY * 2, 20);
|
||||
Wait.assertEquals(COUNT, () -> server.locateQueue(expiryQueueName).getMessageCount(), EXPIRY_DELAY * 2, 20);
|
||||
}
|
||||
|
||||
private SimpleString getDefaultExpiryQueueName(SimpleString address) {
|
||||
return AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX.concat(address).concat(AddressSettings.DEFAULT_EXPIRY_QUEUE_SUFFIX);
|
||||
}
|
||||
|
||||
private void triggerExpiration() throws Exception {
|
||||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
|
||||
triggerExpiration(true);
|
||||
}
|
||||
|
||||
private void triggerExpiration(boolean createQueue) throws Exception {
|
||||
if (createQueue) {
|
||||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
|
||||
}
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||
ClientSession session = addClientSession(sessionFactory.createSession(true, false));
|
||||
|
|
Loading…
Reference in New Issue