ARTEMIS-1925 - allow redistribution with new loadbalance type of OFF_WITH_REDISTRIBUTION to ensure local consumers get priority, we only optionally redistribute when messages are stuck

This commit is contained in:
gtully 2021-07-27 14:15:58 +01:00 committed by Gary Tully
parent b27aa03a37
commit 276f822a0e
11 changed files with 68 additions and 8 deletions

View File

@ -175,7 +175,7 @@ public class Create extends InputAbstract {
@Option(name = "--max-hops", description = "Number of hops on the cluster configuration")
private int maxHops = 0;
@Option(name = "--message-load-balancing", description = "Load balancing policy on cluster. [ON_DEMAND (default) | STRICT | OFF]")
@Option(name = "--message-load-balancing", description = "Load balancing policy on cluster. [ON_DEMAND (default) | STRICT | OFF | OFF_WITH_REDISTRIBUTION]")
private MessageLoadBalancingType messageLoadBalancing = MessageLoadBalancingType.ON_DEMAND;
@Option(name = "--replicated", description = "Enable broker replication")

View File

@ -233,6 +233,7 @@ public final class Validators {
public void validate(final String name, final Object value) {
String val = (String) value;
if (val == null || !val.equals(MessageLoadBalancingType.OFF.toString()) &&
!val.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION.toString()) &&
!val.equals(MessageLoadBalancingType.STRICT.toString()) &&
!val.equals(MessageLoadBalancingType.ON_DEMAND.toString())) {
throw ActiveMQMessageBundle.BUNDLE.invalidMessageLoadBalancingType(val);

View File

@ -174,7 +174,7 @@ public final class BindingsImpl implements Bindings {
@Override
public boolean allowRedistribute() {
return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
}
@Override

View File

@ -20,7 +20,7 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.commons.beanutils.Converter;
public enum MessageLoadBalancingType {
OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");
OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND"), OFF_WITH_REDISTRIBUTION("OFF_WITH_REDISTRIBUTION");
static {
// for URI support on ClusterConnection
@ -52,6 +52,8 @@ public enum MessageLoadBalancingType {
return MessageLoadBalancingType.STRICT;
} else if (string.equals(ON_DEMAND.getType())) {
return MessageLoadBalancingType.ON_DEMAND;
} else if (string.equals(OFF_WITH_REDISTRIBUTION.getType())) {
return MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
} else {
return null;
}

View File

@ -2509,6 +2509,7 @@
<xsd:enumeration value="OFF"/>
<xsd:enumeration value="STRICT"/>
<xsd:enumeration value="ON_DEMAND"/>
<xsd:enumeration value="OFF_WITH_REDISTRIBUTION"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

View File

@ -337,7 +337,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
}
}
Assert.assertEquals(2, conf.getClusterConfigurations().size());
Assert.assertEquals(3, conf.getClusterConfigurations().size());
HAPolicyConfiguration pc = conf.getHAPolicyConfiguration();
assertNotNull(pc);
@ -348,7 +348,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(lopc.getScaleDownConfiguration().getDiscoveryGroup(), "dg1");
for (ClusterConnectionConfiguration ccc : conf.getClusterConfigurations()) {
if (ccc.getName().equals("cluster-connection1")) {
if (ccc.getName().equals("cluster-connection3")) {
Assert.assertEquals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, ccc.getMessageLoadBalancingType());
} else if (ccc.getName().equals("cluster-connection1")) {
Assert.assertEquals("cluster-connection1", ccc.getName());
Assert.assertEquals("clusterConnectionConf minLargeMessageSize", 321, ccc.getMinLargeMessageSize());
assertEquals("check-period", 331, ccc.getClientFailureCheckPeriod());

View File

@ -36,6 +36,13 @@ public class ClusterConnectionConfigurationTest {
Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString());
}
@Test
public void testClusterConnectionStaticOffWithRedistribution() throws Exception {
ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser();
ClusterConnectionConfiguration configuration = parser.newObject(new URI("static:(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132;s&messageLoadBalancingType=OFF_WITH_REDISTRIBUTION"), null);
Assert.assertEquals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, configuration.getMessageLoadBalancingType());
}
@Test
public void testClusterConnectionStatic2() throws Exception {
ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser();

View File

@ -413,6 +413,10 @@
<call-failover-timeout>456</call-failover-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
</cluster-connection>
<cluster-connection name="cluster-connection3">
<connector-ref>connector2</connector-ref>
<message-load-balancing>OFF_WITH_REDISTRIBUTION</message-load-balancing>
</cluster-connection>
</cluster-connections>
<broker-connections>
<amqp-connection uri="tcp://test1:111" name="test1" retry-interval="333" reconnect-attempts="33" user="testuser" password="testpassword">

View File

@ -297,6 +297,10 @@
<call-failover-timeout>456</call-failover-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
</cluster-connection>
<cluster-connection name="cluster-connection3">
<connector-ref>connector2</connector-ref>
<message-load-balancing>OFF_WITH_REDISTRIBUTION</message-load-balancing>
</cluster-connection>
</cluster-connections>
<broker-connections>
<amqp-connection uri="tcp://test1:111" name="test1" retry-interval="333" reconnect-attempts="33" user="testuser" password="testpassword">

View File

@ -609,7 +609,7 @@ specified. The following shows all the available configuration options
- `message-load-balancing`. This parameter determines if/how
messages will be distributed between other nodes of the cluster.
It can be one of three values - `OFF`, `STRICT`, or `ON_DEMAND`
It can be one of four values - `OFF`, `STRICT`, `OFF_WITH_REDISTRIBUTION` or `ON_DEMAND`
(default). This parameter replaces the deprecated
`forward-when-no-consumers` parameter.
@ -631,7 +631,12 @@ specified. The following shows all the available configuration options
consumers have message filters (selectors) at least one of those
selectors must match the message. Using `ON_DEMAND` is like setting
the legacy `forward-when-no-consumers` parameter to `false`.
If this is set to `OFF_WITH_REDISTRIBUTION` then like with 'OFF' messages
won't be initially routed to other nodes in the cluster. However, if [redistribution](#message-redistribution)
is configured, it can forward messages in the normal way. In this way local consumers
will always have priority.
Keep in mind that this message forwarding/balancing is what we call
"initial distribution." It is different than *redistribution* which
is [discussed below](#message-redistribution).
@ -824,7 +829,7 @@ redistribution Apache ActiveMQ Artemis can be configured to automatically
*redistribute* messages from queues which have no consumers or consumers
with filters that don't match messages. The messages are re-routed to
other nodes in the cluster which do have matching consumers. To enable
this functionality `message-load-balancing` must be `ON_DEMAND`.
this functionality `message-load-balancing` must be `ON_DEMAND` or `OFF_WITH_REDISTRIBUTION`
Message redistribution can be configured to kick in immediately after
the need to redistribute is detected, or to wait a configurable delay before redistributing.

View File

@ -730,6 +730,40 @@ public class MessageRedistributionTest extends ClusterTestBase {
verifyNotReceive(1);
}
@Test
public void testRedistributionWhenRemoteConsumerIsAddedLbOffWithRedistribution() throws Exception {
setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
startServers(0, 1, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
addConsumer(0, 0, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(2, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 2, 0, false);
waitForBindings(1, "queues.testaddress", 2, 1, false);
waitForBindings(2, "queues.testaddress", 2, 1, false);
send(0, "queues.testaddress", 20, false, null);
removeConsumer(0);
addConsumer(1, 1, "queue0", null);
verifyReceiveAll(20, 1);
verifyNotReceive(1);
}
@Test
public void testBackAndForth() throws Exception {
for (int i = 0; i < 10; i++) {