Fixed AMQ-5160, polished MQTT tests

This commit is contained in:
Dhiraj Bokde 2014-05-13 00:30:11 -07:00 committed by Dejan Bosanac
parent 88c6ee97e0
commit 0a39782bf5
1 changed files with 22 additions and 9 deletions

View File

@ -379,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest {
connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true); connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true);
connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) }); connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) });
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); Message msg = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("No message for " + topic, msg); assertNotNull("No message for " + topic, msg);
assertEquals(RETAINED + topic, new String(msg.getPayload())); assertEquals(RETAINED + topic, new String(msg.getPayload()));
msg.ack(); msg.ack();
@ -406,7 +406,7 @@ public class MQTTTest extends AbstractMQTTTest {
assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]); assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]);
// test retained messages // test retained messages
Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); Message msg = connection.receive(5, TimeUnit.SECONDS);
do { do {
assertNotNull("RETAINED null " + wildcard, msg); assertNotNull("RETAINED null " + wildcard, msg);
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED)); assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
@ -459,7 +459,7 @@ public class MQTTTest extends AbstractMQTTTest {
final BlockingConnection connection = mqtt.blockingConnection(); final BlockingConnection connection = mqtt.blockingConnection();
connection.connect(); connection.connect();
connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true); connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
connection.subscribe(new Topic[] { new Topic(topic, QoS.valueOf(topic)) }); connection.subscribe(new Topic[]{new Topic(topic, QoS.valueOf(topic))});
final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg); assertNotNull(msg);
@ -472,7 +472,7 @@ public class MQTTTest extends AbstractMQTTTest {
assertEquals(i, actualQoS[0]); assertEquals(i, actualQoS[0]);
msg.ack(); msg.ack();
connection.unsubscribe(new String[] { topic }); connection.unsubscribe(new String[]{topic});
connection.disconnect(); connection.disconnect();
} }
@ -1341,10 +1341,9 @@ public class MQTTTest extends AbstractMQTTTest {
BlockingConnection connectionSub = mqttSub.blockingConnection(); BlockingConnection connectionSub = mqttSub.blockingConnection();
connectionSub.connect(); connectionSub.connect();
connectionSub.subscribe(topics); connectionSub.subscribe(topics);
connectionSub.unsubscribe(new String[] { "TopicA" });
connectionSub.disconnect(); connectionSub.disconnect();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 5; i++) {
String payload = "Message " + i; String payload = "Message " + i;
connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false); connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
} }
@ -1353,14 +1352,28 @@ public class MQTTTest extends AbstractMQTTTest {
connectionSub.connect(); connectionSub.connect();
int received = 0; int received = 0;
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 5; ++i) {
Message message = connectionSub.receive(5, TimeUnit.SECONDS); Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message); assertNotNull("Missing message " + i, message);
LOG.info("Message is " + new String(message.getPayload())); LOG.info("Message is " + new String(message.getPayload()));
received++; received++;
message.ack(); message.ack();
} }
assertEquals(10, received); assertEquals(5, received);
// unsubscribe from topic
connectionSub.unsubscribe(new String[]{"TopicA"});
// send more messages
for (int i = 0; i < 5; i++) {
String payload = "Message " + i;
connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
}
// these should not be received
connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
assertNull(connectionSub.receive(5, TimeUnit.SECONDS));
connectionSub.disconnect(); connectionSub.disconnect();
connectionPub.disconnect(); connectionPub.disconnect();