diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index fc0497890e..bf81e98a4f 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,6 +16,14 @@ */ package org.apache.activemq.transport.mqtt; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.net.ProtocolException; import java.util.ArrayList; import java.util.Arrays; @@ -27,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -35,14 +44,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -1088,18 +1089,30 @@ public class MQTTTest extends MQTTTestSupport { final String clientId = "duplicateClient"; MQTT mqtt = createMQTTConnection(clientId, false); mqtt.setKeepAlive((short) 2); - BlockingConnection connection = mqtt.blockingConnection(); + final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); final String TOPICA = "TopicA"; connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); MQTT mqtt1 = createMQTTConnection(clientId, false); mqtt1.setKeepAlive((short) 2); - BlockingConnection connection1 = mqtt1.blockingConnection(); + final BlockingConnection connection1 = mqtt1.blockingConnection(); connection1.connect(); - assertTrue("Duplicate client disconnected", connection1.isConnected()); - assertFalse("Old client still connected", connection.isConnected()); + assertTrue("Duplicate client disconnected", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return connection1.isConnected(); + } + })); + + assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return !connection.isConnected(); + } + })); + connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); connection1.disconnect(); @@ -1110,23 +1123,23 @@ public class MQTTTest extends MQTTTestSupport { mqtt = createMQTTConnection(clientId, false); mqtt.setKeepAlive((short) 2); - connection = mqtt.blockingConnection(); - connection.connect(); - connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); + final BlockingConnection connection2 = mqtt.blockingConnection(); + connection2.connect(); + connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); mqtt1 = createMQTTConnection(clientId, false); mqtt1.setKeepAlive((short) 2); - connection1 = mqtt1.blockingConnection(); + final BlockingConnection connection3 = mqtt1.blockingConnection(); try { - connection1.connect(); + connection3.connect(); fail("Duplicate client connected"); } catch (Exception e) { // ignore } - assertTrue("Old client disconnected", connection.isConnected()); - connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); - connection.disconnect(); + assertTrue("Old client disconnected", connection2.isConnected()); + connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); + connection2.disconnect(); } @Test(timeout = 30 * 10000)