diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 6c6f20b501..88aadd2989 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -27,14 +27,14 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; /** - * Used to make sure that commands are arriving periodically from the peer of the transport. - * + * Used to make sure that commands are arriving periodically from the peer of the transport. + * * @version $Revision$ */ public class InactivityMonitor extends TransportFilter { private final Log log = LogFactory.getLog(InactivityMonitor.class); - + private WireFormatInfo localWireFormatInfo; private WireFormatInfo remoteWireFormatInfo; private final AtomicBoolean monitorStarted= new AtomicBoolean(false); @@ -44,20 +44,20 @@ public class InactivityMonitor extends TransportFilter { private final AtomicBoolean commandReceived=new AtomicBoolean(true); private final AtomicBoolean inReceive=new AtomicBoolean(false); - + private final Runnable readChecker = new Runnable() { public void run() { readCheck(); } }; - + private final Runnable writeChecker = new Runnable() { public void run() { writeCheck(); } }; - - + + public InactivityMonitor(Transport next) { super(next); } @@ -67,108 +67,116 @@ public class InactivityMonitor extends TransportFilter { next.stop(); } - + private void writeCheck() { - if( inSend.get() ) { - log.trace("A send is in progress"); - return; - } - - if( !commandSent.get() ) { - log.trace("No message sent since last write check, sending a KeepAliveInfo"); - try { - next.oneway(new KeepAliveInfo()); - } catch (IOException e) { - onException(e); + synchronized(writeChecker) { + if( inSend.get() ) { + log.trace("A send is in progress"); + return; } - } else { - log.trace("Message sent since last write check, resetting flag"); + + if( !commandSent.get() ) { + log.trace("No message sent since last write check, sending a KeepAliveInfo"); + try { + next.oneway(new KeepAliveInfo()); + } catch (IOException e) { + onException(e); + } + } else { + log.trace("Message sent since last write check, resetting flag"); + } + + commandSent.set(false); } - - commandSent.set(false); - } private void readCheck() { - if( inReceive.get() ) { - log.trace("A receive is in progress"); - return; + synchronized(readChecker) { + if( inReceive.get() ) { + log.trace("A receive is in progress"); + return; + } + + if( !commandReceived.get() ) { + log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + onException(new InactivityIOException("Channel was inactive for too long.")); + } else { + log.trace("Message received since last read check, resetting flag: "); + } + + commandReceived.set(false); } - - if( !commandReceived.get() ) { - log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); - onException(new InactivityIOException("Channel was inactive for too long.")); - } else { - log.trace("Message received since last read check, resetting flag: "); - } - - commandReceived.set(false); + } public void onCommand(Object command) { - inReceive.set(true); - try { - if( command.getClass() == WireFormatInfo.class ) { - synchronized( this ) { - remoteWireFormatInfo = (WireFormatInfo) command; - try { - startMonitorThreads(); - } catch (IOException e) { - onException(e); + synchronized(readChecker) { + inReceive.set(true); + try { + if( command.getClass() == WireFormatInfo.class ) { + synchronized( this ) { + remoteWireFormatInfo = (WireFormatInfo) command; + try { + startMonitorThreads(); + } catch (IOException e) { + onException(e); + } } } + transportListener.onCommand(command); + } finally { + inReceive.set(false); + commandReceived.set(true); } - transportListener.onCommand(command); - } finally { - inReceive.set(false); - commandReceived.set(true); } } - + public void oneway(Object o) throws IOException { - // Disable inactivity monitoring while processing a command. - inSend.set(true); - commandSent.set(true); - try { - if( o.getClass() == WireFormatInfo.class ) { - synchronized( this ) { - localWireFormatInfo = (WireFormatInfo) o; - startMonitorThreads(); + synchronized(writeChecker) { + // Disable inactivity monitoring while processing a command. + inSend.set(true); + commandSent.set(true); + try { + if( o.getClass() == WireFormatInfo.class ) { + synchronized( this ) { + localWireFormatInfo = (WireFormatInfo) o; + startMonitorThreads(); + } } + next.oneway(o); + } finally { + inSend.set(false); } - next.oneway(o); - } finally { - inSend.set(false); } } - + public void onException(IOException error) { if( monitorStarted.get() ) { stopMonitorThreads(); } getTransportListener().onException(error); } - - + + synchronized private void startMonitorThreads() throws IOException { - if( monitorStarted.get() ) + if( monitorStarted.get() ) return; if( localWireFormatInfo == null ) return; if( remoteWireFormatInfo == null ) return; - + long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); if( l > 0 ) { - monitorStarted.set(true); + monitorStarted.set(true); Scheduler.executePeriodically(writeChecker, l/2); Scheduler.executePeriodically(readChecker, l); } } - + /** - * + * */ synchronized private void stopMonitorThreads() { if( monitorStarted.compareAndSet(true, false) ) { @@ -176,6 +184,6 @@ public class InactivityMonitor extends TransportFilter { Scheduler.cancel(writeChecker); } } - + }