From 86db53da9acb96ed81b38fe3962aa0532047898a Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 29 Jul 2022 16:12:53 -0500 Subject: [PATCH] ARTEMIS-3918 support FQQN + anycast + redistribution When a message is sent to an anycast queue via FQQN on one node of a cluster and then a consumer is created on that same anycast queue via FQQN on another node in the cluster the message is not redistributed to the node with the consumer. This commit fixes this use-case primarily by including the FQQN info in the notification messages sent to other nodes in the cluster. --- .../core/postoffice/impl/BindingsImpl.java | 2 +- .../core/postoffice/impl/PostOfficeImpl.java | 3 +- .../cluster/impl/ClusterConnectionImpl.java | 5 +- .../core/server/impl/ServerSessionImpl.java | 2 +- .../MessageRedistributionTest.java | 119 ++++++++++++++++-- 5 files changed, 120 insertions(+), 11 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 13c95ddcc0..6272971592 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -192,7 +192,7 @@ public final class BindingsImpl implements Bindings { logger.tracef("Redistributing message %s", message); } - final SimpleString routingName = originatingQueue.getName(); + final SimpleString routingName = CompositeAddress.isFullyQualified(message.getAddress()) && originatingQueue.getRoutingType() == RoutingType.ANYCAST ? CompositeAddress.extractAddressName(message.getAddressSimpleString()) : originatingQueue.getName(); final Pair bindingsAndPosition = routingNameBindingMap.getBindings(routingName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 6beb348023..bef63159ed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -415,7 +415,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return; } - Binding binding = getBinding(queueName); + SimpleString addressName = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS); + Binding binding = getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName); if (binding != null) { // We have a local queue diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 902071652e..60d3a9eaf7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.server.group.impl.Response; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -1426,7 +1427,9 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn // Need to propagate the consumer add TypedProperties props = new TypedProperties(); - props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress()); + SimpleString addressName = message.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS); + + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(addressName) ? addressName : binding.getAddress()); props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 5057d362e1..a446d93aea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -580,7 +580,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (!browseOnly) { TypedProperties props = new TypedProperties(); - props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(unPrefixedQueueName) ? unPrefixedQueueName : address); props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 54d3ab8367..10bf026796 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.nio.ByteBuffer; @@ -30,7 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; - +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; @@ -40,7 +46,9 @@ import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfigu import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.CompositeAddress; import org.jboss.logging.Logger; import org.junit.Assert; import org.junit.Before; @@ -68,8 +76,6 @@ public class MessageRedistributionTest extends ClusterTestBase { return false; } - - @Override protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) { super.setSessionFactoryCreateLocator(node, ha, serverTotc); @@ -78,7 +84,6 @@ public class MessageRedistributionTest extends ClusterTestBase { } - //https://issues.jboss.org/browse/HORNETQ-1061 @Test public void testRedistributionWithMessageGroups() throws Exception { @@ -728,6 +733,102 @@ public class MessageRedistributionTest extends ClusterTestBase { verifyNotReceive(1); } + @Test + public void testRedistributionWithFqqnAnycast() throws Exception { + internalTestRedistributionWithFqqn(RoutingType.ANYCAST); + } + + @Test + public void testRedistributionWithFqqnMulticast() throws Exception { + internalTestRedistributionWithFqqn(RoutingType.MULTICAST); + } + + private void internalTestRedistributionWithFqqn(RoutingType routingType) throws Exception { + final String ADDRESS = "myAddress"; + final String QUEUE = "myQueue"; + final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE); + AddressSettings as = new AddressSettings().setRedistributionDelay(0); + getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as); + getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as); + setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + createQueue(0, ADDRESS, QUEUE, null, false, routingType); + createQueue(0, ADDRESS, "extra", null, false, routingType); + waitForBindings(0, ADDRESS, 2, 0, true); + waitForBindings(1, ADDRESS, 2, 0, false); + + send(0, FQQN, 20, false, null, routingType, null); + + setupSessionFactory(1, isNetty()); + createQueue(1, ADDRESS, QUEUE, null, false, routingType); + waitForBindings(0, ADDRESS, 1, 0, false); + waitForBindings(1, ADDRESS, 1, 0, true); + + addConsumer(1, 1, FQQN, null); + waitForBindings(1, ADDRESS, 1, 1, true); + + verifyReceiveAll(20, 1); + verifyNotReceive(1); + } + + @Test + public void testRedistributionWithFqqnJmsQueue() throws Exception { + final String ADDRESS = "myAddress"; + final String QUEUE = "myQueue"; + final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE); + + AddressSettings as = new AddressSettings().setRedistributionDelay(0); + getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as); + getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as); + setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND); + getServer(0).getConfiguration().setName("0"); + getServer(1).getConfiguration().setName("1"); + startServers(0, 1); + + ConnectionFactory cf0 = new ActiveMQConnectionFactory("vm://0"); + ConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://1"); + + try (Connection connection0 = cf0.createConnection(); + Connection connection1 = cf1.createConnection()) { + javax.jms.Queue sendTo = ActiveMQJMSClient.createQueue(FQQN); + javax.jms.Queue consumeFrom = ActiveMQJMSClient.createQueue(FQQN); + + setupSessionFactory(0, isNetty()); + createQueue(0, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST); + waitForBindings(0, ADDRESS, 1, 0, true); + waitForBindings(1, ADDRESS, 1, 0, false); + + Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session0.createProducer(sendTo); + + final int numMessages = 10; + for (int i = 0; i < numMessages; i++) { + TextMessage message = session0.createTextMessage("This is text message " + i); + producer.send(message); + } + producer.close(); + assertEquals(numMessages, servers[0].locateQueue(QUEUE).getMessageCount()); + + setupSessionFactory(1, isNetty()); + createQueue(1, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST); + waitForBindings(1, ADDRESS, 1, 0, true); + waitForBindings(0, ADDRESS, 1, 0, false); + + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection1.start(); + MessageConsumer consumer1 = session1.createConsumer(consumeFrom); + waitForBindings(1, ADDRESS, 1, 1, true); + javax.jms.Message message1; + for (int i = 0; i < numMessages; i++) { + message1 = consumer1.receive(5000); + assertNotNull(message1); + } + } + } + @Test public void testRedistributionWhenRemoteConsumerIsAddedLbOffWithRedistribution() throws Exception { setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION); @@ -1293,11 +1394,15 @@ public class MessageRedistributionTest extends ClusterTestBase { } protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { - setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2); + setupCluster("queues", messageLoadBalancingType); + } - setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2); + protected void setupCluster(final String address, final MessageLoadBalancingType messageLoadBalancingType) throws Exception { + setupClusterConnection("cluster0", address, messageLoadBalancingType, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster1", address, messageLoadBalancingType, 1, isNetty(), 1, 0, 2); + + setupClusterConnection("cluster2", address, messageLoadBalancingType, 1, isNetty(), 2, 0, 1); } protected void setRedistributionDelay(final long delay) {