ARTEMIS-1925 - ensure OFF_WITH_REDISTRIBUTION behaves like OFF for initial routing

This commit is contained in:
gtully 2021-10-29 09:03:58 +01:00 committed by Gary Tully
parent c54f335f62
commit 2167ac2e30
4 changed files with 105 additions and 15 deletions

View File

@ -408,7 +408,7 @@ public final class BindingsImpl implements Bindings {
private static boolean matchBinding(final Message message,
final Binding binding,
final MessageLoadBalancingType loadBalancingType) {
if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) && binding instanceof RemoteQueueBinding) {
return false;
}

View File

@ -154,7 +154,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
@Override
public synchronized boolean isHighAcceptPriority(final Message message) {
if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) {
return false;
}

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
@ -35,6 +36,8 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@ -45,15 +48,39 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
public class BindingsImplTest extends ActiveMQTestBase {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@Test
public void testGetNextBindingWithLoadBalancingOnDemand() throws Exception {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
final Bindings bind = new BindingsImpl(null, null);
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(1, fake.routedCount.get());
}
// Static --------------------------------------------------------
@Test
public void testGetNextBindingWithLoadBalancingOff() throws Exception {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
final Bindings bind = new BindingsImpl(null, null);
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
}
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Test
public void testGetNextBindingWithLoadBalancingOffWithRedistribution() throws Exception {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
final Bindings bind = new BindingsImpl(null, null);
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
}
@Test
public void testRemoveWhileRouting() throws Exception {
@ -299,8 +326,10 @@ public class BindingsImplTest extends ActiveMQTestBase {
}
private final class FakeBinding implements Binding {
private class FakeBinding implements Binding {
Filter filter = new FakeFilter();
AtomicInteger routedCount = new AtomicInteger();
@Override
public void close() throws Exception {
@ -354,7 +383,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
*/
@Override
public Filter getFilter() {
return new FakeFilter();
return filter;
}
@Override
@ -399,7 +428,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
@Override
public void route(final Message message, final RoutingContext context) throws Exception {
routedCount.incrementAndGet();
}
/* (non-Javadoc)
@ -422,12 +451,56 @@ public class BindingsImplTest extends ActiveMQTestBase {
}
// Package protected ---------------------------------------------
private final class FakeRemoteBinding extends FakeBinding implements RemoteQueueBinding {
MessageLoadBalancingType messageLoadBalancingType;
FakeRemoteBinding(SimpleString name) {
super(name);
}
// Protected -----------------------------------------------------
@Override
public boolean isLocal() {
return false;
}
// Private -------------------------------------------------------
@Override
public int consumerCount() {
return 0;
}
// Inner classes -------------------------------------------------
@Override
public Queue getQueue() {
return null;
}
@Override
public void addConsumer(SimpleString filterString) throws Exception {
}
@Override
public void removeConsumer(SimpleString filterString) throws Exception {
}
@Override
public void reset() {
}
@Override
public void disconnect() {
}
@Override
public void connect() {
}
@Override
public long getRemoteQueueID() {
return 0;
}
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return messageLoadBalancingType;
}
}
}

View File

@ -81,4 +81,21 @@ public class RemoteQueueBindImplTest extends ActiveMQTestBase {
testAddRemoveConsumerWithFilter(i -> null, 1, 0);
}
@Test
public void testIsHighAcceptPriority() throws Exception {
final long id = RandomUtil.randomLong();
final SimpleString address = RandomUtil.randomSimpleString();
final SimpleString uniqueName = RandomUtil.randomSimpleString();
final SimpleString routingName = RandomUtil.randomSimpleString();
final Long remoteQueueID = RandomUtil.randomLong();
final SimpleString filterString = new SimpleString("A>B");
final Queue storeAndForwardQueue = new FakeQueue(null);
final SimpleString bridgeName = RandomUtil.randomSimpleString();
final int distance = 0;
RemoteQueueBindingImpl bindingOff = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.OFF);
assertFalse(bindingOff.isHighAcceptPriority(null));
RemoteQueueBindingImpl bindingOffWithRedistribution = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
assertFalse(bindingOffWithRedistribution.isHighAcceptPriority(null));
}
}