ARTEMIS-1680 - Synchronize message load balacing type between brokers
This guarantees the update of message load balancing type between addresses and linked adresses
This commit is contained in:
parent
8dfa345562
commit
13e071158d
|
@ -22,6 +22,7 @@ import java.util.Set;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
||||
|
@ -76,4 +77,6 @@ public interface AddressManager {
|
|||
|
||||
AddressInfo getAddressInfo(SimpleString address);
|
||||
|
||||
void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception;
|
||||
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
|
|||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
||||
|
@ -150,4 +151,6 @@ public interface PostOffice extends ActiveMQComponent {
|
|||
boolean isAddressBound(SimpleString address) throws Exception;
|
||||
|
||||
Set<SimpleString> getAddresses();
|
||||
|
||||
void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception;
|
||||
}
|
||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.server.Queue;
|
|||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||
import org.apache.activemq.artemis.core.server.RouteContextList;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
||||
|
@ -1005,6 +1006,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
return addressManager.getAddresses();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
||||
addressManager.updateMessageLoadBalancingTypeForAddress(address, messageLoadBalancingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
|
||||
return addressManager.getMatchingQueue(address, routingType);
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
|
|||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
|
@ -346,4 +347,9 @@ public class SimpleAddressManager implements AddressManager {
|
|||
public AddressInfo getAddressInfo(SimpleString addressName) {
|
||||
return addressInfoMap.get(addressName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
||||
getBindingsForRoutingAddress(address).setMessageLoadBalancingType(messageLoadBalancingType);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.postoffice.Address;
|
|||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
||||
|
@ -107,6 +108,24 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
|||
return exists;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
||||
Address add = addAndUpdateAddressMap(address);
|
||||
Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address);
|
||||
if (bindingsForRoutingAddress != null) {
|
||||
bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType);
|
||||
}
|
||||
if (add.containsWildCard()) {
|
||||
for (Address destAdd : add.getLinkedAddresses()) {
|
||||
getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType);
|
||||
}
|
||||
} else {
|
||||
for (Address destAdd : add.getLinkedAddresses()) {
|
||||
super.getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the address is a wild card then the binding will be removed from the actual mappings for any linked address.
|
||||
* otherwise it will be removed as normal.
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
|
|||
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
|
@ -1233,9 +1232,13 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
} catch (Exception ignore) {
|
||||
}
|
||||
|
||||
Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
|
||||
|
||||
theBindings.setMessageLoadBalancingType(messageLoadBalancingType);
|
||||
try {
|
||||
postOffice.updateMessageLoadBalancingTypeForAddress(queueAddress, messageLoadBalancingType);
|
||||
} catch (Exception e) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(e.getLocalizedMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -1256,7 +1259,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
RemoteQueueBinding binding = bindings.remove(clusterName);
|
||||
|
||||
if (binding == null) {
|
||||
throw new IllegalStateException("Cannot find binding for queue " + clusterName);
|
||||
logger.warn("Cannot remove binding, because cannot find binding for queue " + clusterName);
|
||||
return;
|
||||
}
|
||||
|
||||
postOffice.removeBinding(binding.getUniqueName(), null, false);
|
||||
|
|
|
@ -112,6 +112,99 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wildcardsWithBroker1Disconnected() throws Exception {
|
||||
BlockingConnection connection1 = null;
|
||||
BlockingConnection connection2 = null;
|
||||
BlockingConnection connection3 = null;
|
||||
final String TOPIC = "test/+/some/#";
|
||||
try {
|
||||
|
||||
WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
|
||||
wildcardConfiguration.setAnyWords('#');
|
||||
wildcardConfiguration.setDelimiter('/');
|
||||
wildcardConfiguration.setRoutingEnabled(true);
|
||||
wildcardConfiguration.setSingleWord('+');
|
||||
|
||||
setupServer(0, false, isNetty());
|
||||
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
|
||||
|
||||
|
||||
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
|
||||
|
||||
startServers(0);
|
||||
|
||||
|
||||
connection1 = retrieveMQTTConnection("tcp://localhost:61616");
|
||||
|
||||
|
||||
// Subscribe to topics
|
||||
Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
|
||||
connection1.subscribe(topics);
|
||||
|
||||
waitForBindings(0, TOPIC, 1, 1, true);
|
||||
waitForBindings(0, TOPIC, 0, 0, false);
|
||||
|
||||
// Publish Messages
|
||||
String payload1 = "This is message 1";
|
||||
String payload2 = "This is message 2";
|
||||
String payload3 = "This is message 3";
|
||||
|
||||
connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
|
||||
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
|
||||
|
||||
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
|
||||
|
||||
setupServer(1, false, isNetty());
|
||||
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
|
||||
|
||||
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
|
||||
startServers(1);
|
||||
|
||||
connection2 = retrieveMQTTConnection("tcp://localhost:61617");
|
||||
connection3 = retrieveMQTTConnection("tcp://localhost:61617");
|
||||
connection2.subscribe(topics);
|
||||
connection3.subscribe(new Topic[]{new Topic("teste/1/some/1", QoS.AT_MOST_ONCE)});
|
||||
|
||||
waitForBindings(1, TOPIC, 1, 1, false);
|
||||
waitForBindings(1, TOPIC, 1, 1, true);
|
||||
|
||||
connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
|
||||
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
|
||||
|
||||
|
||||
Message message2 = connection1.receive(5, TimeUnit.SECONDS);
|
||||
Message message3 = connection1.receive(5, TimeUnit.SECONDS);
|
||||
Message message4 = connection2.receive(5, TimeUnit.SECONDS);
|
||||
Message message5 = connection2.receive(5, TimeUnit.SECONDS);
|
||||
Message message6 = connection2.receive(5, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(payload1, new String(message1.getPayload()));
|
||||
assertEquals(payload2, new String(message2.getPayload()));
|
||||
assertEquals(payload3, new String(message3.getPayload()));
|
||||
assertEquals(payload1, new String(message4.getPayload()));
|
||||
assertEquals(payload2, new String(message5.getPayload()));
|
||||
assertEquals(payload3, new String(message6.getPayload()));
|
||||
|
||||
} finally {
|
||||
String[] topics = new String[]{TOPIC};
|
||||
if (connection1 != null) {
|
||||
connection1.unsubscribe(topics);
|
||||
connection1.disconnect();
|
||||
}
|
||||
if (connection2 != null) {
|
||||
connection2.unsubscribe(topics);
|
||||
connection2.disconnect();
|
||||
}
|
||||
if (connection3 != null) {
|
||||
connection3.unsubscribe(new String[]{"teste/1/some/1"});
|
||||
connection3.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static BlockingConnection retrieveMQTTConnection(String host) throws Exception {
|
||||
MQTT mqtt = new MQTT();
|
||||
mqtt.setHost(host);
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
|
|||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
@ -68,6 +69,11 @@ public class FakePostOffice implements PostOffice {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) {
|
||||
|
||||
|
|
Loading…
Reference in New Issue