From fdf2ea874bd903aba446d0bca377793028ed56b4 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 19 Apr 2024 14:50:11 -0400 Subject: [PATCH] ARTEMIS-4733 Infinite mirror reflections after CreateAddress --- .../api/core/management/SimpleManagement.java | 4 + .../core/postoffice/impl/PostOfficeImpl.java | 8 +- .../mirror/ClusteredMirrorSoakTest.java | 186 +++++++++++++++--- 3 files changed, 168 insertions(+), 30 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java index 1b46ab264b..ece36b40b7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java @@ -118,6 +118,10 @@ public class SimpleManagement implements AutoCloseable { return simpleManagementLong(ResourceNames.QUEUE + queueName, "getMessageCount"); } + public long getMessageAddedOnQueue(String queueName) throws Exception { + return simpleManagementLong(ResourceNames.QUEUE + queueName, "getMessagesAdded"); + } + public int getDeliveringCountOnQueue(String queueName) throws Exception { return simpleManagementInt(ResourceNames.QUEUE + queueName, "getDeliveringCount"); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 752cb23cab..1e21bb2639 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -542,10 +542,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding server.callBrokerAddressPlugins(plugin -> plugin.beforeAddAddress(addressInfo, reload)); } - if (!reload && mirrorControllerSource != null) { - mirrorControllerSource.addAddress(addressInfo); - } - boolean result; if (reload) { result = addressManager.reloadAddressInfo(addressInfo); @@ -554,6 +550,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } // only register address if it is new if (result) { + if (!reload && mirrorControllerSource != null) { + mirrorControllerSource.addAddress(addressInfo); + } + try { managementService.registerAddress(addressInfo); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java index d4ae867009..95f54f5593 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java @@ -39,8 +39,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.management.SimpleManagement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.RandomUtil; @@ -49,8 +53,6 @@ import org.apache.activemq.artemis.utils.FileUtil; import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,18 +82,17 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { Process processDC2_node_A; Process processDC2_node_B; - private static String DC1_NODEA_URI = "tcp://localhost:61616"; private static String DC1_NODEB_URI = "tcp://localhost:61617"; private static String DC2_NODEA_URI = "tcp://localhost:61618"; private static String DC2_NODEB_URI = "tcp://localhost:61619"; - private static void createServer(String serverName, String connectionName, String clusterURI, String mirrorURI, int porOffset) throws Exception { + private static void createServer(String serverName, String connectionName, String clusterURI, String mirrorURI, int porOffset, boolean paging) throws Exception { File serverLocation = getFileServerLocation(serverName); deleteDirectory(serverLocation); HelperCreate cliCreateServer = new HelperCreate(); - cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); cliCreateServer.setMessageLoadBalancing("ON_DEMAND"); cliCreateServer.setClustered(true); cliCreateServer.setNoWeb(true); @@ -115,24 +116,16 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { Assert.assertTrue(brokerXml.exists()); // Adding redistribution delay to broker configuration Assert.assertTrue(FileUtil.findReplace(brokerXml, "", "\n\n" + " 0 \n")); + if (paging) { + Assert.assertTrue(FileUtil.findReplace(brokerXml, "-1", "1")); + } } - - @Before - public void cleanupServers() { - cleanupData(DC1_NODE_A); - cleanupData(DC1_NODE_B); - cleanupData(DC2_NODE_A); - cleanupData(DC2_NODE_B); - } - - - @BeforeClass - public static void createServers() throws Exception { - createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0); - createServer(DC1_NODE_B, "mirror", DC1_NODEA_URI, DC2_NODEB_URI, 1); - createServer(DC2_NODE_A, "mirror", DC2_NODEB_URI, DC1_NODEA_URI, 2); - createServer(DC2_NODE_B, "mirror", DC2_NODEA_URI, DC1_NODEB_URI, 3); + public static void createRealServers(boolean paging) throws Exception { + createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0, paging); + createServer(DC1_NODE_B, "mirror", DC1_NODEA_URI, DC2_NODEB_URI, 1, paging); + createServer(DC2_NODE_A, "mirror", DC2_NODEB_URI, DC1_NODEA_URI, 2, paging); + createServer(DC2_NODE_B, "mirror", DC2_NODEA_URI, DC1_NODEB_URI, 3, paging); } private void startServers() throws Exception { @@ -147,8 +140,147 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { ServerUtil.waitForServerToStart(3, 10_000); } + @Test + public void testAvoidReflections() throws Exception { + createRealServers(true); + + String internalQueue = "INTERNAL_QUEUE"; + + ActiveMQServer tempServer = createServer(true); + tempServer.getConfiguration().setBindingsDirectory(getServerLocation(DC1_NODE_A) + "/data/bindings"); + tempServer.getConfiguration().setJournalDirectory(getServerLocation(DC1_NODE_A) + "/data/journal"); + tempServer.getConfiguration().setJournalFileSize(10 * 1024 * 1024); + tempServer.start(); + tempServer.addAddressInfo(new AddressInfo(internalQueue).addRoutingType(RoutingType.ANYCAST).setInternal(true)); + tempServer.createQueue(new QueueConfiguration(internalQueue).setDurable(true).setRoutingType(RoutingType.ANYCAST).setInternal(true).setAddress(internalQueue)); + tempServer.stop(); + + startServers(); + + SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null); + SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null); + SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEA_URI, null, null); + SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null); + + String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; + + String queueName = "myQueue"; + String topicName = "order"; + + for (int i = 0; i < 5; i++) { + logger.info("DC1A={}", simpleManagementDC1A.getMessageAddedOnQueue(snfQueue)); + logger.info("DC1B={}", simpleManagementDC1B.getMessageAddedOnQueue(snfQueue)); + logger.info("DC2A={}", simpleManagementDC2A.getMessageAddedOnQueue(snfQueue)); + logger.info("DC2B={}", simpleManagementDC2B.getMessageAddedOnQueue(snfQueue)); + + // no load generated.. just initial queues should have been sent + Assert.assertTrue(simpleManagementDC1A.getMessageAddedOnQueue(snfQueue) < 20); + Assert.assertTrue(simpleManagementDC2A.getMessageAddedOnQueue(snfQueue) < 20); + Assert.assertTrue(simpleManagementDC1B.getMessageAddedOnQueue(snfQueue) < 20); + Assert.assertTrue(simpleManagementDC2B.getMessageAddedOnQueue(snfQueue) < 20); + Thread.sleep(100); + } + + Assert.assertEquals(0, simpleManagementDC2A.getMessageCountOnQueue(queueName)); + Assert.assertEquals(0, simpleManagementDC1A.getMessageCountOnQueue(internalQueue)); + try { + simpleManagementDC2A.getMessageCountOnQueue(internalQueue); + Assert.fail("Exception expected"); + } catch (Exception expected) { + } + + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI); + + int numberOfMessages = 1_000; + + try (Connection connection = connectionFactoryDC1A.createConnection()) { + connection.setClientID("conn1"); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(topicName); + MessageConsumer con = session.createDurableConsumer(topic, "hello1"); + MessageConsumer con2 = session.createDurableConsumer(topic, "hello2"); + + MessageProducer producer = session.createProducer(topic); + for (int i = 0; i < numberOfMessages; i++) { + if (i % 100 == 0) { + logger.info("Sent topic {}", i); + } + producer.send(session.createTextMessage("hello " + i)); + } + session.commit(); + + } + + try (Connection connection = connectionFactoryDC1A.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < numberOfMessages; i++) { + if (i % 100 == 0) { + logger.info("Sent queue {}", i); + } + producer.send(session.createTextMessage("hello " + i)); + } + session.commit(); + } + + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000); + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000); + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000); + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000); + + try (Connection connection = connectionFactoryDC1A.createConnection()) { + connection.setClientID("conn1"); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(topicName); + Queue queue = session.createQueue(queueName); + MessageConsumer[] consumers = new MessageConsumer[] {session.createDurableSubscriber(topic, "hello1"), session.createDurableSubscriber(topic, "hello2"), session.createConsumer(queue)}; + + for (MessageConsumer c : consumers) { + for (int i = 0; i < numberOfMessages; i++) { + Assert.assertNotNull(c.receive(5000)); + if (i % 100 == 0) { + session.commit(); + } + } + session.commit(); + } + } + + Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000); + Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000); + Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000); + Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000); + + long countDC1A = simpleManagementDC1A.getMessageAddedOnQueue(snfQueue); + long countDC1B = simpleManagementDC1B.getMessageAddedOnQueue(snfQueue); + + for (int i = 0; i < 10; i++) { + // DC1 should be quiet and nothing moving out of it + Assert.assertEquals(countDC1A, simpleManagementDC1A.getMessageAddedOnQueue(snfQueue)); + Assert.assertEquals(countDC1B, simpleManagementDC1B.getMessageAddedOnQueue(snfQueue)); + + // DC2 is totally passive, nothing should have been generated + Assert.assertTrue(simpleManagementDC2A.getMessageAddedOnQueue(snfQueue) < 20); + Assert.assertTrue(simpleManagementDC2B.getMessageAddedOnQueue(snfQueue) < 20); + // we take intervals, allowing to make sure it doesn't grow + Thread.sleep(100); + logger.info("DC1A={}", simpleManagementDC1A.getMessageAddedOnQueue(snfQueue)); + logger.info("DC1B={}", simpleManagementDC1B.getMessageAddedOnQueue(snfQueue)); + logger.info("DC2A={}", simpleManagementDC2A.getMessageAddedOnQueue(snfQueue)); + logger.info("DC2B={}", simpleManagementDC2B.getMessageAddedOnQueue(snfQueue)); + } + } + @Test public void testSimpleQueue() throws Exception { + createRealServers(false); startServers(); final int numberOfMessages = 200; @@ -303,6 +435,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { ExecutorService executorService = Executors.newFixedThreadPool(2); runAfter(executorService::shutdownNow); + createRealServers(false); startServers(); String queueName = "testqueue" + RandomUtil.randomString(); @@ -370,6 +503,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { @Test public void testMirroredTopics() throws Exception { + createRealServers(false); startServers(); final int numberOfMessages = 200; @@ -389,8 +523,8 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null); SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null); - consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false); - consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, 0, false); + consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false); + consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, 0, false); try (Connection connection = connectionFactoryDC1B.createConnection()) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); @@ -414,7 +548,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { } logger.debug("Consuming from DC1B"); - consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, numberOfMessages / 2, false); + consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, numberOfMessages / 2, false); processDC2_node_B.destroyForcibly(); processDC2_node_B.waitFor(); @@ -425,12 +559,12 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { logger.debug("Consuming from DC2B with {}", simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order")); - consume(connectionFactoryDC2B, clientIDB, subscriptionID, numberOfMessages / 2, numberOfMessages / 2, true); + consume(connectionFactoryDC2B, clientIDB, subscriptionID, numberOfMessages / 2, numberOfMessages / 2, true); Wait.assertEquals(0, () -> simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"), 10000); Wait.assertEquals(0, () -> simpleManagementDC1B.getMessageCountOnQueue("nodeB.my-order"), 10000); - consume(connectionFactoryDC1B, clientIDB, subscriptionID, numberOfMessages, 0, true); + consume(connectionFactoryDC1B, clientIDB, subscriptionID, numberOfMessages, 0, true); logger.debug("DC1B nodeB.my-order=0"); }