diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 10344a74f3..e844712410 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -70,77 +70,85 @@ public class InactivityMonitor extends TransportFilter { private void writeCheck() { - if( inSend.get() ) { - log.trace("A send is in progress"); - return; - } - - if( !commandSent.get() ) { - log.trace("No message sent since last write check, sending a KeepAliveInfo"); - try { - next.oneway(new KeepAliveInfo()); - } catch (IOException e) { - onException(e); + synchronized(writeChecker) { + if( inSend.get() ) { + log.trace("A send is in progress"); + return; } - } else { - log.trace("Message sent since last write check, resetting flag"); + + if( !commandSent.get() ) { + log.trace("No message sent since last write check, sending a KeepAliveInfo"); + try { + next.oneway(new KeepAliveInfo()); + } catch (IOException e) { + onException(e); + } + } else { + log.trace("Message sent since last write check, resetting flag"); + } + + commandSent.set(false); } - - commandSent.set(false); - } private void readCheck() { - if( inReceive.get() ) { - log.trace("A receive is in progress"); - return; + synchronized(readChecker) { + if( inReceive.get() ) { + log.trace("A receive is in progress"); + return; + } + + if( !commandReceived.get() ) { + log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + onException(new InactivityIOException("Channel was inactive for too long.")); + } else { + log.trace("Message received since last read check, resetting flag: "); + } + + commandReceived.set(false); } - - if( !commandReceived.get() ) { - log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); - onException(new InactivityIOException("Channel was inactive for too long.")); - } else { - log.trace("Message received since last read check, resetting flag: "); - } - - commandReceived.set(false); + } public void onCommand(Object command) { - inReceive.set(true); - try { - if( command.getClass() == WireFormatInfo.class ) { - synchronized( this ) { - remoteWireFormatInfo = (WireFormatInfo) command; - try { - startMonitorThreads(); - } catch (IOException e) { - onException(e); + synchronized(readChecker) { + inReceive.set(true); + try { + if( command.getClass() == WireFormatInfo.class ) { + synchronized( this ) { + remoteWireFormatInfo = (WireFormatInfo) command; + try { + startMonitorThreads(); + } catch (IOException e) { + onException(e); + } } } + transportListener.onCommand(command); + } finally { + inReceive.set(false); + commandReceived.set(true); } - transportListener.onCommand(command); - } finally { - inReceive.set(false); - commandReceived.set(true); } } public void oneway(Object o) throws IOException { - // Disable inactivity monitoring while processing a command. - inSend.set(true); - commandSent.set(true); - try { - if( o.getClass() == WireFormatInfo.class ) { - synchronized( this ) { - localWireFormatInfo = (WireFormatInfo) o; - startMonitorThreads(); + synchronized(writeChecker) { + // Disable inactivity monitoring while processing a command. + inSend.set(true); + commandSent.set(true); + try { + if( o.getClass() == WireFormatInfo.class ) { + synchronized( this ) { + localWireFormatInfo = (WireFormatInfo) o; + startMonitorThreads(); + } } + next.oneway(o); + } finally { + inSend.set(false); } - next.oneway(o); - } finally { - inSend.set(false); } }