AMQ-4123 Improve MQTT Inactivity Monitoring

* Fixed broken MQTTTests
* Added more tests around Inactivity Monitoring / MQTT keep alive
* Removed the KeepAliveInfo check from the MQTTInactivityMonitor, clarified the PINGREQ/RESP frames in the inactivity check
* Implemented a grace period for the keep alive on the server side per MQTT spec
* Fixed clientId assignment
* Added "default" keep alive for server-side control of lingering MQTT connections

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1401509 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christian Posta 2012-10-23 22:52:06 +00:00
parent 692411e989
commit d212d3c02a
4 changed files with 145 additions and 36 deletions

View File

@ -109,6 +109,11 @@ public class MQTTInactivityMonitor extends TransportFilter {
final void readCheck() {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
// for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
// should be sufficient to indicate the connection is still alive. If there were random data, or something
// outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
// PINGREQ/RESP explicitly here
if (inReceive.get() || currentCounter != previousCounter) {
if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress");
@ -139,22 +144,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
commandReceived.set(true);
inReceive.set(true);
try {
if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command;
if (info.isResponseRequired()) {
sendLock.lock();
try {
info.setResponseRequired(false);
oneway(info);
} catch (IOException e) {
onException(e);
} finally {
sendLock.unlock();
}
}
} else {
transportListener.onCommand(command);
}
} finally {
inReceive.set(false);
}

View File

@ -84,6 +84,7 @@ class MQTTProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5;
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
@ -106,10 +107,12 @@ class MQTTProtocolConverter {
private ConnectionInfo connectionInfo = new ConnectionInfo();
private CONNECT connect;
private String clientId;
private long defaultKeepAlive;
private final String QOS_PROPERTY_NAME = "QoSPropertyName";
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
this.mqttTransport = mqttTransport;
this.defaultKeepAlive = 0;
}
int generateCommandId() {
@ -142,6 +145,7 @@ class MQTTProtocolConverter {
switch (frame.messageType()) {
case PINGREQ.TYPE: {
LOG.debug("Received a ping from client: " + getClientId());
mqttTransport.sendToMQTT(PING_RESP_FRAME);
LOG.debug("Sent Ping Response to " + getClientId());
break;
@ -538,19 +542,49 @@ class MQTTProtocolConverter {
}
}
void configureInactivityMonitor(short heartBeat) {
try {
int heartBeatMS = heartBeat * 1000;
void configureInactivityMonitor(short keepAliveSeconds) {
MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
// If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
// then ignore configuring it because it won't exist
if (monitor == null) {
return;
}
long keepAliveMS = keepAliveSeconds * 1000;
if (LOG.isDebugEnabled()) {
LOG.debug("MQTT Client " + getClientId() + " requests heart beat of " + keepAliveMS + " ms");
}
try {
long keepAliveMSWithGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
// if we have a default keep-alive value, and the client is trying to turn off keep-alive,
// we'll observe the server-side configured default value (note, no grace period)
if (keepAliveMSWithGracePeriod == 0 && defaultKeepAlive > 0) {
keepAliveMSWithGracePeriod = defaultKeepAlive;
}
monitor.setProtocolConverter(this);
monitor.setReadCheckTime(heartBeatMS);
monitor.setInitialDelayTime(heartBeatMS);
monitor.setReadCheckTime(keepAliveMSWithGracePeriod);
monitor.setInitialDelayTime(keepAliveMS);
monitor.startMonitorThread();
if (LOG.isDebugEnabled()) {
LOG.debug("MQTT Client " + getClientId() +
" established heart beat of " + keepAliveMSWithGracePeriod +
" ms (" + keepAliveMS + "ms + " + (keepAliveMSWithGracePeriod - keepAliveMS) +
"ms grace period)");
}
} catch (Exception ex) {
LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
}
LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs");
}
void handleException(Throwable exception, MQTTFrame command) {
@ -577,9 +611,10 @@ class MQTTProtocolConverter {
if (connect != null && connect.clientId() != null) {
clientId = connect.clientId().toString();
}
} else {
else {
clientId = "";
}
}
return clientId;
}
@ -635,4 +670,18 @@ class MQTTProtocolConverter {
result = result.replace('/', '.');
return result;
}
public long getDefaultKeepAlive() {
return defaultKeepAlive;
}
/**
* Set the default keep alive time (in milliseconds) that would be used if configured on server side
* and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
*
* @param defaultKeepAlive
*/
public void setDefaultKeepAlive(long defaultKeepAlive) {
this.defaultKeepAlive = defaultKeepAlive;
}
}

View File

@ -134,5 +134,13 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
super.onException(e);
}
public long getDefaultKeepAlive() {
return protocolConverter != null ? protocolConverter.getDefaultKeepAlive() : -1;
}
public void setDefaultKeepAlive(long defaultHeartBeat) {
protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
}
}

View File

@ -19,10 +19,9 @@ package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Vector;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -34,6 +33,7 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@ -58,7 +58,7 @@ import static org.junit.Assert.*;
public class MQTTTest {
protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
protected int numberOfMessages;
AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
@ -120,6 +120,8 @@ public class MQTTTest {
latch.await(10, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
subscribeConnection.disconnect();
publisherConnection.disconnect();
}
@Test
@ -136,7 +138,7 @@ public class MQTTTest {
connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
connection.publish("foo2", payload.getBytes(), QoS.AT_MOST_ONCE, false);
connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
Message message = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message.getPayload()));
@ -294,25 +296,30 @@ public class MQTTTest {
addMQTTConnector(brokerService);
brokerService.start();
TransportConnector mqttConnector = brokerService.getTransportConnectorByScheme("mqtt");
// manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
// from timing out
final AtomicLong exceptionCount = new AtomicLong(0);
Transport clientTransport = createManualMQTTClient(exceptionCount);
Transport clientTransport = createManualMQTTClient();
clientTransport.start();
CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2);
clientTransport.oneway(connectFrame.encode());
// wait for broker to register the MQTT connection
TimeUnit.SECONDS.sleep(1);
assertTrue(mqttConnector.getConnections().size() > 0);
// wait for the inactivity monitor to remove the connection due to inactivity
TimeUnit.SECONDS.sleep(10);
System.out.println("Done waiting");
assertEquals("We have elapsed the keep alive, we should have disconnected", 1, exceptionCount.get());
assertTrue(mqttConnector.getConnections().size() == 0);
assertTrue("Should have seen client transport exception", exceptions.size() > 0);
clientTransport.stop();
}
private Transport createManualMQTTClient(final AtomicLong exceptionCount) throws IOException, URISyntaxException {
private Transport createManualMQTTClient() throws IOException, URISyntaxException {
Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
new URI("tcp://localhost:1883"), null);
clientTransport.setTransportListener(new TransportListener() {
@ -322,8 +329,7 @@ public class MQTTTest {
@Override
public void onException(IOException error) {
System.out.println("Exception!!!" + error.getMessage());
exceptionCount.incrementAndGet();
exceptions.add(error);
}
@Override
@ -353,14 +359,70 @@ public class MQTTTest {
connection.disconnect();
}
@Test
public void testTurnOffInactivityMonitor()throws Exception{
addMQTTConnector(brokerService, "?transport.useInactivityMonitor=false");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
TimeUnit.SECONDS.sleep(10);
assertTrue("KeepAlive didn't work properly", connection.isConnected());
connection.disconnect();
}
@Test
public void testPingOnMQTTNIO() throws Exception {
brokerService.addConnector("mqtt+nio://localhost:1883");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
TimeUnit.SECONDS.sleep(10);
assertTrue("KeepAlive didn't work properly", connection.isConnected());
connection.disconnect();
}
@Test
public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
// default keep alive in milliseconds
brokerService.addConnector("mqtt://localhost:1883?transport.defaultKeepAlive=2000");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short)0);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
TimeUnit.SECONDS.sleep(10);
assertFalse("KeepAlive didn't work properly", connection.isConnected());
}
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("mqtt://localhost:1883");
}
protected void addMQTTConnector(BrokerService brokerService, String config) throws Exception {
brokerService.addConnector("mqtt://localhost:1883" + config);
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
// shut off connect retry
mqtt.setConnectAttemptsMax(0);
mqtt.setReconnectAttemptsMax(0);
return mqtt;
}