ARTEMIS-4664 - autoCreatedResource can get removed while receiving batch of messages
This commit is contained in:
parent
56ec308045
commit
45533c38da
|
@ -4090,6 +4090,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
final QueueBinding queueBinding = (QueueBinding) rawBinding;
|
||||
if (ignoreIfExists) {
|
||||
//Reset potentially ongoing auto-delete status of queue
|
||||
queueBinding.getQueue().setSwept(false);
|
||||
|
||||
return queueBinding.getQueue();
|
||||
} else {
|
||||
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueConfiguration.getName(), queueBinding.getAddress());
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
|
@ -256,6 +258,41 @@ public class AutoCreateDeadLetterResourcesTest extends ActiveMQTestBase {
|
|||
message.acknowledge();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOngingSendToDLAPreventAutoDelete() throws Exception {
|
||||
final int messageCount = 100;
|
||||
SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
|
||||
|
||||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||
ClientSession session = addClientSession(sessionFactory.createSession(true, true));
|
||||
ClientProducer producer = addClientProducer(session.createProducer(addressA));
|
||||
|
||||
ClientMessage message = session.createMessage(true);
|
||||
message.getBodyBuffer().writeBytes(createFakeLargeStream(1024 * 1024).readAllBytes());
|
||||
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
QueueControl queueControl = (QueueControl)server.getManagementService().getResource(ResourceNames.QUEUE + queueA);
|
||||
queueControl.sendMessagesToDeadLetterAddress(null);
|
||||
|
||||
QueueControl dlqControl = (QueueControl)server.getManagementService().getResource(ResourceNames.QUEUE + dlqName);
|
||||
dlqControl.retryMessages();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
queueControl.sendMessagesToDeadLetterAddress(null);
|
||||
dlqControl.retryMessages();
|
||||
}
|
||||
|
||||
Wait.assertTrue(() -> queueControl.getMessageCount() == messageCount, 2000);
|
||||
Wait.assertTrue(() -> server.locateQueue(dlqName) == null, 2000);
|
||||
|
||||
}
|
||||
|
||||
private void triggerDlaDelivery() throws Exception {
|
||||
try {
|
||||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
|
||||
|
|
Loading…
Reference in New Issue