More improvements for AMQ-5043. Reworked the MQTT inactivity monitor so that it's more accurate.

This commit is contained in:
Hiram Chirino 2014-02-12 13:26:16 -05:00
parent e2a7d6af5a
commit 6e68a37115
2 changed files with 48 additions and 75 deletions

View File

@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.thread.SchedulerTimerTask; import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.transport.AbstractInactivityMonitor; import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.InactivityIOException;
@ -50,46 +49,53 @@ public class MQTTInactivityMonitor extends TransportFilter {
private final AtomicBoolean monitorStarted = new AtomicBoolean(false); private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false); private final AtomicBoolean failed = new AtomicBoolean(false);
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
private final ReentrantLock sendLock = new ReentrantLock(); private final ReentrantLock sendLock = new ReentrantLock();
private SchedulerTimerTask readCheckerTask; private SchedulerTimerTask readCheckerTask;
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; private long readGraceTime = DEFAULT_CHECK_TIME_MILLS;
private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; private long readKeepAliveTime = DEFAULT_CHECK_TIME_MILLS;
private boolean keepAliveResponseRequired; private boolean keepAliveResponseRequired;
private MQTTProtocolConverter protocolConverter; private MQTTProtocolConverter protocolConverter;
private final Runnable readChecker = new Runnable() { private final Runnable readChecker = new Runnable() {
long lastRunTime; long lastReceiveTime = System.currentTimeMillis();
public void run() { public void run() {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long elapsed = (now - lastRunTime); int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
if (lastRunTime != 0 && LOG.isDebugEnabled()) { // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
LOG.debug("" + elapsed + " ms elapsed since last read check."); // should be sufficient to indicate the connection is still alive. If there were random data, or something
// outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
// PINGREQ/RESP explicitly here
if (inReceive.get() || currentCounter != previousCounter) {
if (LOG.isTraceEnabled()) {
LOG.trace("Command received since last read check.");
} }
lastReceiveTime = now;
// Perhaps the timer executed a read check late.. and then executes
// the next read check on time which causes the time elapsed between
// read checks to be small..
// If less than 90% of the read check Time elapsed then abort this readcheck.
if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
return; return;
} }
lastRunTime = now; if( (now-lastReceiveTime) >= readKeepAliveTime+readGraceTime && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
readCheck(); if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime+readGraceTime) + ") long: " + next.getRemoteAddress()));
}
});
}
} }
}; };
private boolean allowReadCheck(long elapsed) { private boolean allowReadCheck(long elapsed) {
return elapsed > (readCheckTime * 9 / 10); return elapsed > (readGraceTime * 9 / 10);
} }
public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) { public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
@ -106,39 +112,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
next.stop(); next.stop();
} }
final void readCheck() {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
// for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
// should be sufficient to indicate the connection is still alive. If there were random data, or something
// outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
// PINGREQ/RESP explicitly here
if (inReceive.get() || currentCounter != previousCounter) {
if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress");
}
return;
}
if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
}
});
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message received since last read check, resetting flag: ");
}
}
commandReceived.set(false);
}
public void onCommand(Object command) { public void onCommand(Object command) {
commandReceived.set(true);
inReceive.set(true); inReceive.set(true);
try { try {
transportListener.onCommand(command); transportListener.onCommand(command);
@ -177,20 +151,20 @@ public class MQTTInactivityMonitor extends TransportFilter {
} }
} }
public long getReadCheckTime() { public long getReadGraceTime() {
return readCheckTime; return readGraceTime;
} }
public void setReadCheckTime(long readCheckTime) { public void setReadGraceTime(long readGraceTime) {
this.readCheckTime = readCheckTime; this.readGraceTime = readGraceTime;
} }
public long getInitialDelayTime() { public long getReadKeepAliveTime() {
return initialDelayTime; return readKeepAliveTime;
} }
public void setInitialDelayTime(long initialDelayTime) { public void setReadKeepAliveTime(long readKeepAliveTime) {
this.initialDelayTime = initialDelayTime; this.readKeepAliveTime = readKeepAliveTime;
} }
public boolean isKeepAliveResponseRequired() { public boolean isKeepAliveResponseRequired() {
@ -224,11 +198,11 @@ public class MQTTInactivityMonitor extends TransportFilter {
return; return;
} }
if (readCheckTime > 0) { if (readKeepAliveTime > 0) {
readCheckerTask = new SchedulerTimerTask(readChecker); readCheckerTask = new SchedulerTimerTask(readChecker);
} }
if (readCheckTime > 0) { if (readKeepAliveTime > 0) {
monitorStarted.set(true); monitorStarted.set(true);
synchronized (AbstractInactivityMonitor.class) { synchronized (AbstractInactivityMonitor.class) {
if (CHECKER_COUNTER == 0) { if (CHECKER_COUNTER == 0) {
@ -236,8 +210,8 @@ public class MQTTInactivityMonitor extends TransportFilter {
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true); READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
} }
CHECKER_COUNTER++; CHECKER_COUNTER++;
if (readCheckTime > 0) { if (readKeepAliveTime > 0) {
READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); READ_CHECK_TIMER.schedule(readCheckerTask, readKeepAliveTime, readGraceTime);
} }
} }
} }

View File

@ -30,7 +30,6 @@ import javax.jms.Message;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.*; import org.apache.activemq.command.*;
import org.apache.activemq.store.PersistenceAdapterSupport; import org.apache.activemq.store.PersistenceAdapterSupport;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -51,7 +50,7 @@ public class MQTTProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5; private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5;
private static final int DEFAULT_CACHE_SIZE = 5000; private static final int DEFAULT_CACHE_SIZE = 5000;
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
@ -609,24 +608,24 @@ public class MQTTProtocolConverter {
} }
try { try {
long keepAliveMSWithGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
// if we have a default keep-alive value, and the client is trying to turn off keep-alive, // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
// we'll observe the server-side configured default value (note, no grace period) // we'll observe the server-side configured default value (note, no grace period)
if (keepAliveMSWithGracePeriod == 0 && defaultKeepAlive > 0) { if (keepAliveMS == 0 && defaultKeepAlive > 0) {
keepAliveMSWithGracePeriod = defaultKeepAlive; keepAliveMS = defaultKeepAlive;
} }
long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
monitor.setProtocolConverter(this); monitor.setProtocolConverter(this);
monitor.setReadCheckTime(keepAliveMSWithGracePeriod); monitor.setReadKeepAliveTime(keepAliveMS);
monitor.setInitialDelayTime(keepAliveMS); monitor.setReadGraceTime(readGracePeriod);
monitor.startMonitorThread(); monitor.startMonitorThread();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("MQTT Client " + getClientId() + LOG.debug("MQTT Client " + getClientId() +
" established heart beat of " + keepAliveMSWithGracePeriod + " established heart beat of " + keepAliveMS +
" ms (" + keepAliveMS + "ms + " + (keepAliveMSWithGracePeriod - keepAliveMS) + " ms (" + keepAliveMS + "ms + " + readGracePeriod +
"ms grace period)"); "ms grace period)");
} }
} catch (Exception ex) { } catch (Exception ex) {