This commit is contained in:
Justin Bertram 2022-02-07 20:06:46 -06:00
commit 124cd6ca58
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
3 changed files with 53 additions and 3 deletions

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -408,7 +410,7 @@ public final class BindingsImpl implements Bindings {
private static boolean matchBinding(final Message message,
final Binding binding,
final MessageLoadBalancingType loadBalancingType) {
if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) && binding instanceof RemoteQueueBinding) {
if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION) && !Objects.equals(message.getRoutingType(), RoutingType.MULTICAST)) && binding instanceof RemoteQueueBinding) {
return false;
}

View File

@ -723,7 +723,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
final int msgEnd,
final boolean durable,
final String filterVal) throws Exception {
sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null);
sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null, null);
}
protected void sendInRange(final int node,
@ -732,6 +732,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
final int msgEnd,
final boolean durable,
final String filterVal,
final RoutingType routingType,
final AtomicInteger duplicateDetectionSeq) throws Exception {
ClientSessionFactory sf = sfs[node];
@ -756,6 +757,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString(str));
}
if (routingType != null) {
message.setRoutingType(routingType);
}
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
if (isLargeMessage()) {
@ -853,7 +858,17 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
final boolean durable,
final String filterVal,
final AtomicInteger duplicateDetectionCounter) throws Exception {
sendInRange(node, address, 0, numMessages, durable, filterVal, duplicateDetectionCounter);
send(node, address, numMessages, durable, filterVal, null, duplicateDetectionCounter);
}
protected void send(final int node,
final String address,
final int numMessages,
final boolean durable,
final String filterVal,
final RoutingType routingType,
final AtomicInteger duplicateDetectionCounter) throws Exception {
sendInRange(node, address, 0, numMessages, durable, filterVal, routingType, duplicateDetectionCounter);
}
protected void verifyReceiveAllInRange(final boolean ack,

View File

@ -831,6 +831,39 @@ public class MessageRedistributionTest extends ClusterTestBase {
}
@Test
public void testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws Exception {
String address = "test.address";
String queue = "queue";
String clusterAddress = "test";
AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
RoutingType routingType = RoutingType.MULTICAST;
getServer(0).getAddressSettingsRepository().addMatch(address, settings);
getServer(1).getAddressSettingsRepository().addMatch(address, settings);
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, address, queue, null, false, routingType);
addConsumer(0, 0, queue, null);
waitForBindings(0, address, 1, 1, true);
waitForBindings(1, address, 1, 1, false);
createAddressInfo(1, address, routingType, 0, false);
final int noMessages = 10;
send(1, address, noMessages, false, null, routingType, null);
verifyReceiveAll(noMessages, 0);
}
@Test
public void testBackAndForth() throws Exception {
for (int i = 0; i < 10; i++) {