diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index e11f6e9040..3c0701ef18 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -379,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest { connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true); connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) }); - Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + Message msg = connection.receive(5, TimeUnit.SECONDS); assertNotNull("No message for " + topic, msg); assertEquals(RETAINED + topic, new String(msg.getPayload())); msg.ack(); @@ -406,7 +406,7 @@ public class MQTTTest extends AbstractMQTTTest { assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]); // test retained messages - Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); + Message msg = connection.receive(5, TimeUnit.SECONDS); do { assertNotNull("RETAINED null " + wildcard, msg); assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED)); @@ -459,7 +459,7 @@ public class MQTTTest extends AbstractMQTTTest { final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true); - connection.subscribe(new Topic[] { new Topic(topic, QoS.valueOf(topic)) }); + connection.subscribe(new Topic[]{new Topic(topic, QoS.valueOf(topic))}); final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull(msg); @@ -472,7 +472,7 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals(i, actualQoS[0]); msg.ack(); - connection.unsubscribe(new String[] { topic }); + connection.unsubscribe(new String[]{topic}); connection.disconnect(); } @@ -1341,10 +1341,9 @@ public class MQTTTest extends AbstractMQTTTest { BlockingConnection connectionSub = mqttSub.blockingConnection(); connectionSub.connect(); connectionSub.subscribe(topics); - connectionSub.unsubscribe(new String[] { "TopicA" }); connectionSub.disconnect(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 5; i++) { String payload = "Message " + i; connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false); } @@ -1353,14 +1352,28 @@ public class MQTTTest extends AbstractMQTTTest { connectionSub.connect(); int received = 0; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 5; ++i) { Message message = connectionSub.receive(5, TimeUnit.SECONDS); - assertNotNull(message); + assertNotNull("Missing message " + i, message); LOG.info("Message is " + new String(message.getPayload())); received++; message.ack(); } - assertEquals(10, received); + assertEquals(5, received); + + // unsubscribe from topic + connectionSub.unsubscribe(new String[]{"TopicA"}); + + // send more messages + for (int i = 0; i < 5; i++) { + String payload = "Message " + i; + connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false); + } + + // these should not be received + connectionSub = mqttSub.blockingConnection(); + connectionSub.connect(); + assertNull(connectionSub.receive(5, TimeUnit.SECONDS)); connectionSub.disconnect(); connectionPub.disconnect();