diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 13c95ddcc0..6272971592 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -192,7 +192,7 @@ public final class BindingsImpl implements Bindings { logger.tracef("Redistributing message %s", message); } - final SimpleString routingName = originatingQueue.getName(); + final SimpleString routingName = CompositeAddress.isFullyQualified(message.getAddress()) && originatingQueue.getRoutingType() == RoutingType.ANYCAST ? CompositeAddress.extractAddressName(message.getAddressSimpleString()) : originatingQueue.getName(); final Pair bindingsAndPosition = routingNameBindingMap.getBindings(routingName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 6beb348023..bef63159ed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -415,7 +415,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return; } - Binding binding = getBinding(queueName); + SimpleString addressName = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS); + Binding binding = getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName); if (binding != null) { // We have a local queue diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 902071652e..60d3a9eaf7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.server.group.impl.Response; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -1426,7 +1427,9 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn // Need to propagate the consumer add TypedProperties props = new TypedProperties(); - props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress()); + SimpleString addressName = message.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS); + + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(addressName) ? addressName : binding.getAddress()); props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 5057d362e1..a446d93aea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -580,7 +580,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (!browseOnly) { TypedProperties props = new TypedProperties(); - props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(unPrefixedQueueName) ? unPrefixedQueueName : address); props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 54d3ab8367..10bf026796 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.nio.ByteBuffer; @@ -30,7 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; - +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; @@ -40,7 +46,9 @@ import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfigu import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.CompositeAddress; import org.jboss.logging.Logger; import org.junit.Assert; import org.junit.Before; @@ -68,8 +76,6 @@ public class MessageRedistributionTest extends ClusterTestBase { return false; } - - @Override protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) { super.setSessionFactoryCreateLocator(node, ha, serverTotc); @@ -78,7 +84,6 @@ public class MessageRedistributionTest extends ClusterTestBase { } - //https://issues.jboss.org/browse/HORNETQ-1061 @Test public void testRedistributionWithMessageGroups() throws Exception { @@ -728,6 +733,102 @@ public class MessageRedistributionTest extends ClusterTestBase { verifyNotReceive(1); } + @Test + public void testRedistributionWithFqqnAnycast() throws Exception { + internalTestRedistributionWithFqqn(RoutingType.ANYCAST); + } + + @Test + public void testRedistributionWithFqqnMulticast() throws Exception { + internalTestRedistributionWithFqqn(RoutingType.MULTICAST); + } + + private void internalTestRedistributionWithFqqn(RoutingType routingType) throws Exception { + final String ADDRESS = "myAddress"; + final String QUEUE = "myQueue"; + final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE); + AddressSettings as = new AddressSettings().setRedistributionDelay(0); + getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as); + getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as); + setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + createQueue(0, ADDRESS, QUEUE, null, false, routingType); + createQueue(0, ADDRESS, "extra", null, false, routingType); + waitForBindings(0, ADDRESS, 2, 0, true); + waitForBindings(1, ADDRESS, 2, 0, false); + + send(0, FQQN, 20, false, null, routingType, null); + + setupSessionFactory(1, isNetty()); + createQueue(1, ADDRESS, QUEUE, null, false, routingType); + waitForBindings(0, ADDRESS, 1, 0, false); + waitForBindings(1, ADDRESS, 1, 0, true); + + addConsumer(1, 1, FQQN, null); + waitForBindings(1, ADDRESS, 1, 1, true); + + verifyReceiveAll(20, 1); + verifyNotReceive(1); + } + + @Test + public void testRedistributionWithFqqnJmsQueue() throws Exception { + final String ADDRESS = "myAddress"; + final String QUEUE = "myQueue"; + final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE); + + AddressSettings as = new AddressSettings().setRedistributionDelay(0); + getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as); + getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as); + setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND); + getServer(0).getConfiguration().setName("0"); + getServer(1).getConfiguration().setName("1"); + startServers(0, 1); + + ConnectionFactory cf0 = new ActiveMQConnectionFactory("vm://0"); + ConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://1"); + + try (Connection connection0 = cf0.createConnection(); + Connection connection1 = cf1.createConnection()) { + javax.jms.Queue sendTo = ActiveMQJMSClient.createQueue(FQQN); + javax.jms.Queue consumeFrom = ActiveMQJMSClient.createQueue(FQQN); + + setupSessionFactory(0, isNetty()); + createQueue(0, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST); + waitForBindings(0, ADDRESS, 1, 0, true); + waitForBindings(1, ADDRESS, 1, 0, false); + + Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session0.createProducer(sendTo); + + final int numMessages = 10; + for (int i = 0; i < numMessages; i++) { + TextMessage message = session0.createTextMessage("This is text message " + i); + producer.send(message); + } + producer.close(); + assertEquals(numMessages, servers[0].locateQueue(QUEUE).getMessageCount()); + + setupSessionFactory(1, isNetty()); + createQueue(1, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST); + waitForBindings(1, ADDRESS, 1, 0, true); + waitForBindings(0, ADDRESS, 1, 0, false); + + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection1.start(); + MessageConsumer consumer1 = session1.createConsumer(consumeFrom); + waitForBindings(1, ADDRESS, 1, 1, true); + javax.jms.Message message1; + for (int i = 0; i < numMessages; i++) { + message1 = consumer1.receive(5000); + assertNotNull(message1); + } + } + } + @Test public void testRedistributionWhenRemoteConsumerIsAddedLbOffWithRedistribution() throws Exception { setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION); @@ -1293,11 +1394,15 @@ public class MessageRedistributionTest extends ClusterTestBase { } protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { - setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2); + setupCluster("queues", messageLoadBalancingType); + } - setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2); + protected void setupCluster(final String address, final MessageLoadBalancingType messageLoadBalancingType) throws Exception { + setupClusterConnection("cluster0", address, messageLoadBalancingType, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster1", address, messageLoadBalancingType, 1, isNetty(), 1, 0, 2); + + setupClusterConnection("cluster2", address, messageLoadBalancingType, 1, isNetty(), 2, 0, 1); } protected void setRedistributionDelay(final long delay) {