Added separately configurable initial delay for timeout tasks on InactivityMonitor

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@638910 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-19 16:07:54 +00:00
parent c18b583020
commit f55923619d
6 changed files with 71 additions and 30 deletions

View File

@ -259,9 +259,20 @@ public class WireFormatInfo implements Command, MarshallAware {
return l == null ? 0 : l.longValue();
}
public void seMaxInactivityDuration(long maxInactivityDuration) throws IOException {
public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
}
public long getMaxInactivityDurationInitalDelay() throws IOException {
Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
return l == null ? 0 : l.longValue();
}
public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
setProperty("MaxInactivityDurationInitalDelay", new Long(maxInactivityDurationInitalDelay));
}
/**
* @throws IOException

View File

@ -36,7 +36,8 @@ public class OpenWireFormatFactory implements WireFormatFactory {
private boolean cacheEnabled = true;
private boolean tightEncodingEnabled = true;
private boolean sizePrefixDisabled;
private long maxInactivityDuration = 30 * 1000;
private long maxInactivityDuration = 30*1000;
private long maxInactivityDurationInitalDelay = 10*1000;
private int cacheSize = 1024;
public WireFormat createWireFormat() {
@ -49,7 +50,8 @@ public class OpenWireFormatFactory implements WireFormatFactory {
info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
info.setTightEncodingEnabled(tightEncodingEnabled);
info.setSizePrefixDisabled(sizePrefixDisabled);
info.seMaxInactivityDuration(maxInactivityDuration);
info.setMaxInactivityDuration(maxInactivityDuration);
info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
info.setCacheSize(cacheSize);
} catch (Exception e) {
IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
@ -125,4 +127,13 @@ public class OpenWireFormatFactory implements WireFormatFactory {
public void setCacheSize(int cacheSize) {
this.cacheSize = cacheSize;
}
public long getMaxInactivityDurationInitalDelay() {
return maxInactivityDurationInitalDelay;
}
public void setMaxInactivityDurationInitalDelay(
long maxInactivityDurationInitalDelay) {
this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
@ -51,6 +52,7 @@ public class InactivityMonitor extends TransportFilter {
private final AtomicBoolean commandSent = new AtomicBoolean(false);
private final AtomicBoolean inSend = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false);
@ -59,6 +61,7 @@ public class InactivityMonitor extends TransportFilter {
private long readCheckTime;
private long writeCheckTime;
private long initialDelayTime;
private final Runnable readChecker = new Runnable() {
long lastRunTime;
@ -107,7 +110,7 @@ public class InactivityMonitor extends TransportFilter {
}
public void stop() throws Exception {
stopMonitorThreads();
closeDown();
next.stop();
}
@ -125,12 +128,15 @@ public class InactivityMonitor extends TransportFilter {
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
try {
KeepAliveInfo info = new KeepAliveInfo();
info.setResponseRequired(true);
oneway(info);
} catch (IOException e) {
onException(e);
if (stopped.get() == false) {
try {
KeepAliveInfo info = new KeepAliveInfo();
info.setResponseRequired(true);
oneway(info);
} catch (IOException e) {
onException(e);
}
}
};
});
@ -155,9 +161,10 @@ public class InactivityMonitor extends TransportFilter {
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
}
closeDown();
ASYNC_TASKS.execute(new Runnable() {
public void run() {
handleException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
};
});
@ -218,15 +225,17 @@ public class InactivityMonitor extends TransportFilter {
synchronized(inSend) {
inSend.set(true);
try {
if( failed.get() ) {
closeDown();
throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
}
if (o.getClass() == WireFormatInfo.class) {
synchronized (this) {
localWireFormatInfo = (WireFormatInfo)o;
startMonitorThreads();
}
}
if( failed.get() ) {
throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
}
next.oneway(o);
} finally {
commandSent.set(true);
@ -236,17 +245,18 @@ public class InactivityMonitor extends TransportFilter {
}
public void onException(IOException error) {
if( !failed.getAndSet(true) ) {
handleException(error);
}
closeDown();
if (!failed.getAndSet(true)) {
transportListener.onException(error);
}
}
private void closeDown() {
stopped.set(true);
if (monitorStarted.get()) {
stopMonitorThreads();
}
}
private void handleException(IOException error) {
if (monitorStarted.get()) {
stopMonitorThreads();
}
transportListener.onException(error);
}
private synchronized void startMonitorThreads() throws IOException {
if (monitorStarted.get()) {
@ -260,6 +270,7 @@ public class InactivityMonitor extends TransportFilter {
}
readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
if (readCheckTime > 0) {
monitorStarted.set(true);
writeCheckerTask = new SchedulerTimerTask(writeChecker);
@ -271,8 +282,8 @@ public class InactivityMonitor extends TransportFilter {
WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck");
}
CHECKER_COUNTER++;
WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, writeCheckTime,writeCheckTime);
READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, readCheckTime,readCheckTime);
WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
}
}
}

View File

@ -45,6 +45,7 @@ class BackupTransport extends DefaultTransportListener{
}
public void setTransport(Transport transport) {
this.transport = transport;
this.transport.setTransportListener(this);
}
public URI getUri() {
return uri;

View File

@ -107,9 +107,12 @@ public class FailoverTransport implements CompositeTransport {
public boolean iterate() {
boolean result=false;
boolean buildBackup=true;
if (connectedTransport.get()==null && !disposed) {
result=doReconnect();
buildBackup=false;
boolean doReconnect = !disposed;
synchronized(backupMutex) {
if (connectedTransport.get()==null && !disposed) {
result=doReconnect();
buildBackup=false;
}
}
if(buildBackup) {
buildBackups();
@ -253,6 +256,10 @@ public class FailoverTransport implements CompositeTransport {
started = false;
disposed = true;
connected = false;
for (BackupTransport t:backups) {
t.setDisposed(true);
}
backups.clear();
if (connectedTransport.get() != null) {
transportToStop = connectedTransport.getAndSet(null);

View File

@ -186,7 +186,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
});
clientTransport.start();
WireFormatInfo info = new WireFormatInfo();
info.seMaxInactivityDuration(1000);
info.setMaxInactivityDuration(1000);
clientTransport.oneway(info);
assertEquals(0, serverErrorCount.get());