Add option to ignore the values in the remote WireFormatInfo in case the user wants control over the Brokers inactivity timeouts.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@905769 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2010-02-02 20:10:39 +00:00
parent c32820d87b
commit 4d0a8a5c7e
1 changed files with 50 additions and 39 deletions

View File

@ -35,18 +35,18 @@ import org.apache.commons.logging.LogFactory;
/** /**
* Used to make sure that commands are arriving periodically from the peer of * Used to make sure that commands are arriving periodically from the peer of
* the transport. * the transport.
* *
* @version $Revision$ * @version $Revision$
*/ */
public class InactivityMonitor extends TransportFilter { public class InactivityMonitor extends TransportFilter {
private static final Log LOG = LogFactory.getLog(InactivityMonitor.class); private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
private static final ThreadPoolExecutor ASYNC_TASKS; private static final ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER; private static int CHECKER_COUNTER;
private static Timer READ_CHECK_TIMER; private static Timer READ_CHECK_TIMER;
private static Timer WRITE_CHECK_TIMER; private static Timer WRITE_CHECK_TIMER;
private WireFormatInfo localWireFormatInfo; private WireFormatInfo localWireFormatInfo;
private WireFormatInfo remoteWireFormatInfo; private WireFormatInfo remoteWireFormatInfo;
private final AtomicBoolean monitorStarted = new AtomicBoolean(false); private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
@ -58,16 +58,17 @@ public class InactivityMonitor extends TransportFilter {
private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
private SchedulerTimerTask writeCheckerTask; private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask; private SchedulerTimerTask readCheckerTask;
private boolean ignoreRemoteWireFormat = false;
private long readCheckTime; private long readCheckTime;
private long writeCheckTime; private long writeCheckTime;
private long initialDelayTime; private long initialDelayTime;
private boolean keepAliveResponseRequired; private boolean keepAliveResponseRequired;
private WireFormat wireFormat; private WireFormat wireFormat;
private final Runnable readChecker = new Runnable() { private final Runnable readChecker = new Runnable() {
long lastRunTime; long lastRunTime;
public void run() { public void run() {
@ -77,22 +78,22 @@ public class InactivityMonitor extends TransportFilter {
if( lastRunTime != 0 && LOG.isDebugEnabled() ) { if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(""+elapsed+" ms elapsed since last read check."); LOG.debug(""+elapsed+" ms elapsed since last read check.");
} }
// Perhaps the timer executed a read check late.. and then executes // Perhaps the timer executed a read check late.. and then executes
// the next read check on time which causes the time elapsed between // the next read check on time which causes the time elapsed between
// read checks to be small.. // read checks to be small..
// If less than 90% of the read check Time elapsed then abort this readcheck. // If less than 90% of the read check Time elapsed then abort this readcheck.
if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression. if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
LOG.debug("Aborting read check.. Not enough time elapsed since last read check."); LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
return; return;
} }
lastRunTime = now; lastRunTime = now;
readCheck(); readCheck();
} }
}; };
private boolean allowReadCheck(long elapsed) { private boolean allowReadCheck(long elapsed) {
return elapsed > (readCheckTime * 9 / 10); return elapsed > (readCheckTime * 9 / 10);
} }
@ -103,9 +104,9 @@ public class InactivityMonitor extends TransportFilter {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if( lastRunTime != 0 && LOG.isDebugEnabled() ) { if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check."); LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
} }
lastRunTime = now; lastRunTime = now;
writeCheck(); writeCheck();
} }
}; };
@ -116,7 +117,7 @@ public class InactivityMonitor extends TransportFilter {
} }
public void stop() throws Exception { public void stop() throws Exception {
stopMonitorThreads(); stopMonitorThreads();
next.stop(); next.stop();
} }
@ -168,11 +169,11 @@ public class InactivityMonitor extends TransportFilter {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
} }
ASYNC_TASKS.execute(new Runnable() { ASYNC_TASKS.execute(new Runnable() {
public void run() { public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress())); onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
}; };
}); });
} else { } else {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -216,7 +217,7 @@ public class InactivityMonitor extends TransportFilter {
} }
} }
} finally { } finally {
inReceive.set(false); inReceive.set(false);
} }
} }
@ -224,12 +225,12 @@ public class InactivityMonitor extends TransportFilter {
public void oneway(Object o) throws IOException { public void oneway(Object o) throws IOException {
// Disable inactivity monitoring while processing a command. // Disable inactivity monitoring while processing a command.
//synchronize this method - its not synchronized //synchronize this method - its not synchronized
//further down the transport stack and gets called by more //further down the transport stack and gets called by more
//than one thread by this class //than one thread by this class
synchronized(inSend) { synchronized(inSend) {
inSend.set(true); inSend.set(true);
try { try {
if( failed.get() ) { if( failed.get() ) {
throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
} }
@ -252,12 +253,16 @@ public class InactivityMonitor extends TransportFilter {
stopMonitorThreads(); stopMonitorThreads();
transportListener.onException(error); transportListener.onException(error);
} }
} }
public void setKeepAliveResponseRequired(boolean val) { public void setKeepAliveResponseRequired(boolean val) {
keepAliveResponseRequired = val; keepAliveResponseRequired = val;
} }
public void setIgnoreRemoteWireFormat(boolean val) {
ignoreRemoteWireFormat = val;
}
private synchronized void startMonitorThreads() throws IOException { private synchronized void startMonitorThreads() throws IOException {
if (monitorStarted.get()) { if (monitorStarted.get()) {
return; return;
@ -269,19 +274,25 @@ public class InactivityMonitor extends TransportFilter {
return; return;
} }
readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); if (!ignoreRemoteWireFormat) {
initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()); readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
} else {
readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
}
if (readCheckTime > 0) { if (readCheckTime > 0) {
monitorStarted.set(true); monitorStarted.set(true);
writeCheckerTask = new SchedulerTimerTask(writeChecker); writeCheckerTask = new SchedulerTimerTask(writeChecker);
readCheckerTask = new SchedulerTimerTask(readChecker); readCheckerTask = new SchedulerTimerTask(readChecker);
writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime; writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
synchronized( InactivityMonitor.class ) { synchronized( InactivityMonitor.class ) {
if( CHECKER_COUNTER == 0 ) { if( CHECKER_COUNTER == 0 ) {
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true); READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true); WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
} }
CHECKER_COUNTER++; CHECKER_COUNTER++;
WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime); WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime); READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
} }
@ -296,20 +307,20 @@ public class InactivityMonitor extends TransportFilter {
readCheckerTask.cancel(); readCheckerTask.cancel();
writeCheckerTask.cancel(); writeCheckerTask.cancel();
synchronized( InactivityMonitor.class ) { synchronized( InactivityMonitor.class ) {
WRITE_CHECK_TIMER.purge(); WRITE_CHECK_TIMER.purge();
READ_CHECK_TIMER.purge(); READ_CHECK_TIMER.purge();
CHECKER_COUNTER--; CHECKER_COUNTER--;
if(CHECKER_COUNTER==0) { if(CHECKER_COUNTER==0) {
WRITE_CHECK_TIMER.cancel(); WRITE_CHECK_TIMER.cancel();
READ_CHECK_TIMER.cancel(); READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null; WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null; READ_CHECK_TIMER = null;
} }
} }
} }
} }
static { static {
ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {