mirror of https://github.com/apache/activemq.git
Add option to ignore the values in the remote WireFormatInfo in case the user wants control over the Brokers inactivity timeouts. git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@905779 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e0ce1d770d
commit
88fbea116d
|
@ -35,18 +35,18 @@ import org.apache.commons.logging.LogFactory;
|
|||
/**
|
||||
* Used to make sure that commands are arriving periodically from the peer of
|
||||
* the transport.
|
||||
*
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class InactivityMonitor extends TransportFilter {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
|
||||
private static final ThreadPoolExecutor ASYNC_TASKS;
|
||||
|
||||
|
||||
private static int CHECKER_COUNTER;
|
||||
private static Timer READ_CHECK_TIMER;
|
||||
private static Timer WRITE_CHECK_TIMER;
|
||||
|
||||
|
||||
private WireFormatInfo localWireFormatInfo;
|
||||
private WireFormatInfo remoteWireFormatInfo;
|
||||
private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
|
||||
|
@ -58,16 +58,17 @@ public class InactivityMonitor extends TransportFilter {
|
|||
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
|
||||
private final AtomicBoolean inReceive = new AtomicBoolean(false);
|
||||
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
|
||||
|
||||
|
||||
private SchedulerTimerTask writeCheckerTask;
|
||||
private SchedulerTimerTask readCheckerTask;
|
||||
|
||||
|
||||
private boolean ignoreRemoteWireFormat = false;
|
||||
private long readCheckTime;
|
||||
private long writeCheckTime;
|
||||
private long initialDelayTime;
|
||||
private boolean keepAliveResponseRequired;
|
||||
private WireFormat wireFormat;
|
||||
|
||||
|
||||
private final Runnable readChecker = new Runnable() {
|
||||
long lastRunTime;
|
||||
public void run() {
|
||||
|
@ -77,22 +78,22 @@ public class InactivityMonitor extends TransportFilter {
|
|||
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
|
||||
LOG.debug(""+elapsed+" ms elapsed since last read check.");
|
||||
}
|
||||
|
||||
|
||||
// Perhaps the timer executed a read check late.. and then executes
|
||||
// the next read check on time which causes the time elapsed between
|
||||
// read checks to be small..
|
||||
|
||||
// If less than 90% of the read check Time elapsed then abort this readcheck.
|
||||
|
||||
// If less than 90% of the read check Time elapsed then abort this readcheck.
|
||||
if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
|
||||
LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
lastRunTime = now;
|
||||
readCheck();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
private boolean allowReadCheck(long elapsed) {
|
||||
return elapsed > (readCheckTime * 9 / 10);
|
||||
}
|
||||
|
@ -103,9 +104,9 @@ public class InactivityMonitor extends TransportFilter {
|
|||
long now = System.currentTimeMillis();
|
||||
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
|
||||
LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
|
||||
|
||||
|
||||
}
|
||||
lastRunTime = now;
|
||||
lastRunTime = now;
|
||||
writeCheck();
|
||||
}
|
||||
};
|
||||
|
@ -116,7 +117,7 @@ public class InactivityMonitor extends TransportFilter {
|
|||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
stopMonitorThreads();
|
||||
stopMonitorThreads();
|
||||
next.stop();
|
||||
}
|
||||
|
||||
|
@ -168,11 +169,11 @@ public class InactivityMonitor extends TransportFilter {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
|
||||
}
|
||||
ASYNC_TASKS.execute(new Runnable() {
|
||||
ASYNC_TASKS.execute(new Runnable() {
|
||||
public void run() {
|
||||
onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
|
||||
};
|
||||
|
||||
|
||||
});
|
||||
} else {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
@ -216,7 +217,7 @@ public class InactivityMonitor extends TransportFilter {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
|
||||
inReceive.set(false);
|
||||
}
|
||||
}
|
||||
|
@ -224,12 +225,12 @@ public class InactivityMonitor extends TransportFilter {
|
|||
public void oneway(Object o) throws IOException {
|
||||
// Disable inactivity monitoring while processing a command.
|
||||
//synchronize this method - its not synchronized
|
||||
//further down the transport stack and gets called by more
|
||||
//further down the transport stack and gets called by more
|
||||
//than one thread by this class
|
||||
synchronized(inSend) {
|
||||
inSend.set(true);
|
||||
try {
|
||||
|
||||
|
||||
if( failed.get() ) {
|
||||
throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
|
||||
}
|
||||
|
@ -252,12 +253,16 @@ public class InactivityMonitor extends TransportFilter {
|
|||
stopMonitorThreads();
|
||||
transportListener.onException(error);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void setKeepAliveResponseRequired(boolean val) {
|
||||
keepAliveResponseRequired = val;
|
||||
}
|
||||
|
||||
public void setIgnoreRemoteWireFormat(boolean val) {
|
||||
ignoreRemoteWireFormat = val;
|
||||
}
|
||||
|
||||
private synchronized void startMonitorThreads() throws IOException {
|
||||
if (monitorStarted.get()) {
|
||||
return;
|
||||
|
@ -269,19 +274,25 @@ public class InactivityMonitor extends TransportFilter {
|
|||
return;
|
||||
}
|
||||
|
||||
readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
|
||||
initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
|
||||
if (!ignoreRemoteWireFormat) {
|
||||
readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
|
||||
initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
|
||||
} else {
|
||||
readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
|
||||
initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
|
||||
}
|
||||
|
||||
if (readCheckTime > 0) {
|
||||
monitorStarted.set(true);
|
||||
writeCheckerTask = new SchedulerTimerTask(writeChecker);
|
||||
readCheckerTask = new SchedulerTimerTask(readChecker);
|
||||
writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
|
||||
synchronized( InactivityMonitor.class ) {
|
||||
if( CHECKER_COUNTER == 0 ) {
|
||||
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
|
||||
WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
|
||||
}
|
||||
CHECKER_COUNTER++;
|
||||
if( CHECKER_COUNTER == 0 ) {
|
||||
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
|
||||
WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
|
||||
}
|
||||
CHECKER_COUNTER++;
|
||||
WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
|
||||
READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
|
||||
}
|
||||
|
@ -296,20 +307,20 @@ public class InactivityMonitor extends TransportFilter {
|
|||
readCheckerTask.cancel();
|
||||
writeCheckerTask.cancel();
|
||||
synchronized( InactivityMonitor.class ) {
|
||||
WRITE_CHECK_TIMER.purge();
|
||||
READ_CHECK_TIMER.purge();
|
||||
CHECKER_COUNTER--;
|
||||
if(CHECKER_COUNTER==0) {
|
||||
WRITE_CHECK_TIMER.cancel();
|
||||
READ_CHECK_TIMER.cancel();
|
||||
WRITE_CHECK_TIMER = null;
|
||||
READ_CHECK_TIMER = null;
|
||||
}
|
||||
WRITE_CHECK_TIMER.purge();
|
||||
READ_CHECK_TIMER.purge();
|
||||
CHECKER_COUNTER--;
|
||||
if(CHECKER_COUNTER==0) {
|
||||
WRITE_CHECK_TIMER.cancel();
|
||||
READ_CHECK_TIMER.cancel();
|
||||
WRITE_CHECK_TIMER = null;
|
||||
READ_CHECK_TIMER = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
static {
|
||||
ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
||||
public Thread newThread(Runnable runnable) {
|
||||
|
|
Loading…
Reference in New Issue