diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java index b9185b2f23..ab8d30c271 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java @@ -26,6 +26,7 @@ import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; +import org.junit.Assert; import org.junit.Test; public class MqttClusterWildcardTest extends ClusterTestBase { @@ -116,7 +117,6 @@ public class MqttClusterWildcardTest extends ClusterTestBase { public void wildcardsWithBroker1Disconnected() throws Exception { BlockingConnection connection1 = null; BlockingConnection connection2 = null; - BlockingConnection connection3 = null; final String TOPIC = "test/+/some/#"; try { @@ -129,15 +129,12 @@ public class MqttClusterWildcardTest extends ClusterTestBase { setupServer(0, false, isNetty()); servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration); - setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); startServers(0); - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - // Subscribe to topics Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; connection1.subscribe(topics); @@ -163,25 +160,27 @@ public class MqttClusterWildcardTest extends ClusterTestBase { startServers(1); connection2 = retrieveMQTTConnection("tcp://localhost:61617"); - connection3 = retrieveMQTTConnection("tcp://localhost:61617"); connection2.subscribe(topics); - connection3.subscribe(new Topic[]{new Topic("teste/1/some/1", QoS.AT_MOST_ONCE)}); waitForBindings(1, TOPIC, 1, 1, false); waitForBindings(1, TOPIC, 1, 1, true); + waitForBindings(0, TOPIC, 1, 1, false); waitForBindings(0, TOPIC, 1, 1, true); - connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); - - Message message2 = connection1.receive(5, TimeUnit.SECONDS); - Message message3 = connection1.receive(5, TimeUnit.SECONDS); - Message message4 = connection2.receive(5, TimeUnit.SECONDS); - Message message5 = connection2.receive(5, TimeUnit.SECONDS); - Message message6 = connection2.receive(5, TimeUnit.SECONDS); + Message message2 = connection1.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(message2); + Message message3 = connection1.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(message3); + Message message4 = connection2.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(message4); + Message message5 = connection2.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(message5); + Message message6 = connection2.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(message6); assertEquals(payload1, new String(message1.getPayload())); assertEquals(payload2, new String(message2.getPayload())); @@ -200,10 +199,6 @@ public class MqttClusterWildcardTest extends ClusterTestBase { connection2.unsubscribe(topics); connection2.disconnect(); } - if (connection3 != null) { - connection3.unsubscribe(new String[]{"teste/1/some/1"}); - connection3.disconnect(); - } } }