From bf83a9b3d1c9573bcbed01f6846ebbed8bfb96e6 Mon Sep 17 00:00:00 2001 From: AntonRoskvist Date: Thu, 16 Dec 2021 11:00:27 +0100 Subject: [PATCH] ARTEMIS-3608 - Add distribution for Multicast messages to OFF_WITH_REDISTRIBUTION to avoid message loss --- .../core/postoffice/impl/BindingsImpl.java | 4 ++- .../cluster/distribution/ClusterTestBase.java | 19 +++++++++-- .../MessageRedistributionTest.java | 33 +++++++++++++++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index e193296806..75fb7b810b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -408,7 +410,7 @@ public final class BindingsImpl implements Bindings { private static boolean matchBinding(final Message message, final Binding binding, final MessageLoadBalancingType loadBalancingType) { - if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) && binding instanceof RemoteQueueBinding) { + if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION) && !Objects.equals(message.getRoutingType(), RoutingType.MULTICAST)) && binding instanceof RemoteQueueBinding) { return false; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 939d916099..6c5d176e1c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -723,7 +723,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { final int msgEnd, final boolean durable, final String filterVal) throws Exception { - sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null); + sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null, null); } protected void sendInRange(final int node, @@ -732,6 +732,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { final int msgEnd, final boolean durable, final String filterVal, + final RoutingType routingType, final AtomicInteger duplicateDetectionSeq) throws Exception { ClientSessionFactory sf = sfs[node]; @@ -756,6 +757,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString(str)); } + if (routingType != null) { + message.setRoutingType(routingType); + } + message.putIntProperty(ClusterTestBase.COUNT_PROP, i); if (isLargeMessage()) { @@ -853,7 +858,17 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { final boolean durable, final String filterVal, final AtomicInteger duplicateDetectionCounter) throws Exception { - sendInRange(node, address, 0, numMessages, durable, filterVal, duplicateDetectionCounter); + send(node, address, numMessages, durable, filterVal, null, duplicateDetectionCounter); + } + + protected void send(final int node, + final String address, + final int numMessages, + final boolean durable, + final String filterVal, + final RoutingType routingType, + final AtomicInteger duplicateDetectionCounter) throws Exception { + sendInRange(node, address, 0, numMessages, durable, filterVal, routingType, duplicateDetectionCounter); } protected void verifyReceiveAllInRange(final boolean ack, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 1ee78bbbf7..360f1a0830 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -831,6 +831,39 @@ public class MessageRedistributionTest extends ClusterTestBase { } + @Test + public void testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws Exception { + + String address = "test.address"; + String queue = "queue"; + String clusterAddress = "test"; + AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true); + RoutingType routingType = RoutingType.MULTICAST; + + getServer(0).getAddressSettingsRepository().addMatch(address, settings); + getServer(1).getAddressSettingsRepository().addMatch(address, settings); + + setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1); + setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, address, queue, null, false, routingType); + addConsumer(0, 0, queue, null); + waitForBindings(0, address, 1, 1, true); + waitForBindings(1, address, 1, 1, false); + + createAddressInfo(1, address, routingType, 0, false); + + final int noMessages = 10; + send(1, address, noMessages, false, null, routingType, null); + verifyReceiveAll(noMessages, 0); + + } + @Test public void testBackAndForth() throws Exception { for (int i = 0; i < 10; i++) {