This commit is contained in:
Justin Bertram 2017-12-12 09:33:25 -06:00
commit 3efcca4b55
12 changed files with 637 additions and 6 deletions

View File

@ -230,7 +230,7 @@ public class ThreadLeakCheckRule extends ExternalResource {
} else if (threadName.contains("Abandoned connection cleanup thread")) { } else if (threadName.contains("Abandoned connection cleanup thread")) {
// MySQL Engine checks for abandoned connections // MySQL Engine checks for abandoned connections
return true; return true;
} else if (threadName.contains("hawtdispatch")) { } else if (threadName.contains("hawtdispatch") || (group != null && group.getName().contains("hawtdispatch"))) {
// Static workers used by MQTT client. // Static workers used by MQTT client.
return true; return true;
} else { } else {

View File

@ -34,6 +34,8 @@ public interface Bindings extends UnproposalListener {
void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType); void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);
MessageLoadBalancingType getMessageLoadBalancingType();
boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception; boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;
void route(Message message, RoutingContext context) throws Exception; void route(Message message, RoutingContext context) throws Exception;

View File

@ -77,6 +77,11 @@ public final class BindingsImpl implements Bindings {
this.messageLoadBalancingType = messageLoadBalancingType; this.messageLoadBalancingType = messageLoadBalancingType;
} }
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return this.messageLoadBalancingType;
}
@Override @Override
public Collection<Binding> getBindings() { public Collection<Binding> getBindings() {
return bindingsMap.values(); return bindingsMap.values();

View File

@ -16,11 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.postoffice.impl; package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -30,6 +25,11 @@ import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* extends the simple manager to allow wildcard addresses to be used. * extends the simple manager to allow wildcard addresses to be used.
*/ */
@ -67,6 +67,7 @@ public class WildcardAddressManager extends SimpleAddressManager {
for (Binding theBinding : theBindings) { for (Binding theBinding : theBindings) {
super.addMappingInternal(address, theBinding); super.addMappingInternal(address, theBinding);
} }
super.getBindingsForRoutingAddress(address).setMessageLoadBalancingType(b.getMessageLoadBalancingType());
} }
} }
} }

View File

@ -0,0 +1,161 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.clustered</groupId>
<artifactId>broker-clustered</artifactId>
<version>2.5.0-SNAPSHOT</version>
</parent>
<artifactId>clustered-queue-mqtt</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JMS Clustered Queue Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create0</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server0</instance>
<configuration>${basedir}/target/classes/activemq/server0</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>create1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server1</instance>
<configuration>${basedir}/target/classes/activemq/server1</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>start0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server0</location>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
<name>server0</name>
</configuration>
</execution>
<execution>
<id>start1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server1</location>
<testURI>tcp://localhost:61617</testURI>
<args>
<param>run</param>
</args>
<name>server1</name>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.ClusteredQueueMQTTExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server0</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
<execution>
<id>stop1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server1</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.clustered</groupId>
<artifactId>clustered-queue-mqtt</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,55 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<html>
<head>
<title>ActiveMQ Artemis JMS Load Balanced Clustered Queue Example</title>
<link rel="stylesheet" type="text/css" href="../../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
<script type="text/javascript" src="../../../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>JMS Load Balanced Clustered Queue Example</h1>
<pre>To run the example, simply type <b>mvn verify</b> from this directory, <br>or <b>mvn -PnoServer verify</b> if you want to start and create the server manually.</pre>
<p>This example demonstrates a JMS queue deployed on two different nodes. The two nodes are configured to form a cluster.</p>
<p>We then create a consumer on the queue on each node, and we create a producer on only one of the nodes.</p>
<p>We then send some messages via the producer, and we verify that <b>both</b> consumers receive the sent messages
in a round-robin fashion.</p>
<p>In other words, ActiveMQ Artemis <b>load balances</b> the sent messages across all consumers on the cluster</p>
<p>This example uses JNDI to lookup the JMS Queue and ConnectionFactory objects. If you prefer not to use
JNDI, these could be instantiated directly.</p>
<p>Here's the relevant snippet from the server configuration, which tells the server to form a cluster between the two nodes
and to load balance the messages between the nodes.</p>
<pre class="prettyprint">
<code>&lt;cluster-connection name="my-cluster"&gt;
&lt;connector-ref>netty-connector&lt;/connector-ref>
&lt;retry-interval&gt;500&lt;/retry-interval&gt;
&lt;use-duplicate-detection&gt;true&lt;/use-duplicate-detection&gt;
&lt;message-load-balancing&gt;STRICT&lt;/message-load-balancing&gt;
&lt;max-hops&gt;1&lt;/max-hops&gt;
&lt;discovery-group-ref discovery-group-name="my-discovery-group"/&gt;
&lt;/cluster-connection&gt;
</code>
</pre>
<p>For more information on ActiveMQ Artemis load balancing, and clustering in general, please see the clustering
section of the user manual.</p>
</body>
</html>

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import java.util.concurrent.TimeUnit;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
/**
* A simple example that demonstrates server side load-balancing of messages between the queue instances on different
* nodes of the cluster.
*/
public class ClusteredQueueMQTTExample {
public static void main(final String[] args) throws Exception {
// Create a new MQTT connection to the broker. We are not setting the client ID. The broker will pick one for us.
System.out.println("Connecting to Artemis using MQTT");
BlockingConnection connection1 = retrieveMQTTConnection("tcp://localhost:1883");
System.out.println("Connected to Artemis 1");
BlockingConnection connection2 = retrieveMQTTConnection("tcp://localhost:1884");
System.out.println("Connected to Artemis 2");
// Subscribe to topics
Topic[] topics = {new Topic("test/+/some/#", QoS.AT_MOST_ONCE)};
connection1.subscribe(topics);
connection2.subscribe(topics);
System.out.println("Subscribed to topics.");
// Publish Messages
String payload1 = "This is message 1";
String payload2 = "This is message 2";
String payload3 = "This is message 3";
connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
System.out.println("Sent messages.");
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
Message message2 = connection1.receive(5, TimeUnit.SECONDS);
Message message3 = connection1.receive(5, TimeUnit.SECONDS);
Message message4 = connection2.receive(5, TimeUnit.SECONDS);
Message message5 = connection2.receive(5, TimeUnit.SECONDS);
Message message6 = connection2.receive(5, TimeUnit.SECONDS);
System.out.println("Received messages.");
System.out.println("Broker 1: " + new String(message1.getPayload()));
System.out.println("Broker 1: " + new String(message2.getPayload()));
System.out.println("Broker 1: " + new String(message3.getPayload()));
System.out.println("Broker 2: " + new String(message4.getPayload()));
System.out.println("Broker 2: " + new String(message5.getPayload()));
System.out.println("Broker 2: " + new String(message6.getPayload()));
}
private static BlockingConnection retrieveMQTTConnection(String host) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(host);
mqtt.setUserName("admin");
mqtt.setPassword("admin");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
return connection;
}
}

View File

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61616</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<!-- Clustering configuration -->
<broadcast-groups>
<broadcast-group name="my-broadcast-group">
<group-address>${udp-address:231.7.7.7}</group-address>
<group-port>9876</group-port>
<broadcast-period>100</broadcast-period>
<connector-ref>netty-connector</connector-ref>
</broadcast-group>
</broadcast-groups>
<discovery-groups>
<discovery-group name="my-discovery-group">
<group-address>${udp-address:231.7.7.7}</group-address>
<group-port>9876</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
<cluster-connections>
<cluster-connection name="my-cluster">
<!-- <address>test/+/some/#</address> -->
<connector-ref>netty-connector</connector-ref>
<retry-interval>5</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>STRICT</message-load-balancing>
<max-hops>1</max-hops>
<discovery-group-ref discovery-group-name="my-discovery-group"/>
</cluster-connection>
</cluster-connections>
<!-- Other config -->
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<wildcard-addresses>
<routing-enabled>true</routing-enabled>
<delimiter>/</delimiter>
<any-words>#</any-words>
<single-word>+</single-word>
</wildcard-addresses>
</core>
</configuration>

View File

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<bindings-directory>target/server1/data/messaging/bindings</bindings-directory>
<journal-directory>target/server1/data/messaging/journal</journal-directory>
<large-messages-directory>target/server1/data/messaging/largemessages</large-messages-directory>
<paging-directory>target/server1/data/messaging/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61617</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61617</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1884?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<!-- Clustering configuration -->
<broadcast-groups>
<broadcast-group name="my-broadcast-group">
<group-address>${udp-address:231.7.7.7}</group-address>
<group-port>9876</group-port>
<broadcast-period>100</broadcast-period>
<connector-ref>netty-connector</connector-ref>
</broadcast-group>
</broadcast-groups>
<discovery-groups>
<discovery-group name="my-discovery-group">
<group-address>${udp-address:231.7.7.7}</group-address>
<group-port>9876</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
<cluster-connections>
<cluster-connection name="my-cluster">
<!-- <address>test/+/some/#</address> -->
<connector-ref>netty-connector</connector-ref>
<retry-interval>5</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>STRICT</message-load-balancing>
<max-hops>1</max-hops>
<discovery-group-ref discovery-group-name="my-discovery-group"/>
</cluster-connection>
</cluster-connections>
<!-- Other config -->
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<wildcard-addresses>
<routing-enabled>true</routing-enabled>
<delimiter>/</delimiter>
<any-words>#</any-words>
<single-word>+</single-word>
</wildcard-addresses>
</core>
</configuration>

View File

@ -41,12 +41,14 @@ under the License.
<id>release</id> <id>release</id>
<modules> <modules>
<module>basic-pubsub</module> <module>basic-pubsub</module>
<module>clustered-queue-mqtt</module>
</modules> </modules>
</profile> </profile>
<profile> <profile>
<id>examples</id> <id>examples</id>
<modules> <modules>
<module>basic-pubsub</module> <module>basic-pubsub</module>
<module>clustered-queue-mqtt</module>
</modules> </modules>
</profile> </profile>
</profiles> </profiles>

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.Test;
public class MqttClusterWildcardTest extends ClusterTestBase {
@Override
protected boolean isResolveProtocols() {
return true;
}
public boolean isNetty() {
return true;
}
@Test
public void loadBalanceRequests() throws Exception {
final String TOPIC = "test/+/some/#";
WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
wildcardConfiguration.setAnyWords('#');
wildcardConfiguration.setDelimiter('/');
wildcardConfiguration.setRoutingEnabled(true);
wildcardConfiguration.setSingleWord('+');
setupServer(0, false, isNetty());
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
setupServer(1, false, isNetty());
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
startServers(0, 1);
BlockingConnection connection1 = null;
BlockingConnection connection2 = null;
try {
connection1 = retrieveMQTTConnection("tcp://localhost:61616");
connection2 = retrieveMQTTConnection("tcp://localhost:61617");
// Subscribe to topics
Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
connection1.subscribe(topics);
connection2.subscribe(topics);
waitForBindings(0, TOPIC, 1, 1, true);
waitForBindings(1, TOPIC, 1, 1, true);
waitForBindings(0, TOPIC, 1, 1, false);
waitForBindings(1, TOPIC, 1, 1, false);
// Publish Messages
String payload1 = "This is message 1";
String payload2 = "This is message 2";
String payload3 = "This is message 3";
connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
Message message2 = connection1.receive(5, TimeUnit.SECONDS);
Message message3 = connection1.receive(5, TimeUnit.SECONDS);
Message message4 = connection2.receive(5, TimeUnit.SECONDS);
Message message5 = connection2.receive(5, TimeUnit.SECONDS);
Message message6 = connection2.receive(5, TimeUnit.SECONDS);
assertEquals(payload1, new String(message1.getPayload()));
assertEquals(payload2, new String(message2.getPayload()));
assertEquals(payload3, new String(message3.getPayload()));
assertEquals(payload1, new String(message4.getPayload()));
assertEquals(payload2, new String(message5.getPayload()));
assertEquals(payload3, new String(message6.getPayload()));
} finally {
String[] topics = new String[]{TOPIC};
if (connection1 != null) {
connection1.unsubscribe(topics);
connection1.disconnect();
}
if (connection2 != null) {
connection2.unsubscribe(topics);
connection2.disconnect();
}
}
}
private static BlockingConnection retrieveMQTTConnection(String host) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(host);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
return connection;
}
}

View File

@ -229,6 +229,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
} }
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return null;
}
@Override @Override
public void unproposed(SimpleString groupID) { public void unproposed(SimpleString groupID) {
} }