This commit is contained in:
Dejan Bosanac 2014-03-21 11:25:11 +01:00
parent 13b33da37b
commit 9c67f0df71
1 changed files with 26 additions and 37 deletions

View File

@ -694,8 +694,8 @@ public class MQTTTest extends AbstractMQTTTest {
// create overlapping subscriptions with different QoSs // create overlapping subscriptions with different QoSs
final String TOPIC = "TopicA/"; final String TOPIC = "TopicA/";
final String[] subs = { TOPIC, "+/"}; final Topic[] topics = {new Topic(TOPIC, QoS.AT_LEAST_ONCE)};
connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)}); connection.subscribe(topics);
// publish non-retained message // publish non-retained message
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
@ -703,45 +703,24 @@ public class MQTTTest extends AbstractMQTTTest {
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg); assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload())); assertEquals(TOPIC, new String(msg.getPayload()));
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
// drop subs without acknowledging messages, then subscribe and receive again // drop subs without acknowledging messages, then subscribe and receive again
connection.unsubscribe(subs); connection.unsubscribe(new String[]{ TOPIC });
Thread.sleep(1000); Thread.sleep(1000);
connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)}); connection.subscribe(topics);
Thread.sleep(1000); Thread.sleep(1000);
msg = connection.receive(5000, TimeUnit.MILLISECONDS); msg = connection.receive(30000, TimeUnit.MILLISECONDS);
assertNotNull(msg); assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload())); assertEquals(TOPIC, new String(msg.getPayload()));
final Message msg2 = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg2);
assertEquals(TOPIC, new String(msg2.getPayload()));
// ack messages after receiving all of them
msg.ack(); msg.ack();
msg2.ack();
// make sure we received duplicate message ids // make sure we received duplicate message ids
List<Integer> dups = new ArrayList<Integer>(); assertEquals(2, publishList.size());
for (int i = 0; i < publishList.size() - 1; i++) { assertEquals(publishList.get(0).messageId(), publishList.get(1).messageId());
if (!dups.contains(i)) { assertTrue(publishList.get(0).dup() || publishList.get(1).dup());
boolean found = false;
for (int j = i + 1; j < publishList.size(); j++) {
if (publishList.get(i).messageId() == publishList.get(j).messageId()) {
// one of them is a duplicate
assertTrue(publishList.get(i).dup() || publishList.get(j).dup());
found = true;
dups.add(j);
break;
}
}
assertTrue("Dup Not found " + publishList.get(i), found);
}
}
connection.unsubscribe(subs); connection.unsubscribe(new String[] { TOPIC });
connection.disconnect(); connection.disconnect();
} }
@ -751,9 +730,14 @@ public class MQTTTest extends AbstractMQTTTest {
brokerService.start(); brokerService.start();
MQTT mqtt = createMQTTConnection("reconnect", false); MQTT mqtt = createMQTTConnection("reconnect", false);
BlockingConnection connection = mqtt.blockingConnection(); final BlockingConnection connection = mqtt.blockingConnection();
connection.connect(); connection.connect();
assertTrue(connection.isConnected()); Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connection.isConnected();
}
});
final String TOPIC = "TopicA"; final String TOPIC = "TopicA";
final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
@ -762,16 +746,21 @@ public class MQTTTest extends AbstractMQTTTest {
// kill transport // kill transport
connection.kill(); connection.kill();
connection = mqtt.blockingConnection(); final BlockingConnection newConnection = mqtt.blockingConnection();
connection.connect(); newConnection.connect();
assertTrue(connection.isConnected()); Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return newConnection.isConnected();
}
});
assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]); assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); Message msg = newConnection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg); assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload())); assertEquals(TOPIC, new String(msg.getPayload()));
msg.ack(); msg.ack();
connection.disconnect(); newConnection.disconnect();
} }
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)