From 46b642447c20200fa29932d1fd5df596a7b97759 Mon Sep 17 00:00:00 2001 From: "raul.valdoleiros" Date: Fri, 24 Nov 2017 17:53:14 +0000 Subject: [PATCH 1/2] ARTEMIS-1523 wildcard binding configured wrong Bindings with wildcards are not configured with the proper message-load-balancing type. --- .../artemis/core/postoffice/Bindings.java | 2 + .../core/postoffice/impl/BindingsImpl.java | 5 + .../impl/WildcardAddressManager.java | 11 +- .../mqtt/clustered-queue-mqtt/pom.xml | 161 ++++++++++++++++++ .../mqtt/clustered-queue-mqtt/readme.html | 55 ++++++ .../example/ClusteredQueueMQTTExample.java | 82 +++++++++ .../resources/activemq/server0/broker.xml | 98 +++++++++++ .../resources/activemq/server1/broker.xml | 98 +++++++++++ examples/protocols/mqtt/pom.xml | 2 + .../imported/MqttClusterWildcardTest.java | 122 +++++++++++++ .../impl/WildcardAddressManagerUnitTest.java | 5 + 11 files changed, 636 insertions(+), 5 deletions(-) create mode 100644 examples/protocols/mqtt/clustered-queue-mqtt/pom.xml create mode 100644 examples/protocols/mqtt/clustered-queue-mqtt/readme.html create mode 100644 examples/protocols/mqtt/clustered-queue-mqtt/src/main/java/org/apache/activemq/artemis/jms/example/ClusteredQueueMQTTExample.java create mode 100644 examples/protocols/mqtt/clustered-queue-mqtt/src/main/resources/activemq/server0/broker.xml create mode 100644 examples/protocols/mqtt/clustered-queue-mqtt/src/main/resources/activemq/server1/broker.xml create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java index 1d335ad894..f3592c4a79 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java @@ -34,6 +34,8 @@ public interface Bindings extends UnproposalListener { void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType); + MessageLoadBalancingType getMessageLoadBalancingType(); + boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception; void route(Message message, RoutingContext context) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index c3d2f0f7f1..2e2b31cfdf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -77,6 +77,11 @@ public final class BindingsImpl implements Bindings { this.messageLoadBalancingType = messageLoadBalancingType; } + @Override + public MessageLoadBalancingType getMessageLoadBalancingType() { + return this.messageLoadBalancingType; + } + @Override public Collection getBindings() { return bindingsMap.values(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java index b1d15c3599..8ff1a38b89 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java @@ -16,11 +16,6 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -30,6 +25,11 @@ import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.transaction.Transaction; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * extends the simple manager to allow wildcard addresses to be used. */ @@ -67,6 +67,7 @@ public class WildcardAddressManager extends SimpleAddressManager { for (Binding theBinding : theBindings) { super.addMappingInternal(address, theBinding); } + super.getBindingsForRoutingAddress(address).setMessageLoadBalancingType(b.getMessageLoadBalancingType()); } } } diff --git a/examples/protocols/mqtt/clustered-queue-mqtt/pom.xml b/examples/protocols/mqtt/clustered-queue-mqtt/pom.xml new file mode 100644 index 0000000000..6deacc0cda --- /dev/null +++ b/examples/protocols/mqtt/clustered-queue-mqtt/pom.xml @@ -0,0 +1,161 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples.clustered + broker-clustered + 2.5.0-SNAPSHOT + + + clustered-queue-mqtt + jar + ActiveMQ Artemis JMS Clustered Queue Example + + + ${project.basedir}/../../../.. + + + + + org.apache.activemq + artemis-jms-client + ${project.version} + + + org.fusesource.mqtt-client + mqtt-client + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create0 + + create + + + ${noServer} + ${basedir}/target/server0 + ${basedir}/target/classes/activemq/server0 + + -Djava.net.preferIPv4Stack=true + + + + create1 + + create + + + ${noServer} + ${basedir}/target/server1 + ${basedir}/target/classes/activemq/server1 + + -Djava.net.preferIPv4Stack=true + + + + start0 + + cli + + + ${noServer} + true + ${basedir}/target/server0 + tcp://localhost:61616 + + run + + server0 + + + + start1 + + cli + + + ${noServer} + true + ${basedir}/target/server1 + tcp://localhost:61617 + + run + + server1 + + + + runClient + + runClient + + + org.apache.activemq.artemis.jms.example.ClusteredQueueMQTTExample + + + + stop0 + + cli + + + ${noServer} + ${basedir}/target/server0 + + stop + + + + + stop1 + + cli + + + ${noServer} + ${basedir}/target/server1 + + stop + + + + + + + org.apache.activemq.examples.clustered + clustered-queue-mqtt + ${project.version} + + + + + + + diff --git a/examples/protocols/mqtt/clustered-queue-mqtt/readme.html b/examples/protocols/mqtt/clustered-queue-mqtt/readme.html new file mode 100644 index 0000000000..a39fc4c6b4 --- /dev/null +++ b/examples/protocols/mqtt/clustered-queue-mqtt/readme.html @@ -0,0 +1,55 @@ + + + + + ActiveMQ Artemis JMS Load Balanced Clustered Queue Example + + + + + +

JMS Load Balanced Clustered Queue Example

+ +
To run the example, simply type mvn verify from this directory, 
or mvn -PnoServer verify if you want to start and create the server manually.
+ +

This example demonstrates a JMS queue deployed on two different nodes. The two nodes are configured to form a cluster.

+

We then create a consumer on the queue on each node, and we create a producer on only one of the nodes.

+

We then send some messages via the producer, and we verify that both consumers receive the sent messages + in a round-robin fashion.

+

In other words, ActiveMQ Artemis load balances the sent messages across all consumers on the cluster

+

This example uses JNDI to lookup the JMS Queue and ConnectionFactory objects. If you prefer not to use + JNDI, these could be instantiated directly.

+

Here's the relevant snippet from the server configuration, which tells the server to form a cluster between the two nodes + and to load balance the messages between the nodes.

+
+     <cluster-connection name="my-cluster">
+        <connector-ref>netty-connector</connector-ref>
+        <retry-interval>500</retry-interval>
+        <use-duplicate-detection>true</use-duplicate-detection>
+        <message-load-balancing>STRICT</message-load-balancing>
+        <max-hops>1</max-hops>
+        <discovery-group-ref discovery-group-name="my-discovery-group"/>
+     </cluster-connection>
+     
+     
+

For more information on ActiveMQ Artemis load balancing, and clustering in general, please see the clustering + section of the user manual.

+ + diff --git a/examples/protocols/mqtt/clustered-queue-mqtt/src/main/java/org/apache/activemq/artemis/jms/example/ClusteredQueueMQTTExample.java b/examples/protocols/mqtt/clustered-queue-mqtt/src/main/java/org/apache/activemq/artemis/jms/example/ClusteredQueueMQTTExample.java new file mode 100644 index 0000000000..0f60ac31ad --- /dev/null +++ b/examples/protocols/mqtt/clustered-queue-mqtt/src/main/java/org/apache/activemq/artemis/jms/example/ClusteredQueueMQTTExample.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import java.util.concurrent.TimeUnit; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; + +/** + * A simple example that demonstrates server side load-balancing of messages between the queue instances on different + * nodes of the cluster. + */ +public class ClusteredQueueMQTTExample { + + public static void main(final String[] args) throws Exception { + // Create a new MQTT connection to the broker. We are not setting the client ID. The broker will pick one for us. + System.out.println("Connecting to Artemis using MQTT"); + BlockingConnection connection1 = retrieveMQTTConnection("tcp://localhost:1883"); + System.out.println("Connected to Artemis 1"); + BlockingConnection connection2 = retrieveMQTTConnection("tcp://localhost:1884"); + System.out.println("Connected to Artemis 2"); + + // Subscribe to topics + Topic[] topics = {new Topic("test/+/some/#", QoS.AT_MOST_ONCE)}; + connection1.subscribe(topics); + connection2.subscribe(topics); + System.out.println("Subscribed to topics."); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + System.out.println("Sent messages."); + + Message message1 = connection1.receive(5, TimeUnit.SECONDS); + Message message2 = connection1.receive(5, TimeUnit.SECONDS); + Message message3 = connection1.receive(5, TimeUnit.SECONDS); + Message message4 = connection2.receive(5, TimeUnit.SECONDS); + Message message5 = connection2.receive(5, TimeUnit.SECONDS); + Message message6 = connection2.receive(5, TimeUnit.SECONDS); + System.out.println("Received messages."); + + System.out.println("Broker 1: " + new String(message1.getPayload())); + System.out.println("Broker 1: " + new String(message2.getPayload())); + System.out.println("Broker 1: " + new String(message3.getPayload())); + System.out.println("Broker 2: " + new String(message4.getPayload())); + System.out.println("Broker 2: " + new String(message5.getPayload())); + System.out.println("Broker 2: " + new String(message6.getPayload())); + } + + private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setHost(host); + mqtt.setUserName("admin"); + mqtt.setPassword("admin"); + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + return connection; + } + +} diff --git a/examples/protocols/mqtt/clustered-queue-mqtt/src/main/resources/activemq/server0/broker.xml b/examples/protocols/mqtt/clustered-queue-mqtt/src/main/resources/activemq/server0/broker.xml new file mode 100644 index 0000000000..bc4365967d --- /dev/null +++ b/examples/protocols/mqtt/clustered-queue-mqtt/src/main/resources/activemq/server0/broker.xml @@ -0,0 +1,98 @@ + + + + + false + + ./data/bindings + + ./data/journal + + ./data/largemessages + + ./data/paging + + + + tcp://localhost:61616 + + + + + tcp://localhost:61616 + tcp://0.0.0.0:1883?protocols=MQTT + + + + + + ${udp-address:231.7.7.7} + 9876 + 100 + netty-connector + + + + + + ${udp-address:231.7.7.7} + 9876 + 10000 + + + + + + + netty-connector + 5 + true + STRICT + 1 + + + + + + + + + + + + + + + + + + + + + + + + true + / + # + + + + + diff --git a/examples/protocols/mqtt/clustered-queue-mqtt/src/main/resources/activemq/server1/broker.xml b/examples/protocols/mqtt/clustered-queue-mqtt/src/main/resources/activemq/server1/broker.xml new file mode 100644 index 0000000000..6b386daffb --- /dev/null +++ b/examples/protocols/mqtt/clustered-queue-mqtt/src/main/resources/activemq/server1/broker.xml @@ -0,0 +1,98 @@ + + + + + false + + target/server1/data/messaging/bindings + + target/server1/data/messaging/journal + + target/server1/data/messaging/largemessages + + target/server1/data/messaging/paging + + + + tcp://localhost:61617 + + + + + tcp://localhost:61617 + tcp://0.0.0.0:1884?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true + + + + + + ${udp-address:231.7.7.7} + 9876 + 100 + netty-connector + + + + + + ${udp-address:231.7.7.7} + 9876 + 10000 + + + + + + + netty-connector + 5 + true + STRICT + 1 + + + + + + + + + + + + + + + + + + + + + + + + true + / + # + + + + + diff --git a/examples/protocols/mqtt/pom.xml b/examples/protocols/mqtt/pom.xml index afdb5baa37..5df091caa2 100644 --- a/examples/protocols/mqtt/pom.xml +++ b/examples/protocols/mqtt/pom.xml @@ -41,12 +41,14 @@ under the License. release basic-pubsub + clustered-queue-mqtt examples basic-pubsub + clustered-queue-mqtt diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java new file mode 100644 index 0000000000..5485f57e0b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.mqtt.imported; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.junit.Test; + +public class MqttClusterWildcardTest extends ClusterTestBase { + + @Override + protected boolean isResolveProtocols() { + return true; + } + + public boolean isNetty() { + return true; + } + + @Test + public void loadBalanceRequests() throws Exception { + final String TOPIC = "test/+/some/#"; + + WildcardConfiguration wildcardConfiguration = new WildcardConfiguration(); + wildcardConfiguration.setAnyWords('#'); + wildcardConfiguration.setDelimiter('/'); + wildcardConfiguration.setRoutingEnabled(true); + wildcardConfiguration.setSingleWord('+'); + + setupServer(0, false, isNetty()); + servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration); + setupServer(1, false, isNetty()); + servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration); + + setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + connection1 = retrieveMQTTConnection("tcp://localhost:61616"); + connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + + // Subscribe to topics + Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; + connection1.subscribe(topics); + connection2.subscribe(topics); + + waitForBindings(0, TOPIC, 1, 1, true); + waitForBindings(1, TOPIC, 1, 1, true); + + waitForBindings(0, TOPIC, 1, 1, false); + waitForBindings(1, TOPIC, 1, 1, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = connection1.receive(5, TimeUnit.SECONDS); + Message message2 = connection1.receive(5, TimeUnit.SECONDS); + Message message3 = connection1.receive(5, TimeUnit.SECONDS); + Message message4 = connection2.receive(5, TimeUnit.SECONDS); + Message message5 = connection2.receive(5, TimeUnit.SECONDS); + Message message6 = connection2.receive(5, TimeUnit.SECONDS); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + assertEquals(payload1, new String(message4.getPayload())); + assertEquals(payload2, new String(message5.getPayload())); + assertEquals(payload3, new String(message6.getPayload())); + + } finally { + String[] topics = new String[]{TOPIC}; + if (connection1 != null) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + } + } + + private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setHost(host); + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + return connection; + } +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java index d88d18feb3..f628fa095b 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java @@ -229,6 +229,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { } + @Override + public MessageLoadBalancingType getMessageLoadBalancingType() { + return null; + } + @Override public void unproposed(SimpleString groupID) { } From 08106453b5a9269ac384f59b26578feee2908282 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 8 Dec 2017 11:31:27 -0600 Subject: [PATCH 2/2] NO-JIRA add expected MQTT threadgroup to test rule --- .../org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java index 85af09d838..2cf59d2356 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java @@ -230,7 +230,7 @@ public class ThreadLeakCheckRule extends ExternalResource { } else if (threadName.contains("Abandoned connection cleanup thread")) { // MySQL Engine checks for abandoned connections return true; - } else if (threadName.contains("hawtdispatch")) { + } else if (threadName.contains("hawtdispatch") || (group != null && group.getName().contains("hawtdispatch"))) { // Static workers used by MQTT client. return true; } else {