diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java index 858754d504..c8c0428d2e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -76,4 +77,6 @@ public interface AddressManager { AddressInfo getAddressInfo(SimpleString address); + void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception; + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index b78883fdb5..5d081a3291 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -150,4 +151,6 @@ public interface PostOffice extends ActiveMQComponent { boolean isAddressBound(SimpleString address) throws Exception; Set getAddresses(); + + void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 73d695322e..3f9356c664 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; @@ -1005,6 +1006,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return addressManager.getAddresses(); } + @Override + public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception { + addressManager.updateMessageLoadBalancingTypeForAddress(address, messageLoadBalancingType); + } + @Override public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { return addressManager.getMatchingQueue(address, routingType); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 054b536eba..aa94de292f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.CompositeAddress; @@ -346,4 +347,9 @@ public class SimpleAddressManager implements AddressManager { public AddressInfo getAddressInfo(SimpleString addressName) { return addressInfoMap.get(addressName); } + + @Override + public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception { + getBindingsForRoutingAddress(address).setMessageLoadBalancingType(messageLoadBalancingType); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java index eb242f342e..2180e0b036 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.postoffice.Address; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.BindingsFactory; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -107,6 +108,24 @@ public class WildcardAddressManager extends SimpleAddressManager { return exists; } + @Override + public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception { + Address add = addAndUpdateAddressMap(address); + Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address); + if (bindingsForRoutingAddress != null) { + bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType); + } + if (add.containsWildCard()) { + for (Address destAdd : add.getLinkedAddresses()) { + getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType); + } + } else { + for (Address destAdd : add.getLinkedAddresses()) { + super.getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType); + } + } + } + /** * If the address is a wild card then the binding will be removed from the actual mappings for any linked address. * otherwise it will be removed as normal. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 454ba6f0ea..70923be669 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -48,7 +48,6 @@ import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; @@ -1233,9 +1232,13 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } catch (Exception ignore) { } - Bindings theBindings = postOffice.getBindingsForAddress(queueAddress); - - theBindings.setMessageLoadBalancingType(messageLoadBalancingType); + try { + postOffice.updateMessageLoadBalancingTypeForAddress(queueAddress, messageLoadBalancingType); + } catch (Exception e) { + if (logger.isTraceEnabled()) { + logger.trace(e.getLocalizedMessage(), e); + } + } } @@ -1256,7 +1259,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn RemoteQueueBinding binding = bindings.remove(clusterName); if (binding == null) { - throw new IllegalStateException("Cannot find binding for queue " + clusterName); + logger.warn("Cannot remove binding, because cannot find binding for queue " + clusterName); + return; } postOffice.removeBinding(binding.getUniqueName(), null, false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java index 5485f57e0b..105e7d72cb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java @@ -112,6 +112,99 @@ public class MqttClusterWildcardTest extends ClusterTestBase { } } + @Test + public void wildcardsWithBroker1Disconnected() throws Exception { + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + BlockingConnection connection3 = null; + final String TOPIC = "test/+/some/#"; + try { + + WildcardConfiguration wildcardConfiguration = new WildcardConfiguration(); + wildcardConfiguration.setAnyWords('#'); + wildcardConfiguration.setDelimiter('/'); + wildcardConfiguration.setRoutingEnabled(true); + wildcardConfiguration.setSingleWord('+'); + + setupServer(0, false, isNetty()); + servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration); + + + setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + + startServers(0); + + + connection1 = retrieveMQTTConnection("tcp://localhost:61616"); + + + // Subscribe to topics + Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; + connection1.subscribe(topics); + + waitForBindings(0, TOPIC, 1, 1, true); + waitForBindings(0, TOPIC, 0, 0, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = connection1.receive(5, TimeUnit.SECONDS); + + setupServer(1, false, isNetty()); + servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration); + + setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + startServers(1); + + connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + connection3 = retrieveMQTTConnection("tcp://localhost:61617"); + connection2.subscribe(topics); + connection3.subscribe(new Topic[]{new Topic("teste/1/some/1", QoS.AT_MOST_ONCE)}); + + waitForBindings(1, TOPIC, 1, 1, false); + waitForBindings(1, TOPIC, 1, 1, true); + + connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + + Message message2 = connection1.receive(5, TimeUnit.SECONDS); + Message message3 = connection1.receive(5, TimeUnit.SECONDS); + Message message4 = connection2.receive(5, TimeUnit.SECONDS); + Message message5 = connection2.receive(5, TimeUnit.SECONDS); + Message message6 = connection2.receive(5, TimeUnit.SECONDS); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + assertEquals(payload1, new String(message4.getPayload())); + assertEquals(payload2, new String(message5.getPayload())); + assertEquals(payload3, new String(message6.getPayload())); + + } finally { + String[] topics = new String[]{TOPIC}; + if (connection1 != null) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + if (connection3 != null) { + connection3.unsubscribe(new String[]{"teste/1/some/1"}); + connection3.disconnect(); + } + } + } + private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { MQTT mqtt = new MQTT(); mqtt.setHost(host); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 96c7451f92..e455e41635 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -68,6 +69,11 @@ public class FakePostOffice implements PostOffice { return null; } + @Override + public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception { + + } + @Override public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) {