From f09bde07dfa56e2c65ccf04adf8e158003fd673b Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Wed, 17 Jan 2018 10:24:20 +0000 Subject: [PATCH] ARTEMIS-1612 Fix message redistribution for prefixed addresses --- .../core/postoffice/impl/PostOfficeImpl.java | 2 +- .../artemis/tests/util/ActiveMQTestBase.java | 2 +- .../MessageRedistributionTest.java | 48 +++++++++++++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index e2153e1dec..d5af56b2cd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -892,7 +892,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // as described on https://issues.jboss.org/browse/JBPAPP-6130 Message copyRedistribute = message.copy(storageManager.generateID()); - Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString()); + Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress()); if (bindings != null) { RoutingContext context = new RoutingContextImpl(tx); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 7cd225fa9b..ac2e406d7d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -1243,7 +1243,7 @@ public abstract class ActiveMQTestBase extends Assert { } if (params == null) params = new HashMap<>(); - return new TransportConfiguration(className, params); + return new TransportConfiguration(className, params, UUIDGenerator.getInstance().generateStringUUID(), new HashMap()); } protected void waitForServerToStart(ActiveMQServer server) throws InterruptedException { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 2020489a15..df824c6e92 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -30,6 +31,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; @@ -651,6 +653,52 @@ public class MessageRedistributionTest extends ClusterTestBase { verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids0, 2); } + @Test + public void testRedistributionWithPrefixesWhenRemoteConsumerIsAdded() throws Exception { + + for (int i = 0; i <= 2; i++) { + ActiveMQServer server = getServer(i); + for (TransportConfiguration c : server.getConfiguration().getAcceptorConfigurations()) { + c.getExtraParams().putIfAbsent("anycastPrefix", "jms.queue."); + } + } + + setupCluster(MessageLoadBalancingType.ON_DEMAND); + + startServers(0, 1, 2); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + + String name = "queues.queue"; + + createQueue(0, name, name, null, false, RoutingType.ANYCAST); + createQueue(1, name, name, null, false, RoutingType.ANYCAST); + createQueue(2, name, name, null, false, RoutingType.ANYCAST); + + addConsumer(0, 0, name, null); + + waitForBindings(0, name, 1, 1, true); + waitForBindings(1, name, 1, 0, true); + waitForBindings(2, name, 1, 0, true); + + waitForBindings(0, name, 2, 0, false); + waitForBindings(1, name, 2, 1, false); + waitForBindings(2, name, 2, 1, false); + + removeConsumer(0); + + Thread.sleep(2000); + + send(0, "jms.queue." + name, 20, false, null); + + addConsumer(1, 1, name, null); + + verifyReceiveAll(20, 1); + verifyNotReceive(1); + } + @Test public void testRedistributionWhenRemoteConsumerIsAdded() throws Exception { setupCluster(MessageLoadBalancingType.ON_DEMAND);