ARTEMIS-3608 - Add distribution for Multicast messages to OFF_WITH_REDISTRIBUTION to avoid message loss
This commit is contained in:
parent
8353eca9ee
commit
bf83a9b3d1
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
|
@ -408,7 +410,7 @@ public final class BindingsImpl implements Bindings {
|
|||
private static boolean matchBinding(final Message message,
|
||||
final Binding binding,
|
||||
final MessageLoadBalancingType loadBalancingType) {
|
||||
if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) && binding instanceof RemoteQueueBinding) {
|
||||
if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION) && !Objects.equals(message.getRoutingType(), RoutingType.MULTICAST)) && binding instanceof RemoteQueueBinding) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -723,7 +723,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
final int msgEnd,
|
||||
final boolean durable,
|
||||
final String filterVal) throws Exception {
|
||||
sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null);
|
||||
sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null, null);
|
||||
}
|
||||
|
||||
protected void sendInRange(final int node,
|
||||
|
@ -732,6 +732,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
final int msgEnd,
|
||||
final boolean durable,
|
||||
final String filterVal,
|
||||
final RoutingType routingType,
|
||||
final AtomicInteger duplicateDetectionSeq) throws Exception {
|
||||
ClientSessionFactory sf = sfs[node];
|
||||
|
||||
|
@ -756,6 +757,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString(str));
|
||||
}
|
||||
|
||||
if (routingType != null) {
|
||||
message.setRoutingType(routingType);
|
||||
}
|
||||
|
||||
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
|
||||
|
||||
if (isLargeMessage()) {
|
||||
|
@ -853,7 +858,17 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
final boolean durable,
|
||||
final String filterVal,
|
||||
final AtomicInteger duplicateDetectionCounter) throws Exception {
|
||||
sendInRange(node, address, 0, numMessages, durable, filterVal, duplicateDetectionCounter);
|
||||
send(node, address, numMessages, durable, filterVal, null, duplicateDetectionCounter);
|
||||
}
|
||||
|
||||
protected void send(final int node,
|
||||
final String address,
|
||||
final int numMessages,
|
||||
final boolean durable,
|
||||
final String filterVal,
|
||||
final RoutingType routingType,
|
||||
final AtomicInteger duplicateDetectionCounter) throws Exception {
|
||||
sendInRange(node, address, 0, numMessages, durable, filterVal, routingType, duplicateDetectionCounter);
|
||||
}
|
||||
|
||||
protected void verifyReceiveAllInRange(final boolean ack,
|
||||
|
|
|
@ -831,6 +831,39 @@ public class MessageRedistributionTest extends ClusterTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws Exception {
|
||||
|
||||
String address = "test.address";
|
||||
String queue = "queue";
|
||||
String clusterAddress = "test";
|
||||
AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
|
||||
RoutingType routingType = RoutingType.MULTICAST;
|
||||
|
||||
getServer(0).getAddressSettingsRepository().addMatch(address, settings);
|
||||
getServer(1).getAddressSettingsRepository().addMatch(address, settings);
|
||||
|
||||
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
|
||||
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0);
|
||||
|
||||
startServers(0, 1);
|
||||
|
||||
setupSessionFactory(0, isNetty());
|
||||
setupSessionFactory(1, isNetty());
|
||||
|
||||
createQueue(0, address, queue, null, false, routingType);
|
||||
addConsumer(0, 0, queue, null);
|
||||
waitForBindings(0, address, 1, 1, true);
|
||||
waitForBindings(1, address, 1, 1, false);
|
||||
|
||||
createAddressInfo(1, address, routingType, 0, false);
|
||||
|
||||
final int noMessages = 10;
|
||||
send(1, address, noMessages, false, null, routingType, null);
|
||||
verifyReceiveAll(noMessages, 0);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBackAndForth() throws Exception {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
|
|
Loading…
Reference in New Issue