diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 6523c38d4b..4f6fc4ac79 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,34 +16,43 @@ */ package org.apache.activemq.transport.mqtt; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import javax.net.SocketFactory; + import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.util.ByteSequence; +import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.codec.CONNECT; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.fusesource.hawtbuf.UTF8Buffer.utf8; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; public class MQTTTest { @@ -280,6 +289,70 @@ public class MQTTTest { connection.disconnect(); } + @Test + public void testInactivityTimeoutDisconnectsClient() throws Exception{ + + addMQTTConnector(brokerService); + brokerService.start(); + + // manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn + // from timing out + final AtomicLong exceptionCount = new AtomicLong(0); + Transport clientTransport = createManualMQTTClient(exceptionCount); + clientTransport.start(); + CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2); + clientTransport.oneway(connectFrame.encode()); + + + + TimeUnit.SECONDS.sleep(10); + System.out.println("Done waiting"); + assertEquals("We have elapsed the keep alive, we should have disconnected", 1, exceptionCount.get()); + + } + + + private Transport createManualMQTTClient(final AtomicLong exceptionCount) throws IOException, URISyntaxException { + Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(), + new URI("tcp://localhost:1883"), null); + clientTransport.setTransportListener(new TransportListener() { + @Override + public void onCommand(Object command) { + } + + @Override + public void onException(IOException error) { + System.out.println("Exception!!!" + error.getMessage()); + exceptionCount.incrementAndGet(); + } + + @Override + public void transportInterupted() { + } + + @Override + public void transportResumed() { + } + }); + return clientTransport; + } + + @Test + public void testPingKeepsInactivityMonitorAlive() throws Exception { + addMQTTConnector(brokerService); + brokerService.start(); + MQTT mqtt = createMQTTConnection(); + mqtt.setKeepAlive((short)2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + TimeUnit.SECONDS.sleep(10); + + assertTrue("KeepAlive didn't work properly", connection.isConnected()); + + connection.disconnect(); + } + protected void addMQTTConnector(BrokerService brokerService) throws Exception { brokerService.addConnector("mqtt://localhost:1883");