diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java index da17d105b5..3357358399 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport; import java.io.IOException; import java.util.Timer; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -107,10 +108,11 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { private final Runnable writeChecker = new Runnable() { long lastRunTime; + public void run() { long now = System.currentTimeMillis(); - if( lastRunTime != 0 && LOG.isDebugEnabled() ) { - LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check."); + if (lastRunTime != 0 && LOG.isDebugEnabled()) { + LOG.debug(this + " " + (now - lastRunTime) + " ms elapsed since last write check."); } lastRunTime = now; @@ -146,39 +148,49 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { return; } - if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) { + if (!commandSent.get() && useKeepAlive && monitorStarted.get() && + !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) { + if (LOG.isTraceEnabled()) { LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo"); } - ASYNC_TASKS.execute(new Runnable() { - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("Running {}", this); - } - if (monitorStarted.get()) { - try { - // If we can't get the lock it means another write beat us into the - // send and we don't need to heart beat now. - if (sendLock.writeLock().tryLock()) { - KeepAliveInfo info = new KeepAliveInfo(); - info.setResponseRequired(keepAliveResponseRequired); - doOnewaySend(info); + + try { + ASYNC_TASKS.execute(new Runnable() { + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("Running {}", this); + } + if (monitorStarted.get()) { + try { + // If we can't get the lock it means another write beat us into the + // send and we don't need to heart beat now. + if (sendLock.writeLock().tryLock()) { + KeepAliveInfo info = new KeepAliveInfo(); + info.setResponseRequired(keepAliveResponseRequired); + doOnewaySend(info); + } + } catch (IOException e) { + onException(e); + } finally { + if (sendLock.writeLock().isHeldByCurrentThread()) { + sendLock.writeLock().unlock(); + } } - } catch (IOException e) { - onException(e); - } finally { - if (sendLock.writeLock().isHeldByCurrentThread()) { - sendLock.writeLock().unlock(); - } } } - } - @Override - public String toString() { - return "WriteCheck[" + getRemoteAddress() + "]"; - }; - }); + @Override + public String toString() { + return "WriteCheck[" + getRemoteAddress() + "]"; + }; + }); + } catch (RejectedExecutionException ex) { + if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) { + LOG.error("Async write check was rejected from the executor: ", ex); + throw ex; + } + } } else { if (LOG.isTraceEnabled()) { LOG.trace(this + " message sent since last write check, resetting flag"); @@ -197,23 +209,33 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { } return; } - if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) { + if (!commandReceived.get() && monitorStarted.get() && + !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) { + if (LOG.isDebugEnabled()) { LOG.debug("No message received since last read check for " + toString() + ". Throwing InactivityIOException."); } - ASYNC_TASKS.execute(new Runnable() { - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("Running {}", this); - } - onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress())); - } - @Override - public String toString() { - return "ReadCheck[" + getRemoteAddress() + "]"; - }; - }); + try { + ASYNC_TASKS.execute(new Runnable() { + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("Running {}", this); + } + onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress())); + } + + @Override + public String toString() { + return "ReadCheck[" + getRemoteAddress() + "]"; + }; + }); + } catch (RejectedExecutionException ex) { + if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) { + LOG.error("Async read check was rejected from the executor: ", ex); + throw ex; + } + } } else { if (LOG.isTraceEnabled()) { LOG.trace("Message received since last read check, resetting flag: "); @@ -278,8 +300,8 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { // Must be called under lock, either read or write on sendLock. private void doOnewaySend(Object command) throws IOException { - if( failed.get() ) { - throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress()); + if (failed.get()) { + throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress()); } if (command.getClass() == WireFormatInfo.class) { synchronized (this) { @@ -382,13 +404,13 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { if (writeCheckerTask != null) { writeCheckerTask.cancel(); } - synchronized( AbstractInactivityMonitor.class ) { + synchronized (AbstractInactivityMonitor.class) { WRITE_CHECK_TIMER.purge(); READ_CHECK_TIMER.purge(); CHECKER_COUNTER--; - if(CHECKER_COUNTER==0) { - WRITE_CHECK_TIMER.cancel(); - READ_CHECK_TIMER.cancel(); + if (CHECKER_COUNTER == 0) { + WRITE_CHECK_TIMER.cancel(); + READ_CHECK_TIMER.cancel(); WRITE_CHECK_TIMER = null; READ_CHECK_TIMER = null; ThreadPoolUtils.shutdown(ASYNC_TASKS);