ARTEMIS-3918 support FQQN + anycast + redistribution

When a message is sent to an anycast queue via FQQN on one node of a
cluster and then a consumer is created on that same anycast queue via
FQQN on another node in the cluster the message is not redistributed to
the node with the consumer.

This commit fixes this use-case primarily by including the FQQN info in
the notification messages sent to other nodes in the cluster.
This commit is contained in:
Justin Bertram 2022-07-29 16:12:53 -05:00 committed by clebertsuconic
parent 9acd036dcc
commit 86db53da9a
5 changed files with 120 additions and 11 deletions

View File

@ -192,7 +192,7 @@ public final class BindingsImpl implements Bindings {
logger.tracef("Redistributing message %s", message); logger.tracef("Redistributing message %s", message);
} }
final SimpleString routingName = originatingQueue.getName(); final SimpleString routingName = CompositeAddress.isFullyQualified(message.getAddress()) && originatingQueue.getRoutingType() == RoutingType.ANYCAST ? CompositeAddress.extractAddressName(message.getAddressSimpleString()) : originatingQueue.getName();
final Pair<Binding[], CopyOnWriteBindings.BindingIndex> bindingsAndPosition = routingNameBindingMap.getBindings(routingName); final Pair<Binding[], CopyOnWriteBindings.BindingIndex> bindingsAndPosition = routingNameBindingMap.getBindings(routingName);

View File

@ -415,7 +415,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return; return;
} }
Binding binding = getBinding(queueName); SimpleString addressName = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
Binding binding = getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName);
if (binding != null) { if (binding != null) {
// We have a local queue // We have a local queue

View File

@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -1426,7 +1427,9 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
// Need to propagate the consumer add // Need to propagate the consumer add
TypedProperties props = new TypedProperties(); TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress()); SimpleString addressName = message.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(addressName) ? addressName : binding.getAddress());
props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName); props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);

View File

@ -580,7 +580,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (!browseOnly) { if (!browseOnly) {
TypedProperties props = new TypedProperties(); TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address); props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(unPrefixedQueueName) ? unPrefixedQueueName : address);
props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName()); props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.activemq.artemis.tests.integration.cluster.distribution; package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -30,7 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
@ -40,7 +46,9 @@ import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfigu
import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -68,8 +76,6 @@ public class MessageRedistributionTest extends ClusterTestBase {
return false; return false;
} }
@Override @Override
protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) { protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) {
super.setSessionFactoryCreateLocator(node, ha, serverTotc); super.setSessionFactoryCreateLocator(node, ha, serverTotc);
@ -78,7 +84,6 @@ public class MessageRedistributionTest extends ClusterTestBase {
} }
//https://issues.jboss.org/browse/HORNETQ-1061 //https://issues.jboss.org/browse/HORNETQ-1061
@Test @Test
public void testRedistributionWithMessageGroups() throws Exception { public void testRedistributionWithMessageGroups() throws Exception {
@ -728,6 +733,102 @@ public class MessageRedistributionTest extends ClusterTestBase {
verifyNotReceive(1); verifyNotReceive(1);
} }
@Test
public void testRedistributionWithFqqnAnycast() throws Exception {
internalTestRedistributionWithFqqn(RoutingType.ANYCAST);
}
@Test
public void testRedistributionWithFqqnMulticast() throws Exception {
internalTestRedistributionWithFqqn(RoutingType.MULTICAST);
}
private void internalTestRedistributionWithFqqn(RoutingType routingType) throws Exception {
final String ADDRESS = "myAddress";
final String QUEUE = "myQueue";
final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE);
AddressSettings as = new AddressSettings().setRedistributionDelay(0);
getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as);
getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as);
setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND);
startServers(0, 1);
setupSessionFactory(0, isNetty());
createQueue(0, ADDRESS, QUEUE, null, false, routingType);
createQueue(0, ADDRESS, "extra", null, false, routingType);
waitForBindings(0, ADDRESS, 2, 0, true);
waitForBindings(1, ADDRESS, 2, 0, false);
send(0, FQQN, 20, false, null, routingType, null);
setupSessionFactory(1, isNetty());
createQueue(1, ADDRESS, QUEUE, null, false, routingType);
waitForBindings(0, ADDRESS, 1, 0, false);
waitForBindings(1, ADDRESS, 1, 0, true);
addConsumer(1, 1, FQQN, null);
waitForBindings(1, ADDRESS, 1, 1, true);
verifyReceiveAll(20, 1);
verifyNotReceive(1);
}
@Test
public void testRedistributionWithFqqnJmsQueue() throws Exception {
final String ADDRESS = "myAddress";
final String QUEUE = "myQueue";
final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE);
AddressSettings as = new AddressSettings().setRedistributionDelay(0);
getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as);
getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as);
setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND);
getServer(0).getConfiguration().setName("0");
getServer(1).getConfiguration().setName("1");
startServers(0, 1);
ConnectionFactory cf0 = new ActiveMQConnectionFactory("vm://0");
ConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://1");
try (Connection connection0 = cf0.createConnection();
Connection connection1 = cf1.createConnection()) {
javax.jms.Queue sendTo = ActiveMQJMSClient.createQueue(FQQN);
javax.jms.Queue consumeFrom = ActiveMQJMSClient.createQueue(FQQN);
setupSessionFactory(0, isNetty());
createQueue(0, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST);
waitForBindings(0, ADDRESS, 1, 0, true);
waitForBindings(1, ADDRESS, 1, 0, false);
Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session0.createProducer(sendTo);
final int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
TextMessage message = session0.createTextMessage("This is text message " + i);
producer.send(message);
}
producer.close();
assertEquals(numMessages, servers[0].locateQueue(QUEUE).getMessageCount());
setupSessionFactory(1, isNetty());
createQueue(1, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST);
waitForBindings(1, ADDRESS, 1, 0, true);
waitForBindings(0, ADDRESS, 1, 0, false);
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageConsumer consumer1 = session1.createConsumer(consumeFrom);
waitForBindings(1, ADDRESS, 1, 1, true);
javax.jms.Message message1;
for (int i = 0; i < numMessages; i++) {
message1 = consumer1.receive(5000);
assertNotNull(message1);
}
}
}
@Test @Test
public void testRedistributionWhenRemoteConsumerIsAddedLbOffWithRedistribution() throws Exception { public void testRedistributionWhenRemoteConsumerIsAddedLbOffWithRedistribution() throws Exception {
setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION); setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
@ -1293,11 +1394,15 @@ public class MessageRedistributionTest extends ClusterTestBase {
} }
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2); setupCluster("queues", messageLoadBalancingType);
}
setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2); protected void setupCluster(final String address, final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
setupClusterConnection("cluster0", address, messageLoadBalancingType, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1); setupClusterConnection("cluster1", address, messageLoadBalancingType, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", address, messageLoadBalancingType, 1, isNetty(), 2, 0, 1);
} }
protected void setRedistributionDelay(final long delay) { protected void setRedistributionDelay(final long delay) {