From 2484c67cb148a49376ce98b5d5327e010bff74e6 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Tue, 18 Jun 2013 17:37:01 +0000 Subject: [PATCH] test case for: https://issues.apache.org/jira/browse/AMQ-4585 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1494222 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/mqtt/MQTTTest.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 9761d91708..b445a256b8 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.transport.mqtt; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -72,6 +74,7 @@ public class MQTTTest extends AbstractMQTTTest { connection.disconnect(); } + @Test(timeout=300000) public void testSubscribeMultipleTopics() throws Exception { byte[] payload = new byte[1024 * 32]; for (int i = 0; i < payload.length; i++){ @@ -109,6 +112,65 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals("Should have received " + topics.length + " messages", topics.length, received); } + @Test(timeout=300000) + public void testReceiveMessageSentWhileOffline() throws Exception { + addMQTTConnector(); + brokerService.start(); + final MQTTClientProvider publisher = getMQTTClientProvider(); + initializeConnection(publisher); + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("MQTT-Client"); + mqtt.setCleanSession(false); + + { + final BlockingConnection subscriber = mqtt.blockingConnection(); + subscriber.connect(); + Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)}; + subscriber.subscribe(topic); + + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE); + } + + for (int i = 0; i < numberOfMessages / 2; i++) { + Message message = subscriber.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + byte[] payload = message.getPayload(); + String messageContent = new String(payload); + if (i % 100 == 0) { + LOG.debug("Received message from topic: " + message.getTopic() + + " Message content: " + messageContent); + } + message.ack(); + } + + subscriber.disconnect(); + } + + publisher.disconnect(); + + final BlockingConnection subscriber = mqtt.blockingConnection(); + subscriber.connect(); + Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)}; + subscriber.subscribe(topic); + + for (int i = 0; i < numberOfMessages / 2; i++) { + Message message = subscriber.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + byte[] payload = message.getPayload(); + String messageContent = new String(payload); + if (i % 100 == 0) { + LOG.debug("Received message from topic: " + message.getTopic() + + " Message content: " + messageContent); + } + message.ack(); + } + + subscriber.disconnect(); + } + @Test(timeout=30000) public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception { // default keep alive in milliseconds @@ -166,10 +228,18 @@ public class MQTTTest extends AbstractMQTTTest { } protected MQTT createMQTTConnection() throws Exception { + return createMQTTConnection(null, false); + } + + protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception { MQTT mqtt = new MQTT(); mqtt.setConnectAttemptsMax(1); mqtt.setReconnectAttemptsMax(0); mqtt.setTracer(createTracer()); + if (clientId != null) { + mqtt.setClientId(clientId); + } + mqtt.setCleanSession(clean); mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort()); // shut off connect retry return mqtt;