NO-JIRA Improving MqttClusterWildcardTest
This commit is contained in:
parent
50e170b7ec
commit
82d8992bce
|
@ -26,6 +26,7 @@ import org.fusesource.mqtt.client.MQTT;
|
||||||
import org.fusesource.mqtt.client.Message;
|
import org.fusesource.mqtt.client.Message;
|
||||||
import org.fusesource.mqtt.client.QoS;
|
import org.fusesource.mqtt.client.QoS;
|
||||||
import org.fusesource.mqtt.client.Topic;
|
import org.fusesource.mqtt.client.Topic;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class MqttClusterWildcardTest extends ClusterTestBase {
|
public class MqttClusterWildcardTest extends ClusterTestBase {
|
||||||
|
@ -116,7 +117,6 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
|
||||||
public void wildcardsWithBroker1Disconnected() throws Exception {
|
public void wildcardsWithBroker1Disconnected() throws Exception {
|
||||||
BlockingConnection connection1 = null;
|
BlockingConnection connection1 = null;
|
||||||
BlockingConnection connection2 = null;
|
BlockingConnection connection2 = null;
|
||||||
BlockingConnection connection3 = null;
|
|
||||||
final String TOPIC = "test/+/some/#";
|
final String TOPIC = "test/+/some/#";
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -129,15 +129,12 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
|
||||||
setupServer(0, false, isNetty());
|
setupServer(0, false, isNetty());
|
||||||
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
|
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
|
||||||
|
|
||||||
|
|
||||||
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
|
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
|
||||||
|
|
||||||
startServers(0);
|
startServers(0);
|
||||||
|
|
||||||
|
|
||||||
connection1 = retrieveMQTTConnection("tcp://localhost:61616");
|
connection1 = retrieveMQTTConnection("tcp://localhost:61616");
|
||||||
|
|
||||||
|
|
||||||
// Subscribe to topics
|
// Subscribe to topics
|
||||||
Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
|
Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
|
||||||
connection1.subscribe(topics);
|
connection1.subscribe(topics);
|
||||||
|
@ -163,25 +160,27 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
|
||||||
startServers(1);
|
startServers(1);
|
||||||
|
|
||||||
connection2 = retrieveMQTTConnection("tcp://localhost:61617");
|
connection2 = retrieveMQTTConnection("tcp://localhost:61617");
|
||||||
connection3 = retrieveMQTTConnection("tcp://localhost:61617");
|
|
||||||
connection2.subscribe(topics);
|
connection2.subscribe(topics);
|
||||||
connection3.subscribe(new Topic[]{new Topic("teste/1/some/1", QoS.AT_MOST_ONCE)});
|
|
||||||
|
|
||||||
waitForBindings(1, TOPIC, 1, 1, false);
|
waitForBindings(1, TOPIC, 1, 1, false);
|
||||||
waitForBindings(1, TOPIC, 1, 1, true);
|
waitForBindings(1, TOPIC, 1, 1, true);
|
||||||
|
waitForBindings(0, TOPIC, 1, 1, false);
|
||||||
waitForBindings(0, TOPIC, 1, 1, true);
|
waitForBindings(0, TOPIC, 1, 1, true);
|
||||||
|
|
||||||
|
|
||||||
connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
|
connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||||
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
|
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
|
||||||
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
|
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
|
||||||
|
|
||||||
|
Message message2 = connection1.receive(10, TimeUnit.SECONDS);
|
||||||
Message message2 = connection1.receive(5, TimeUnit.SECONDS);
|
Assert.assertNotNull(message2);
|
||||||
Message message3 = connection1.receive(5, TimeUnit.SECONDS);
|
Message message3 = connection1.receive(10, TimeUnit.SECONDS);
|
||||||
Message message4 = connection2.receive(5, TimeUnit.SECONDS);
|
Assert.assertNotNull(message3);
|
||||||
Message message5 = connection2.receive(5, TimeUnit.SECONDS);
|
Message message4 = connection2.receive(10, TimeUnit.SECONDS);
|
||||||
Message message6 = connection2.receive(5, TimeUnit.SECONDS);
|
Assert.assertNotNull(message4);
|
||||||
|
Message message5 = connection2.receive(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertNotNull(message5);
|
||||||
|
Message message6 = connection2.receive(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertNotNull(message6);
|
||||||
|
|
||||||
assertEquals(payload1, new String(message1.getPayload()));
|
assertEquals(payload1, new String(message1.getPayload()));
|
||||||
assertEquals(payload2, new String(message2.getPayload()));
|
assertEquals(payload2, new String(message2.getPayload()));
|
||||||
|
@ -200,10 +199,6 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
|
||||||
connection2.unsubscribe(topics);
|
connection2.unsubscribe(topics);
|
||||||
connection2.disconnect();
|
connection2.disconnect();
|
||||||
}
|
}
|
||||||
if (connection3 != null) {
|
|
||||||
connection3.unsubscribe(new String[]{"teste/1/some/1"});
|
|
||||||
connection3.disconnect();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue