fix potential NPE
This commit is contained in:
Timothy Bish 2014-02-13 16:39:12 -05:00
parent 190a44bf25
commit a059bf4a90
1 changed files with 9 additions and 2 deletions

View File

@ -63,6 +63,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
private final Runnable readChecker = new Runnable() { private final Runnable readChecker = new Runnable() {
long lastReceiveTime = System.currentTimeMillis(); long lastReceiveTime = System.currentTimeMillis();
@Override
public void run() { public void run() {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
@ -86,6 +87,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException."); LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
} }
ASYNC_TASKS.execute(new Runnable() { ASYNC_TASKS.execute(new Runnable() {
@Override
public void run() { public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime+readGraceTime) + ") long: " + next.getRemoteAddress())); onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime+readGraceTime) + ") long: " + next.getRemoteAddress()));
} }
@ -102,16 +104,19 @@ public class MQTTInactivityMonitor extends TransportFilter {
super(next); super(next);
} }
@Override
public void start() throws Exception { public void start() throws Exception {
next.start(); next.start();
startMonitorThread(); startMonitorThread();
} }
@Override
public void stop() throws Exception { public void stop() throws Exception {
stopMonitorThread(); stopMonitorThread();
next.stop(); next.stop();
} }
@Override
public void onCommand(Object command) { public void onCommand(Object command) {
inReceive.set(true); inReceive.set(true);
try { try {
@ -121,6 +126,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
} }
} }
@Override
public void oneway(Object o) throws IOException { public void oneway(Object o) throws IOException {
// To prevent the inactivity monitor from sending a message while we // To prevent the inactivity monitor from sending a message while we
// are performing a send we take the lock. // are performing a send we take the lock.
@ -140,13 +146,13 @@ public class MQTTInactivityMonitor extends TransportFilter {
next.oneway(command); next.oneway(command);
} }
@Override
public void onException(IOException error) { public void onException(IOException error) {
if (failed.compareAndSet(false, true)) { if (failed.compareAndSet(false, true)) {
stopMonitorThread(); stopMonitorThread();
if (protocolConverter != null) { if (protocolConverter != null) {
protocolConverter.onTransportError(); protocolConverter.onTransportError();
} }
protocolConverter.onTransportError();
transportListener.onException(error); transportListener.onException(error);
} }
} }
@ -236,7 +242,8 @@ public class MQTTInactivityMonitor extends TransportFilter {
} }
} }
private ThreadFactory factory = new ThreadFactory() { private final ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable); Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
thread.setDaemon(true); thread.setDaemon(true);