diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 4f45c6a6f0..663931baac 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -23,6 +23,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.WireFormatInfo; @@ -51,12 +52,13 @@ public class InactivityMonitor extends TransportFilter { private final AtomicBoolean commandSent = new AtomicBoolean(false); private final AtomicBoolean inSend = new AtomicBoolean(false); + private final AtomicBoolean inactive = new AtomicBoolean(false); private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean inReceive = new AtomicBoolean(false); private SchedulerTimerTask writeCheckerTask; private SchedulerTimerTask readCheckerTask; - private Thread writeThread; + private long readCheckTime; private long writeCheckTime; @@ -151,20 +153,17 @@ public class InactivityMonitor extends TransportFilter { return; } if (!commandReceived.get()) { - if (LOG.isDebugEnabled()) { - LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + if( inactive.getAndSet(false) ) { + if (LOG.isDebugEnabled()) { + LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + } + ASYNC_TASKS.execute(new Runnable() { + public void run() { + onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); + }; + + }); } - ASYNC_TASKS.execute(new Runnable() { - public void run() { - Thread t = writeThread; - if (t != null) { - t.interrupt(); - } - onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); - - }; - - }); } else { if (LOG.isTraceEnabled()) { @@ -223,12 +222,11 @@ public class InactivityMonitor extends TransportFilter { startMonitorThreads(); } } - synchronized (writeChecker) { - writeThread=Thread.currentThread(); - next.oneway(o); + if( inactive.get() ) { + throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); } + next.oneway(o); } finally { - writeThread=null; commandSent.set(true); inSend.set(false); }