diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index d8788d362c..31198bae4c 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -693,41 +693,185 @@ public class MQTTTest extends AbstractMQTTTest { } }); - final BlockingConnection connection = mqtt.blockingConnection(); + BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); - - // create overlapping subscriptions with different QoSs final String TOPIC = "TopicA/"; - final Topic[] topics = {new Topic(TOPIC, QoS.AT_LEAST_ONCE)}; - connection.subscribe(topics); + final String[] topics = new String[] {TOPIC, "TopicA/+"}; + connection.subscribe(new Topic[]{new Topic(topics[0], QoS.AT_LEAST_ONCE), new Topic(topics[1], QoS.EXACTLY_ONCE)}); // publish non-retained message connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); - Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); - assertNotNull(msg); - assertEquals(TOPIC, new String(msg.getPayload())); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return publishList.size() == 2; + } + }, 5000); + assertEquals(2, publishList.size()); - // drop subs without acknowledging messages, then subscribe and receive again - connection.unsubscribe(new String[]{ TOPIC }); - Thread.sleep(1000); - connection.subscribe(topics); - Thread.sleep(1000); + connection.disconnect(); - msg = connection.receive(30000, TimeUnit.MILLISECONDS); - assertNotNull(msg); - assertEquals(TOPIC, new String(msg.getPayload())); - msg.ack(); + connection = mqtt.blockingConnection(); + connection.connect(); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return publishList.size() == 4; + } + }, 5000); + assertEquals(4, publishList.size()); // make sure we received duplicate message ids - assertEquals(2, publishList.size()); - assertEquals(publishList.get(0).messageId(), publishList.get(1).messageId()); - assertTrue(publishList.get(0).dup() || publishList.get(1).dup()); + assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() || + publishList.get(0).messageId() == publishList.get(3).messageId()); + assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() || + publishList.get(1).messageId() == publishList.get(2).messageId()); + assertTrue(publishList.get(2).dup() && publishList.get(3).dup()); + + connection.unsubscribe(topics); + connection.disconnect(); + } + + @Test(timeout = 90 * 1000) + public void testPacketIdGeneratorNonCleanSession() throws Exception { + addMQTTConnector("trace=true"); + brokerService.start(); + + final MQTT mqtt = createMQTTConnection("nonclean-packetid", false); + mqtt.setKeepAlive((short) 15); + + final Map publishMap = new ConcurrentHashMap(); + mqtt.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received:\n" + frame); + if (frame.messageType() == PUBLISH.TYPE) { + PUBLISH publish = new PUBLISH(); + try { + publish.decode(frame); + LOG.info("PUBLISH " + publish); + } catch (ProtocolException e) { + fail("Error decoding publish " + e.getMessage()); + } + if (publishMap.get(publish.messageId()) != null) { + assertTrue(publish.dup()); + } + publishMap.put(publish.messageId(), publish); + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent:\n" + frame); + } + }); + + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + final String TOPIC = "TopicA/"; + connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + + // publish non-retained messages + final int TOTAL_MESSAGES = 10; + for (int i = 0; i < TOTAL_MESSAGES; i++) { + connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + } + + // receive half the messages in this session + for (int i = 0; i < TOTAL_MESSAGES / 2; i++) { + final Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(TOPIC, new String(msg.getPayload())); + msg.ack(); + } + + connection.disconnect(); + // resume session + connection = mqtt.blockingConnection(); + connection.connect(); + // receive rest of the messages + Message msg = null; + do { + msg = connection.receive(1000, TimeUnit.MILLISECONDS); + if (msg != null) { + assertEquals(TOPIC, new String(msg.getPayload())); + msg.ack(); + } + } while (msg != null); + + // make sure we received all message ids + for (short id = 1; id <= TOTAL_MESSAGES; id++) { + assertNotNull("No message for id " + id, publishMap.get(id)); + } connection.unsubscribe(new String[] { TOPIC }); connection.disconnect(); } + @Test(timeout = 90 * 1000) + public void testPacketIdGeneratorCleanSession() throws Exception { + addMQTTConnector("trace=true"); + brokerService.start(); + + final String[] cleanClientIds = new String[] { "", "clean-packetid", null}; + final Map publishMap = new ConcurrentHashMap(); + MQTT[] mqtts = new MQTT[cleanClientIds.length]; + for (int i = 0; i < cleanClientIds.length; i++) { + mqtts[i] = createMQTTConnection("", true); + mqtts[i].setKeepAlive((short) 15); + + mqtts[i].setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received:\n" + frame); + if (frame.messageType() == PUBLISH.TYPE) { + PUBLISH publish = new PUBLISH(); + try { + publish.decode(frame); + LOG.info("PUBLISH " + publish); + } catch (ProtocolException e) { + fail("Error decoding publish " + e.getMessage()); + } + if (publishMap.get(publish.messageId()) != null) { + assertTrue(publish.dup()); + } + publishMap.put(publish.messageId(), publish); + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent:\n" + frame); + } + }); + } + + final Random random = new Random(); + for (short i = 0; i < 10; i++) { + BlockingConnection connection = mqtts[random.nextInt(cleanClientIds.length)].blockingConnection(); + connection.connect(); + final String TOPIC = "TopicA/"; + connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + + // publish non-retained message + connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(TOPIC, new String(msg.getPayload())); + msg.ack(); + + assertEquals(1, publishMap.size()); + final short id = (short) (i + 1); + assertNotNull("No message for id " + id, publishMap.get(id)); + publishMap.clear(); + + connection.disconnect(); + } + + } + @Test(timeout = 60 * 1000) public void testClientConnectionFailure() throws Exception { addMQTTConnector();