diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 41d1dd64dd..0a999852eb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -52,7 +52,6 @@ public class InactivityMonitor extends TransportFilter { private final AtomicBoolean commandSent = new AtomicBoolean(false); private final AtomicBoolean inSend = new AtomicBoolean(false); private final AtomicBoolean failed = new AtomicBoolean(false); - private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean inReceive = new AtomicBoolean(false); @@ -110,7 +109,7 @@ public class InactivityMonitor extends TransportFilter { } public void stop() throws Exception { - closeDown(); + stopMonitorThreads(); next.stop(); } @@ -128,7 +127,7 @@ public class InactivityMonitor extends TransportFilter { } ASYNC_TASKS.execute(new Runnable() { public void run() { - if (stopped.get() == false) { + if (monitorStarted.get()) { try { KeepAliveInfo info = new KeepAliveInfo(); @@ -157,19 +156,15 @@ public class InactivityMonitor extends TransportFilter { return; } if (!commandReceived.get()) { - if( !failed.getAndSet(true) ) { - if (LOG.isDebugEnabled()) { - LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); - } - closeDown(); - ASYNC_TASKS.execute(new Runnable() { - public void run() { - onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); - }; - - }); + if (LOG.isDebugEnabled()) { + LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); } - + ASYNC_TASKS.execute(new Runnable() { + public void run() { + onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); + }; + + }); } else { if (LOG.isTraceEnabled()) { LOG.trace("Message received since last read check, resetting flag: "); @@ -227,7 +222,6 @@ public class InactivityMonitor extends TransportFilter { try { if( failed.get() ) { - closeDown(); throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); } if (o.getClass() == WireFormatInfo.class) { @@ -245,18 +239,11 @@ public class InactivityMonitor extends TransportFilter { } public void onException(IOException error) { - closeDown(); if (!failed.compareAndSet(false,true)) { - transportListener.onException(error); + stopMonitorThreads(); + transportListener.onException(error); } - } - - private void closeDown() { - stopped.set(true); - if (monitorStarted.get()) { - stopMonitorThreads(); - } - } + } private synchronized void startMonitorThreads() throws IOException { if (monitorStarted.get()) {