More defensive checks when queuing read and write checks to the static executor.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1405122 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-11-02 19:39:53 +00:00
parent 437ea2f6e5
commit 32e009dd7b
1 changed files with 69 additions and 47 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -107,10 +108,11 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
private final Runnable writeChecker = new Runnable() {
long lastRunTime;
public void run() {
long now = System.currentTimeMillis();
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
if (lastRunTime != 0 && LOG.isDebugEnabled()) {
LOG.debug(this + " " + (now - lastRunTime) + " ms elapsed since last write check.");
}
lastRunTime = now;
@ -146,39 +148,49 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
return;
}
if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
if (!commandSent.get() && useKeepAlive && monitorStarted.get() &&
!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running {}", this);
}
if (monitorStarted.get()) {
try {
// If we can't get the lock it means another write beat us into the
// send and we don't need to heart beat now.
if (sendLock.writeLock().tryLock()) {
KeepAliveInfo info = new KeepAliveInfo();
info.setResponseRequired(keepAliveResponseRequired);
doOnewaySend(info);
try {
ASYNC_TASKS.execute(new Runnable() {
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running {}", this);
}
if (monitorStarted.get()) {
try {
// If we can't get the lock it means another write beat us into the
// send and we don't need to heart beat now.
if (sendLock.writeLock().tryLock()) {
KeepAliveInfo info = new KeepAliveInfo();
info.setResponseRequired(keepAliveResponseRequired);
doOnewaySend(info);
}
} catch (IOException e) {
onException(e);
} finally {
if (sendLock.writeLock().isHeldByCurrentThread()) {
sendLock.writeLock().unlock();
}
}
} catch (IOException e) {
onException(e);
} finally {
if (sendLock.writeLock().isHeldByCurrentThread()) {
sendLock.writeLock().unlock();
}
}
}
}
@Override
public String toString() {
return "WriteCheck[" + getRemoteAddress() + "]";
};
});
@Override
public String toString() {
return "WriteCheck[" + getRemoteAddress() + "]";
};
});
} catch (RejectedExecutionException ex) {
if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
LOG.error("Async write check was rejected from the executor: ", ex);
throw ex;
}
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " message sent since last write check, resetting flag");
@ -197,23 +209,33 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
}
return;
}
if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
if (!commandReceived.get() && monitorStarted.get() &&
!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + ". Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running {}", this);
}
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
}
@Override
public String toString() {
return "ReadCheck[" + getRemoteAddress() + "]";
};
});
try {
ASYNC_TASKS.execute(new Runnable() {
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running {}", this);
}
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
}
@Override
public String toString() {
return "ReadCheck[" + getRemoteAddress() + "]";
};
});
} catch (RejectedExecutionException ex) {
if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
LOG.error("Async read check was rejected from the executor: ", ex);
throw ex;
}
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message received since last read check, resetting flag: ");
@ -278,8 +300,8 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
// Must be called under lock, either read or write on sendLock.
private void doOnewaySend(Object command) throws IOException {
if( failed.get() ) {
throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
if (failed.get()) {
throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
}
if (command.getClass() == WireFormatInfo.class) {
synchronized (this) {
@ -382,13 +404,13 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
if (writeCheckerTask != null) {
writeCheckerTask.cancel();
}
synchronized( AbstractInactivityMonitor.class ) {
synchronized (AbstractInactivityMonitor.class) {
WRITE_CHECK_TIMER.purge();
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if(CHECKER_COUNTER==0) {
WRITE_CHECK_TIMER.cancel();
READ_CHECK_TIMER.cancel();
if (CHECKER_COUNTER == 0) {
WRITE_CHECK_TIMER.cancel();
READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
ThreadPoolUtils.shutdown(ASYNC_TASKS);