diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java index 8b0ad30347..f973a7c384 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java @@ -16,20 +16,18 @@ */ package org.apache.activemq.transport.mqtt; -import static org.junit.Assert.assertArrayEquals; - import java.io.File; import java.io.IOException; import java.security.ProtectionDomain; import java.util.LinkedList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.AutoFailTestSupport; @@ -40,6 +38,7 @@ import org.apache.activemq.util.ByteSequence; import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; public abstract class AbstractMQTTTest extends AutoFailTestSupport { protected TransportConnector mqttConnector; @@ -78,6 +77,50 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport { super.tearDown(); } + @Test(timeout=300000) + public void testWillNotSentOnClose() throws Exception { + addMQTTConnector(); + brokerService.start(); + final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); + initializeConnection(subscriptionProvider); + + String willTopic = "lastWillAndTestament"; + + subscriptionProvider.subscribe(willTopic,AT_MOST_ONCE); + + final AtomicInteger count = new AtomicInteger(); + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < 1; i++){ + try { + byte[] payload = subscriptionProvider.receive(10000); + assertNull("Should get a message", payload); + count.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + + } + } + }); + thread.start(); + + final MQTTClientProvider publishProvider = getMQTTClientProvider(); + publishProvider.setWillTopic(willTopic); + publishProvider.setWillMessage("EverythingGoesToRob"); + initializeConnection(publishProvider); + + Thread.sleep(1000); + publishProvider.disconnect(); + + assertEquals(0, count.get()); + subscriptionProvider.disconnect(); + publishProvider.disconnect(); + } + @Test(timeout=300000) public void testSendAndReceiveMQTT() throws Exception { addMQTTConnector(); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java index 2d0e591771..9aed8eadbb 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java @@ -72,4 +72,14 @@ class FuseMQQTTClientProvider implements MQTTClientProvider { public void setSslContext(SSLContext sslContext) { mqtt.setSslContext(sslContext); } + + @Override + public void setWillMessage(String string) { + mqtt.setWillMessage(string); + } + + @Override + public void setWillTopic(String topic) { + mqtt.setWillTopic(topic); + } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java index 2532c629c2..e23e27cf6c 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java @@ -23,5 +23,7 @@ public interface MQTTClientProvider { void subscribe(String topic,int qos) throws Exception; byte[] receive(int timeout) throws Exception; void setSslContext(javax.net.ssl.SSLContext sslContext); + void setWillMessage(String string); + void setWillTopic(String topic); } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 36e7974ecf..1cba4b2257 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -39,6 +39,7 @@ public class MQTTTest extends AbstractMQTTTest { addMQTTConnector(); brokerService.start(); MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo"); mqtt.setKeepAlive((short)2); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -59,6 +60,7 @@ public class MQTTTest extends AbstractMQTTTest { addMQTTConnector("transport.useInactivityMonitor=false"); brokerService.start(); MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo3"); mqtt.setKeepAlive((short)2); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -192,6 +194,7 @@ public class MQTTTest extends AbstractMQTTTest { addMQTTConnector("transport.defaultKeepAlive=2000"); brokerService.start(); MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo"); mqtt.setKeepAlive((short)0); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -232,10 +235,6 @@ public class MQTTTest extends AbstractMQTTTest { return "mqtt"; } - @Override - protected void addMQTTConnector() throws Exception { - addMQTTConnector(); - } @Override protected MQTTClientProvider getMQTTClientProvider() {