diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 083608968a..691149c169 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -35,18 +35,18 @@ import org.apache.commons.logging.LogFactory; /** * Used to make sure that commands are arriving periodically from the peer of * the transport. - * + * * @version $Revision$ */ public class InactivityMonitor extends TransportFilter { private static final Log LOG = LogFactory.getLog(InactivityMonitor.class); private static final ThreadPoolExecutor ASYNC_TASKS; - + private static int CHECKER_COUNTER; private static Timer READ_CHECK_TIMER; private static Timer WRITE_CHECK_TIMER; - + private WireFormatInfo localWireFormatInfo; private WireFormatInfo remoteWireFormatInfo; private final AtomicBoolean monitorStarted = new AtomicBoolean(false); @@ -58,16 +58,17 @@ public class InactivityMonitor extends TransportFilter { private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); - + private SchedulerTimerTask writeCheckerTask; private SchedulerTimerTask readCheckerTask; - + + private boolean ignoreRemoteWireFormat = false; private long readCheckTime; private long writeCheckTime; private long initialDelayTime; private boolean keepAliveResponseRequired; private WireFormat wireFormat; - + private final Runnable readChecker = new Runnable() { long lastRunTime; public void run() { @@ -77,22 +78,22 @@ public class InactivityMonitor extends TransportFilter { if( lastRunTime != 0 && LOG.isDebugEnabled() ) { LOG.debug(""+elapsed+" ms elapsed since last read check."); } - + // Perhaps the timer executed a read check late.. and then executes // the next read check on time which causes the time elapsed between // read checks to be small.. - - // If less than 90% of the read check Time elapsed then abort this readcheck. + + // If less than 90% of the read check Time elapsed then abort this readcheck. if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression. LOG.debug("Aborting read check.. Not enough time elapsed since last read check."); return; } - + lastRunTime = now; readCheck(); } }; - + private boolean allowReadCheck(long elapsed) { return elapsed > (readCheckTime * 9 / 10); } @@ -103,9 +104,9 @@ public class InactivityMonitor extends TransportFilter { long now = System.currentTimeMillis(); if( lastRunTime != 0 && LOG.isDebugEnabled() ) { LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check."); - + } - lastRunTime = now; + lastRunTime = now; writeCheck(); } }; @@ -116,7 +117,7 @@ public class InactivityMonitor extends TransportFilter { } public void stop() throws Exception { - stopMonitorThreads(); + stopMonitorThreads(); next.stop(); } @@ -168,11 +169,11 @@ public class InactivityMonitor extends TransportFilter { if (LOG.isDebugEnabled()) { LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); } - ASYNC_TASKS.execute(new Runnable() { + ASYNC_TASKS.execute(new Runnable() { public void run() { onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); }; - + }); } else { if (LOG.isTraceEnabled()) { @@ -216,7 +217,7 @@ public class InactivityMonitor extends TransportFilter { } } } finally { - + inReceive.set(false); } } @@ -224,12 +225,12 @@ public class InactivityMonitor extends TransportFilter { public void oneway(Object o) throws IOException { // Disable inactivity monitoring while processing a command. //synchronize this method - its not synchronized - //further down the transport stack and gets called by more + //further down the transport stack and gets called by more //than one thread by this class synchronized(inSend) { inSend.set(true); try { - + if( failed.get() ) { throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); } @@ -252,12 +253,16 @@ public class InactivityMonitor extends TransportFilter { stopMonitorThreads(); transportListener.onException(error); } - } - + } + public void setKeepAliveResponseRequired(boolean val) { keepAliveResponseRequired = val; } + public void setIgnoreRemoteWireFormat(boolean val) { + ignoreRemoteWireFormat = val; + } + private synchronized void startMonitorThreads() throws IOException { if (monitorStarted.get()) { return; @@ -269,19 +274,25 @@ public class InactivityMonitor extends TransportFilter { return; } - readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); - initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()); + if (!ignoreRemoteWireFormat) { + readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); + initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()); + } else { + readCheckTime = localWireFormatInfo.getMaxInactivityDuration(); + initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay(); + } + if (readCheckTime > 0) { monitorStarted.set(true); writeCheckerTask = new SchedulerTimerTask(writeChecker); readCheckerTask = new SchedulerTimerTask(readChecker); writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime; synchronized( InactivityMonitor.class ) { - if( CHECKER_COUNTER == 0 ) { - READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true); - WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true); - } - CHECKER_COUNTER++; + if( CHECKER_COUNTER == 0 ) { + READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true); + WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true); + } + CHECKER_COUNTER++; WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime); READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime); } @@ -296,20 +307,20 @@ public class InactivityMonitor extends TransportFilter { readCheckerTask.cancel(); writeCheckerTask.cancel(); synchronized( InactivityMonitor.class ) { - WRITE_CHECK_TIMER.purge(); - READ_CHECK_TIMER.purge(); - CHECKER_COUNTER--; - if(CHECKER_COUNTER==0) { - WRITE_CHECK_TIMER.cancel(); - READ_CHECK_TIMER.cancel(); - WRITE_CHECK_TIMER = null; - READ_CHECK_TIMER = null; - } + WRITE_CHECK_TIMER.purge(); + READ_CHECK_TIMER.purge(); + CHECKER_COUNTER--; + if(CHECKER_COUNTER==0) { + WRITE_CHECK_TIMER.cancel(); + READ_CHECK_TIMER.cancel(); + WRITE_CHECK_TIMER = null; + READ_CHECK_TIMER = null; + } } } } - - + + static { ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) {