From a723f634a28938b85326079efb4cdc02550be428 Mon Sep 17 00:00:00 2001 From: AntonRoskvist Date: Mon, 8 Apr 2024 11:45:12 +0200 Subject: [PATCH] ARTEMIS-4718 Diverted messages are not propertly routed on cluster remote bindings --- .../artemis/core/server/impl/DivertImpl.java | 4 ++ .../SimpleSymmetricClusterTest.java | 49 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index c499a28d9d..be411303c8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -116,6 +116,10 @@ public class DivertImpl implements Divert { copy.setExpiration(message.getExpiration()); + //This header could be set if the message is redistributed from a clustered broker. + //It needs to be removed as it will interfere with upcoming routing + //copy.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS); + switch (routingType) { case ANYCAST: copy.setRoutingType(RoutingType.ANYCAST); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java index 15c7af3ebf..c650014c62 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java @@ -26,12 +26,15 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Ignore; @@ -546,4 +549,50 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase { } + @Test + public void testDivertRedistributedMessage() throws Exception { + final String queue = "queue0"; + final String divertedQueueName = "divertedQueue"; + final int messageCount = 10; + + setupServer(0, true, isNetty()); + setupServer(1, true, isNetty()); + setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + servers[0].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0)); + servers[1].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0)); + + startServers(0, 1); + + servers[0].deployDivert(new DivertConfiguration() + .setName("myDivert") + .setAddress(queue) + .setRoutingType(ComponentConfigurationRoutingType.ANYCAST) + .setForwardingAddress(divertedQueueName) + .setExclusive(true)); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, queue, queue, null, true, RoutingType.ANYCAST); + createQueue(1, queue, queue, null, true, RoutingType.ANYCAST); + createQueue(0, divertedQueueName, divertedQueueName, null, true, RoutingType.ANYCAST); + createQueue(1, divertedQueueName, divertedQueueName, null, true, RoutingType.ANYCAST); + + addConsumer(0, 0, queue, null); + + waitForBindings(0, queue, 1, 1, true); + waitForBindings(1, queue, 1, 1, false); + + send(1, queue, messageCount, true, null); + + Wait.assertEquals((long) messageCount, () -> servers[0].locateQueue(divertedQueueName).getMessageCount(), 2000, 100); + + addConsumer(1, 1, divertedQueueName, null); + + verifyReceiveAll(messageCount, 1); + closeAllConsumers(); + } + }