diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java index 350c2ac77c..3e3987625a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java @@ -109,6 +109,11 @@ public class MQTTInactivityMonitor extends TransportFilter { final void readCheck() { int currentCounter = next.getReceiveCounter(); int previousCounter = lastReceiveCounter.getAndSet(currentCounter); + + // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that + // should be sufficient to indicate the connection is still alive. If there were random data, or something + // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle + // PINGREQ/RESP explicitly here if (inReceive.get() || currentCounter != previousCounter) { if (LOG.isTraceEnabled()) { LOG.trace("A receive is in progress"); @@ -139,22 +144,7 @@ public class MQTTInactivityMonitor extends TransportFilter { commandReceived.set(true); inReceive.set(true); try { - if (command.getClass() == KeepAliveInfo.class) { - KeepAliveInfo info = (KeepAliveInfo) command; - if (info.isResponseRequired()) { - sendLock.lock(); - try { - info.setResponseRequired(false); - oneway(info); - } catch (IOException e) { - onException(e); - } finally { - sendLock.unlock(); - } - } - } else { - transportListener.onCommand(command); - } + transportListener.onCommand(command); } finally { inReceive.set(false); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 293add7eac..d84efbea6e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -84,6 +84,7 @@ class MQTTProtocolConverter { private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); + private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5; private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final SessionId sessionId = new SessionId(connectionId, -1); @@ -106,10 +107,12 @@ class MQTTProtocolConverter { private ConnectionInfo connectionInfo = new ConnectionInfo(); private CONNECT connect; private String clientId; + private long defaultKeepAlive; private final String QOS_PROPERTY_NAME = "QoSPropertyName"; public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) { this.mqttTransport = mqttTransport; + this.defaultKeepAlive = 0; } int generateCommandId() { @@ -142,6 +145,7 @@ class MQTTProtocolConverter { switch (frame.messageType()) { case PINGREQ.TYPE: { + LOG.debug("Received a ping from client: " + getClientId()); mqttTransport.sendToMQTT(PING_RESP_FRAME); LOG.debug("Sent Ping Response to " + getClientId()); break; @@ -538,19 +542,49 @@ class MQTTProtocolConverter { } } - void configureInactivityMonitor(short heartBeat) { + void configureInactivityMonitor(short keepAliveSeconds) { + MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); + + // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false, + // then ignore configuring it because it won't exist + if (monitor == null) { + return; + } + + + long keepAliveMS = keepAliveSeconds * 1000; + + if (LOG.isDebugEnabled()) { + LOG.debug("MQTT Client " + getClientId() + " requests heart beat of " + keepAliveMS + " ms"); + } + try { - int heartBeatMS = heartBeat * 1000; - MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); + + long keepAliveMSWithGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD); + + // if we have a default keep-alive value, and the client is trying to turn off keep-alive, + // we'll observe the server-side configured default value (note, no grace period) + if (keepAliveMSWithGracePeriod == 0 && defaultKeepAlive > 0) { + keepAliveMSWithGracePeriod = defaultKeepAlive; + } + monitor.setProtocolConverter(this); - monitor.setReadCheckTime(heartBeatMS); - monitor.setInitialDelayTime(heartBeatMS); + monitor.setReadCheckTime(keepAliveMSWithGracePeriod); + monitor.setInitialDelayTime(keepAliveMS); monitor.startMonitorThread(); + + if (LOG.isDebugEnabled()) { + LOG.debug("MQTT Client " + getClientId() + + " established heart beat of " + keepAliveMSWithGracePeriod + + " ms (" + keepAliveMS + "ms + " + (keepAliveMSWithGracePeriod - keepAliveMS) + + "ms grace period)"); + } } catch (Exception ex) { LOG.warn("Failed to start MQTT InactivityMonitor ", ex); } - LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs"); + + } void handleException(Throwable exception, MQTTFrame command) { @@ -577,8 +611,9 @@ class MQTTProtocolConverter { if (connect != null && connect.clientId() != null) { clientId = connect.clientId().toString(); } - } else { - clientId = ""; + else { + clientId = ""; + } } return clientId; } @@ -635,4 +670,18 @@ class MQTTProtocolConverter { result = result.replace('/', '.'); return result; } + + public long getDefaultKeepAlive() { + return defaultKeepAlive; + } + + /** + * Set the default keep alive time (in milliseconds) that would be used if configured on server side + * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame + * + * @param defaultKeepAlive + */ + public void setDefaultKeepAlive(long defaultKeepAlive) { + this.defaultKeepAlive = defaultKeepAlive; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java index f56bfb3235..8b450f87dd 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -134,5 +134,13 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor super.onException(e); } + public long getDefaultKeepAlive() { + return protocolConverter != null ? protocolConverter.getDefaultKeepAlive() : -1; + } + + public void setDefaultKeepAlive(long defaultHeartBeat) { + protocolConverter.setDefaultKeepAlive(defaultHeartBeat); + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 4f6fc4ac79..87a87975eb 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -19,10 +19,9 @@ package org.apache.activemq.transport.mqtt; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.Vector; +import java.util.LinkedList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -34,6 +33,7 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; @@ -58,7 +58,7 @@ import static org.junit.Assert.*; public class MQTTTest { protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); protected BrokerService brokerService; - protected Vector exceptions = new Vector(); + protected LinkedList exceptions = new LinkedList(); protected int numberOfMessages; AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {}; @@ -120,6 +120,8 @@ public class MQTTTest { latch.await(10, TimeUnit.SECONDS); assertEquals(0, latch.getCount()); + subscribeConnection.disconnect(); + publisherConnection.disconnect(); } @Test @@ -136,7 +138,7 @@ public class MQTTTest { connection.subscribe(topics); for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; - connection.publish("foo2", payload.getBytes(), QoS.AT_MOST_ONCE, false); + connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false); Message message = connection.receive(5, TimeUnit.SECONDS); assertNotNull("Should get a message", message); assertEquals(payload, new String(message.getPayload())); @@ -294,25 +296,30 @@ public class MQTTTest { addMQTTConnector(brokerService); brokerService.start(); + TransportConnector mqttConnector = brokerService.getTransportConnectorByScheme("mqtt"); // manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn // from timing out - final AtomicLong exceptionCount = new AtomicLong(0); - Transport clientTransport = createManualMQTTClient(exceptionCount); + Transport clientTransport = createManualMQTTClient(); clientTransport.start(); CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2); clientTransport.oneway(connectFrame.encode()); + // wait for broker to register the MQTT connection + TimeUnit.SECONDS.sleep(1); + assertTrue(mqttConnector.getConnections().size() > 0); - + // wait for the inactivity monitor to remove the connection due to inactivity TimeUnit.SECONDS.sleep(10); - System.out.println("Done waiting"); - assertEquals("We have elapsed the keep alive, we should have disconnected", 1, exceptionCount.get()); + assertTrue(mqttConnector.getConnections().size() == 0); + assertTrue("Should have seen client transport exception", exceptions.size() > 0); + + clientTransport.stop(); } - private Transport createManualMQTTClient(final AtomicLong exceptionCount) throws IOException, URISyntaxException { + private Transport createManualMQTTClient() throws IOException, URISyntaxException { Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:1883"), null); clientTransport.setTransportListener(new TransportListener() { @@ -322,8 +329,7 @@ public class MQTTTest { @Override public void onException(IOException error) { - System.out.println("Exception!!!" + error.getMessage()); - exceptionCount.incrementAndGet(); + exceptions.add(error); } @Override @@ -353,14 +359,70 @@ public class MQTTTest { connection.disconnect(); } + @Test + public void testTurnOffInactivityMonitor()throws Exception{ + addMQTTConnector(brokerService, "?transport.useInactivityMonitor=false"); + brokerService.start(); + MQTT mqtt = createMQTTConnection(); + mqtt.setKeepAlive((short)2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + TimeUnit.SECONDS.sleep(10); + + + assertTrue("KeepAlive didn't work properly", connection.isConnected()); + + connection.disconnect(); + } + + @Test + public void testPingOnMQTTNIO() throws Exception { + brokerService.addConnector("mqtt+nio://localhost:1883"); + brokerService.start(); + MQTT mqtt = createMQTTConnection(); + mqtt.setKeepAlive((short)2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + TimeUnit.SECONDS.sleep(10); + + assertTrue("KeepAlive didn't work properly", connection.isConnected()); + + connection.disconnect(); + } + + @Test + public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception { + // default keep alive in milliseconds + brokerService.addConnector("mqtt://localhost:1883?transport.defaultKeepAlive=2000"); + brokerService.start(); + MQTT mqtt = createMQTTConnection(); + mqtt.setKeepAlive((short)0); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + TimeUnit.SECONDS.sleep(10); + + assertFalse("KeepAlive didn't work properly", connection.isConnected()); + + } + protected void addMQTTConnector(BrokerService brokerService) throws Exception { brokerService.addConnector("mqtt://localhost:1883"); } + protected void addMQTTConnector(BrokerService brokerService, String config) throws Exception { + brokerService.addConnector("mqtt://localhost:1883" + config); + } + protected MQTT createMQTTConnection() throws Exception { MQTT mqtt = new MQTT(); mqtt.setHost("localhost", 1883); + // shut off connect retry + mqtt.setConnectAttemptsMax(0); + mqtt.setReconnectAttemptsMax(0); return mqtt; }