ARTEMIS-4718 Diverted messages are not propertly routed on cluster remote bindings

This commit is contained in:
AntonRoskvist 2024-04-08 11:45:12 +02:00 committed by Clebert Suconic
parent 6c02950db3
commit a723f634a2
2 changed files with 53 additions and 0 deletions

View File

@ -116,6 +116,10 @@ public class DivertImpl implements Divert {
copy.setExpiration(message.getExpiration());
//This header could be set if the message is redistributed from a clustered broker.
//It needs to be removed as it will interfere with upcoming routing
//copy.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
switch (routingType) {
case ANYCAST:
copy.setRoutingType(RoutingType.ANYCAST);

View File

@ -26,12 +26,15 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Ignore;
@ -546,4 +549,50 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase {
}
@Test
public void testDivertRedistributedMessage() throws Exception {
final String queue = "queue0";
final String divertedQueueName = "divertedQueue";
final int messageCount = 10;
setupServer(0, true, isNetty());
setupServer(1, true, isNetty());
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
servers[0].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
servers[1].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
startServers(0, 1);
servers[0].deployDivert(new DivertConfiguration()
.setName("myDivert")
.setAddress(queue)
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
.setForwardingAddress(divertedQueueName)
.setExclusive(true));
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, queue, queue, null, true, RoutingType.ANYCAST);
createQueue(1, queue, queue, null, true, RoutingType.ANYCAST);
createQueue(0, divertedQueueName, divertedQueueName, null, true, RoutingType.ANYCAST);
createQueue(1, divertedQueueName, divertedQueueName, null, true, RoutingType.ANYCAST);
addConsumer(0, 0, queue, null);
waitForBindings(0, queue, 1, 1, true);
waitForBindings(1, queue, 1, 1, false);
send(1, queue, messageCount, true, null);
Wait.assertEquals((long) messageCount, () -> servers[0].locateQueue(divertedQueueName).getMessageCount(), 2000, 100);
addConsumer(1, 1, divertedQueueName, null);
verifyReceiveAll(messageCount, 1);
closeAllConsumers();
}
}