From f55923619d34be7402bab9d3a9ad50b41d8efcc3 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 19 Mar 2008 16:07:54 +0000 Subject: [PATCH] Added separately configurable initial delay for timeout tasks on InactivityMonitor git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@638910 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/command/WireFormatInfo.java | 13 ++++- .../openwire/OpenWireFormatFactory.java | 15 ++++- .../activemq/transport/InactivityMonitor.java | 57 +++++++++++-------- .../transport/failover/BackupTransport.java | 1 + .../transport/failover/FailoverTransport.java | 13 ++++- .../transport/tcp/InactivityMonitorTest.java | 2 +- 6 files changed, 71 insertions(+), 30 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java index 05e25a72d8..f44bc0f5a7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java @@ -259,9 +259,20 @@ public class WireFormatInfo implements Command, MarshallAware { return l == null ? 0 : l.longValue(); } - public void seMaxInactivityDuration(long maxInactivityDuration) throws IOException { + public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException { setProperty("MaxInactivityDuration", new Long(maxInactivityDuration)); } + + public long getMaxInactivityDurationInitalDelay() throws IOException { + Long l = (Long)getProperty("MaxInactivityDurationInitalDelay"); + return l == null ? 0 : l.longValue(); + } + + public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException { + setProperty("MaxInactivityDurationInitalDelay", new Long(maxInactivityDurationInitalDelay)); + } + + /** * @throws IOException diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java index 376cb64f11..0b8426ab8d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java @@ -36,7 +36,8 @@ public class OpenWireFormatFactory implements WireFormatFactory { private boolean cacheEnabled = true; private boolean tightEncodingEnabled = true; private boolean sizePrefixDisabled; - private long maxInactivityDuration = 30 * 1000; + private long maxInactivityDuration = 30*1000; + private long maxInactivityDurationInitalDelay = 10*1000; private int cacheSize = 1024; public WireFormat createWireFormat() { @@ -49,7 +50,8 @@ public class OpenWireFormatFactory implements WireFormatFactory { info.setTcpNoDelayEnabled(tcpNoDelayEnabled); info.setTightEncodingEnabled(tightEncodingEnabled); info.setSizePrefixDisabled(sizePrefixDisabled); - info.seMaxInactivityDuration(maxInactivityDuration); + info.setMaxInactivityDuration(maxInactivityDuration); + info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay); info.setCacheSize(cacheSize); } catch (Exception e) { IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo"); @@ -125,4 +127,13 @@ public class OpenWireFormatFactory implements WireFormatFactory { public void setCacheSize(int cacheSize) { this.cacheSize = cacheSize; } + + public long getMaxInactivityDurationInitalDelay() { + return maxInactivityDurationInitalDelay; + } + + public void setMaxInactivityDurationInitalDelay( + long maxInactivityDurationInitalDelay) { + this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index cc31a265be..0a06ee912c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -23,6 +23,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.thread.SchedulerTimerTask; @@ -51,6 +52,7 @@ public class InactivityMonitor extends TransportFilter { private final AtomicBoolean commandSent = new AtomicBoolean(false); private final AtomicBoolean inSend = new AtomicBoolean(false); private final AtomicBoolean failed = new AtomicBoolean(false); + private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean inReceive = new AtomicBoolean(false); @@ -59,6 +61,7 @@ public class InactivityMonitor extends TransportFilter { private long readCheckTime; private long writeCheckTime; + private long initialDelayTime; private final Runnable readChecker = new Runnable() { long lastRunTime; @@ -107,7 +110,7 @@ public class InactivityMonitor extends TransportFilter { } public void stop() throws Exception { - stopMonitorThreads(); + closeDown(); next.stop(); } @@ -125,12 +128,15 @@ public class InactivityMonitor extends TransportFilter { } ASYNC_TASKS.execute(new Runnable() { public void run() { - try { - KeepAliveInfo info = new KeepAliveInfo(); - info.setResponseRequired(true); - oneway(info); - } catch (IOException e) { - onException(e); + if (stopped.get() == false) { + try { + + KeepAliveInfo info = new KeepAliveInfo(); + info.setResponseRequired(true); + oneway(info); + } catch (IOException e) { + onException(e); + } } }; }); @@ -155,9 +161,10 @@ public class InactivityMonitor extends TransportFilter { if (LOG.isDebugEnabled()) { LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); } + closeDown(); ASYNC_TASKS.execute(new Runnable() { public void run() { - handleException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); + onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); }; }); @@ -218,15 +225,17 @@ public class InactivityMonitor extends TransportFilter { synchronized(inSend) { inSend.set(true); try { + + if( failed.get() ) { + closeDown(); + throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); + } if (o.getClass() == WireFormatInfo.class) { synchronized (this) { localWireFormatInfo = (WireFormatInfo)o; startMonitorThreads(); } } - if( failed.get() ) { - throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); - } next.oneway(o); } finally { commandSent.set(true); @@ -236,17 +245,18 @@ public class InactivityMonitor extends TransportFilter { } public void onException(IOException error) { - if( !failed.getAndSet(true) ) { - handleException(error); - } + closeDown(); + if (!failed.getAndSet(true)) { + transportListener.onException(error); + } + } + + private void closeDown() { + stopped.set(true); + if (monitorStarted.get()) { + stopMonitorThreads(); + } } - - private void handleException(IOException error) { - if (monitorStarted.get()) { - stopMonitorThreads(); - } - transportListener.onException(error); - } private synchronized void startMonitorThreads() throws IOException { if (monitorStarted.get()) { @@ -260,6 +270,7 @@ public class InactivityMonitor extends TransportFilter { } readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); + initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()); if (readCheckTime > 0) { monitorStarted.set(true); writeCheckerTask = new SchedulerTimerTask(writeChecker); @@ -271,8 +282,8 @@ public class InactivityMonitor extends TransportFilter { WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck"); } CHECKER_COUNTER++; - WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, writeCheckTime,writeCheckTime); - READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, readCheckTime,readCheckTime); + WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime); + READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java index b9e67eff70..e4b7656c5d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java @@ -45,6 +45,7 @@ class BackupTransport extends DefaultTransportListener{ } public void setTransport(Transport transport) { this.transport = transport; + this.transport.setTransportListener(this); } public URI getUri() { return uri; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 3940746b81..a78792485a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -107,9 +107,12 @@ public class FailoverTransport implements CompositeTransport { public boolean iterate() { boolean result=false; boolean buildBackup=true; - if (connectedTransport.get()==null && !disposed) { - result=doReconnect(); - buildBackup=false; + boolean doReconnect = !disposed; + synchronized(backupMutex) { + if (connectedTransport.get()==null && !disposed) { + result=doReconnect(); + buildBackup=false; + } } if(buildBackup) { buildBackups(); @@ -253,6 +256,10 @@ public class FailoverTransport implements CompositeTransport { started = false; disposed = true; connected = false; + for (BackupTransport t:backups) { + t.setDisposed(true); + } + backups.clear(); if (connectedTransport.get() != null) { transportToStop = connectedTransport.getAndSet(null); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java index 96267b7605..dac7a1d1d2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java @@ -186,7 +186,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra }); clientTransport.start(); WireFormatInfo info = new WireFormatInfo(); - info.seMaxInactivityDuration(1000); + info.setMaxInactivityDuration(1000); clientTransport.oneway(info); assertEquals(0, serverErrorCount.get());