From 3ff8419a4b0e32bff5b43997d09db9a9acc28586 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 13 Jun 2023 11:28:00 -0500 Subject: [PATCH] ARTEMIS-4312 dupes w/redistribution and multicast Multiple multicast queues on the same address can lead to duplicate messages during redistribution in a cluster. --- .../core/postoffice/impl/BindingsImpl.java | 2 + .../cluster/distribution/ClusterTestBase.java | 13 ++- .../MessageRedistributionTest.java | 79 +++++++++++++++++++ 3 files changed, 93 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 252ae9b782..5f8ad7e3ce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -269,7 +269,9 @@ public final class BindingsImpl implements Bindings { if (logger.isDebugEnabled()) { logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID()); } + copyRedistribute.setAddress(message.getAddress()); + copyRedistribute.clearInternalProperties(); if (context.getTransaction() == null) { context.setTransaction(new TransactionImpl(storageManager)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 0bfe403fc1..90d7a90fbd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -630,6 +630,17 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { boolean autoCommitAcks, final String user, final String password) throws Exception { + addConsumer(consumerID, node, queueName, filterVal, autoCommitAcks, user, password, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE); + } + + protected void addConsumer(final int consumerID, + final int node, + final String queueName, + final String filterVal, + boolean autoCommitAcks, + final String user, + final String password, + final int ackBatchSize) throws Exception { try { if (consumers[consumerID] != null) { throw new IllegalArgumentException("Already a consumer at " + node); @@ -641,7 +652,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { throw new IllegalArgumentException("No sf at " + node); } - ClientSession session = addClientSession(sf.createSession(user, password, false, false, autoCommitAcks, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)); + ClientSession session = addClientSession(sf.createSession(user, password, false, false, autoCommitAcks, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ackBatchSize)); String filterString = null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 151c9d9d0c..3f8c825798 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -173,6 +173,85 @@ public class MessageRedistributionTest extends ClusterTestBase { logger.debug("Test done"); } + @Test + public void testRedistributionWithMultipleQueuesOnTheSameAddress() throws Exception { + final int MESSAGE_COUNT = 10; + final String ADDRESS = "myAddress"; + final String QUEUE0 = "queue0"; + final String QUEUE1 = "queue1"; + + getServer(0).getConfiguration().addAddressSetting(ADDRESS, new AddressSettings().setRedistributionDelay(0)); + getServer(1).getConfiguration().addAddressSetting(ADDRESS, new AddressSettings().setRedistributionDelay(0)); + + setupClusterConnection("cluster0", ADDRESS, MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + setupClusterConnection("cluster1", ADDRESS, MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + startServers(0, 1); + + waitForTopology(servers[0], 2); + waitForTopology(servers[1], 2); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, ADDRESS, QUEUE0, null, false); + createQueue(0, ADDRESS, QUEUE1, null, false); + + createQueue(1, ADDRESS, QUEUE0, null, false); + createQueue(1, ADDRESS, QUEUE1, null, false); + + addConsumer(0, 0, QUEUE0, null, true, null, null, 0); + addConsumer(1, 1, QUEUE0, null, true, null, null, 0); + + waitForBindings(0, ADDRESS, 2, 1, true); + waitForBindings(0, ADDRESS, 2, 1, false); + + waitForBindings(1, ADDRESS, 2, 1, true); + waitForBindings(1, ADDRESS, 2, 1, false); + + send(0, ADDRESS, MESSAGE_COUNT, true, null, RoutingType.MULTICAST, null); + + { // make sure all the messages were delivered to the proper queues & nodes + Wait.assertEquals(5L, () -> servers[0].locateQueue(QUEUE0).getMessagesAdded(), 2000, 100); + Wait.assertEquals(5L, () -> servers[1].locateQueue(QUEUE0).getMessagesAdded(), 2000, 100); + + Wait.assertEquals(10L, () -> servers[0].locateQueue(QUEUE1).getMessageCount(), 2000, 100); + Wait.assertEquals(0L, () -> servers[1].locateQueue(QUEUE1).getMessageCount(), 2000, 100); + } + + for (int i = 0; i < MESSAGE_COUNT / 2; i++) { + { + ClientMessage m = consumers[0].getConsumer().receive(1000); + assertNotNull(m); + m.acknowledge(); + } + { + ClientMessage m = consumers[1].getConsumer().receive(1000); + assertNotNull(m); + m.acknowledge(); + } + } + + { // make sure all the messages were consumed propertly + Wait.assertEquals(5L, () -> servers[0].locateQueue(QUEUE0).getMessagesAcknowledged(), 2000, 100); + Wait.assertEquals(0L, () -> servers[0].locateQueue(QUEUE0).getMessageCount(), 2000, 100); + + Wait.assertEquals(5L, () -> servers[1].locateQueue(QUEUE0).getMessagesAcknowledged(), 2000, 100); + Wait.assertEquals(0L, () -> servers[1].locateQueue(QUEUE0).getMessageCount(), 2000, 100); + } + + // add consumer to force redistribution of messages to node 1 + addConsumer(2, 1, QUEUE1, null); + waitForBindings(1, ADDRESS, 2, 2, true); + waitForBindings(0, ADDRESS, 2, 2, false); + + Wait.assertEquals(10L, () -> servers[1].locateQueue(QUEUE1).getMessageCount(), 2000, 100); + Wait.assertEquals(0L, () -> servers[0].locateQueue(QUEUE1).getMessageCount(), 2000, 100); + + // ensure no messages were inadvertently redistributed to the wrong queue (i.e. the main point of this test) + Wait.assertEquals(0L, () -> servers[1].locateQueue(QUEUE0).getMessageCount(), 2000, 100); + } + //https://issues.jboss.org/browse/HORNETQ-1057 @Test public void testRedistributionStopsWhenConsumerAdded() throws Exception {