access to the writeThread was not safe.. Plus interrupting another thread is not recommended anyways.

also fail new oneway() operations once inactivity is detected.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@638596 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-18 21:43:42 +00:00
parent dc4f2993cc
commit 6b33749b17
1 changed files with 16 additions and 18 deletions

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
@ -51,12 +52,13 @@ public class InactivityMonitor extends TransportFilter {
private final AtomicBoolean commandSent = new AtomicBoolean(false); private final AtomicBoolean commandSent = new AtomicBoolean(false);
private final AtomicBoolean inSend = new AtomicBoolean(false); private final AtomicBoolean inSend = new AtomicBoolean(false);
private final AtomicBoolean inactive = new AtomicBoolean(false);
private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicBoolean inReceive = new AtomicBoolean(false);
private SchedulerTimerTask writeCheckerTask; private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask; private SchedulerTimerTask readCheckerTask;
private Thread writeThread;
private long readCheckTime; private long readCheckTime;
private long writeCheckTime; private long writeCheckTime;
@ -151,20 +153,17 @@ public class InactivityMonitor extends TransportFilter {
return; return;
} }
if (!commandReceived.get()) { if (!commandReceived.get()) {
if (LOG.isDebugEnabled()) { if( inactive.getAndSet(false) ) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
};
});
} }
ASYNC_TASKS.execute(new Runnable() {
public void run() {
Thread t = writeThread;
if (t != null) {
t.interrupt();
}
onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
};
});
} else { } else {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -223,12 +222,11 @@ public class InactivityMonitor extends TransportFilter {
startMonitorThreads(); startMonitorThreads();
} }
} }
synchronized (writeChecker) { if( inactive.get() ) {
writeThread=Thread.currentThread(); throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
next.oneway(o);
} }
next.oneway(o);
} finally { } finally {
writeThread=null;
commandSent.set(true); commandSent.set(true);
inSend.set(false); inSend.set(false);
} }