diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index b7efc1c721..b7bb6676c1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1605,6 +1605,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return; } + if (address.startsWith(server.getConfiguration().getManagementAddress())) { + logger.debug("Queue {} is a management address, ignoring it for redistribution", address); + return; + } + clearRedistributorFuture(); if (redistributor != null) { diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java index a2f765036c..d4ae867009 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java @@ -29,6 +29,7 @@ import javax.jms.Topic; import java.io.File; import java.io.StringWriter; import java.lang.invoke.MethodHandles; +import java.util.HashSet; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -120,9 +121,9 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { @Before public void cleanupServers() { cleanupData(DC1_NODE_A); + cleanupData(DC1_NODE_B); cleanupData(DC2_NODE_A); cleanupData(DC2_NODE_B); - cleanupData(DC2_NODE_B); } @@ -227,6 +228,8 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { private CountDownLatch startConsumer(Executor executor, ConnectionFactory factory, String queue, AtomicBoolean running, AtomicInteger errorCount, AtomicInteger receivedCount) { CountDownLatch done = new CountDownLatch(1); + HashSet receivedMessages = new HashSet<>(); + executor.execute(() -> { try { try (Connection connection = factory.createConnection()) { @@ -237,6 +240,11 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { Message message = consumer.receive(100); if (message != null) { receivedCount.incrementAndGet(); + Integer receivedI = message.getIntProperty("i"); + if (!receivedMessages.add(receivedI)) { + errorCount.incrementAndGet(); + logger.warn("Message {}, isLarge={} received in duplicate", receivedI, message.getBooleanProperty("large")); + } } } } @@ -302,6 +310,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { final int numberOfMessages = 50; ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI); + ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", DC2_NODEA_URI); ConnectionFactory connectionFactoryDC2B = CFUtil.createConnectionFactory("amqp", DC2_NODEB_URI); AtomicBoolean runningConsumers = new AtomicBoolean(true); @@ -309,30 +318,54 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { AtomicInteger errors = new AtomicInteger(0); AtomicInteger receiverCount = new AtomicInteger(0); - SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null); - SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null); - SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null); - SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null); + try (SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null); + SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null); + SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null); + SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null)) { - CountDownLatch doneDC2B = startConsumer(executorService, connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount); + Assert.assertFalse(findQueue(simpleManagementDC1A, queueName)); + Assert.assertFalse(findQueue(simpleManagementDC1B, queueName)); + Assert.assertFalse(findQueue(simpleManagementDC2A, queueName)); + Assert.assertFalse(findQueue(simpleManagementDC2B, queueName)); - sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10); + // just to allow auto-creation to kick in.... + Assert.assertTrue(startConsumer(executorService, connectionFactoryDC2A, queueName, new AtomicBoolean(false), errors, receiverCount).await(1, TimeUnit.MINUTES)); + Assert.assertTrue(startConsumer(executorService, connectionFactoryDC2B, queueName, new AtomicBoolean(false), errors, receiverCount).await(1, TimeUnit.MINUTES)); - Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000); + Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName)); + Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName)); + Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName)); + Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName)); - Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName)); - Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName)); - Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName)); - Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName)); + sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10); - Wait.assertEquals(0, () -> simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000); - Wait.assertEquals(0, () -> simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000); - Wait.assertEquals(0, () -> simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000); - Wait.assertEquals(0, () -> simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000); + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC1B.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC2B.getMessageCountOnQueue(queueName), 5000); - runningConsumers.set(false); + CountDownLatch doneDC2B = startConsumer(executorService, connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount); + Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000); - Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS)); + Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName)); + Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName)); + Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName)); + Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName)); + + Wait.assertEquals(0, () -> simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC1B.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC2B.getMessageCountOnQueue(queueName), 5000); + + runningConsumers.set(false); + + Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(0, errors.get()); + } } @Test