ARTEMIS-4684 Internal queues should not redistribute

This commit is contained in:
Clebert Suconic 2024-03-13 15:01:20 -04:00 committed by clebertsuconic
parent 11b7671960
commit d864780293
2 changed files with 56 additions and 18 deletions

View File

@ -1605,6 +1605,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return;
}
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
logger.debug("Queue {} is a management address, ignoring it for redistribution", address);
return;
}
clearRedistributorFuture();
if (redistributor != null) {

View File

@ -29,6 +29,7 @@ import javax.jms.Topic;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@ -120,9 +121,9 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC1_NODE_B);
cleanupData(DC2_NODE_A);
cleanupData(DC2_NODE_B);
cleanupData(DC2_NODE_B);
}
@ -227,6 +228,8 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
private CountDownLatch startConsumer(Executor executor, ConnectionFactory factory, String queue, AtomicBoolean running, AtomicInteger errorCount, AtomicInteger receivedCount) {
CountDownLatch done = new CountDownLatch(1);
HashSet<Integer> receivedMessages = new HashSet<>();
executor.execute(() -> {
try {
try (Connection connection = factory.createConnection()) {
@ -237,6 +240,11 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
Message message = consumer.receive(100);
if (message != null) {
receivedCount.incrementAndGet();
Integer receivedI = message.getIntProperty("i");
if (!receivedMessages.add(receivedI)) {
errorCount.incrementAndGet();
logger.warn("Message {}, isLarge={} received in duplicate", receivedI, message.getBooleanProperty("large"));
}
}
}
}
@ -302,6 +310,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
final int numberOfMessages = 50;
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", DC2_NODEA_URI);
ConnectionFactory connectionFactoryDC2B = CFUtil.createConnectionFactory("amqp", DC2_NODEB_URI);
AtomicBoolean runningConsumers = new AtomicBoolean(true);
@ -309,30 +318,54 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
AtomicInteger errors = new AtomicInteger(0);
AtomicInteger receiverCount = new AtomicInteger(0);
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null);
try (SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null)) {
CountDownLatch doneDC2B = startConsumer(executorService, connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount);
Assert.assertFalse(findQueue(simpleManagementDC1A, queueName));
Assert.assertFalse(findQueue(simpleManagementDC1B, queueName));
Assert.assertFalse(findQueue(simpleManagementDC2A, queueName));
Assert.assertFalse(findQueue(simpleManagementDC2B, queueName));
sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
// just to allow auto-creation to kick in....
Assert.assertTrue(startConsumer(executorService, connectionFactoryDC2A, queueName, new AtomicBoolean(false), errors, receiverCount).await(1, TimeUnit.MINUTES));
Assert.assertTrue(startConsumer(executorService, connectionFactoryDC2B, queueName, new AtomicBoolean(false), errors, receiverCount).await(1, TimeUnit.MINUTES));
Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);
Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
Wait.assertEquals(0, () -> simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1B.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2B.getMessageCountOnQueue(queueName), 5000);
runningConsumers.set(false);
CountDownLatch doneDC2B = startConsumer(executorService, connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount);
Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);
Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS));
Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
Wait.assertEquals(0, () -> simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1B.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2B.getMessageCountOnQueue(queueName), 5000);
runningConsumers.set(false);
Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, errors.get());
}
}
@Test