From 4032a01c17bf1ea733979361b000f889f692e5eb Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 17 Mar 2008 13:32:31 +0000 Subject: [PATCH] Interrupt the writing thread on failure git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637881 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/InactivityMonitor.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 325ffcf789..4f45c6a6f0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -56,7 +56,7 @@ public class InactivityMonitor extends TransportFilter { private final AtomicBoolean inReceive = new AtomicBoolean(false); private SchedulerTimerTask writeCheckerTask; private SchedulerTimerTask readCheckerTask; - + private Thread writeThread; private long readCheckTime; private long writeCheckTime; @@ -154,13 +154,16 @@ public class InactivityMonitor extends TransportFilter { if (LOG.isDebugEnabled()) { LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); } - - - // TODO: use a thread pool for this.. - ASYNC_TASKS.execute(new Runnable() { + ASYNC_TASKS.execute(new Runnable() { public void run() { - onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); + Thread t = writeThread; + if (t != null) { + t.interrupt(); + } + onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); + }; + }); } else { @@ -221,9 +224,11 @@ public class InactivityMonitor extends TransportFilter { } } synchronized (writeChecker) { + writeThread=Thread.currentThread(); next.oneway(o); } } finally { + writeThread=null; commandSent.set(true); inSend.set(false); }