diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index c64503de4c..67133a5c7c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -67,84 +67,86 @@ public class InactivityMonitor extends TransportFilter { } final void writeCheck() { - synchronized (writeChecker) { if (inSend.get()) { - LOG.trace("A send is in progress"); - return; - } - - if (!commandSent.get()) { - LOG.trace("No message sent since last write check, sending a KeepAliveInfo"); - try { - next.oneway(new KeepAliveInfo()); - } catch (IOException e) { - onException(e); - } - } else { - LOG.trace("Message sent since last write check, resetting flag"); - } - - commandSent.set(false); + LOG.trace("A send is in progress"); + return; } + + if (!commandSent.get()) { + LOG.trace("No message sent since last write check, sending a KeepAliveInfo"); + try { + synchronized (writeChecker) { + next.oneway(new KeepAliveInfo()); + } + } catch (IOException e) { + onException(e); + } + } else { + LOG.trace("Message sent since last write check, resetting flag"); + } + + commandSent.set(false); } final void readCheck() { - synchronized (readChecker) { - if (inReceive.get()) { - LOG.trace("A receive is in progress"); - return; - } - - if (!commandReceived.get()) { - LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); - onException(new InactivityIOException("Channel was inactive for too long.")); - } else { - LOG.trace("Message received since last read check, resetting flag: "); - } - - commandReceived.set(false); + if (inReceive.get()) { + LOG.trace("A receive is in progress"); + return; } + if (!commandReceived.get()) { + LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + synchronized (readChecker) { + onException(new InactivityIOException("Channel was inactive for too long.")); + } + } else { + LOG.trace("Message received since last read check, resetting flag: "); + } + commandReceived.set(false); } public void onCommand(Object command) { - synchronized (readChecker) { - inReceive.set(true); - try { - if (command.getClass() == WireFormatInfo.class) { - synchronized (this) { - remoteWireFormatInfo = (WireFormatInfo)command; - try { - startMonitorThreads(); - } catch (IOException e) { - onException(e); - } + inReceive.set(true); + try { + if (command.getClass() == WireFormatInfo.class) { + synchronized (this) { + IOException error=null; + remoteWireFormatInfo = (WireFormatInfo)command; + try { + startMonitorThreads(); + } catch (IOException e) { + error = e; + } + if( error!=null ) { + onException(error); } } - transportListener.onCommand(command); - } finally { - inReceive.set(false); - commandReceived.set(true); } + synchronized (readChecker) { + transportListener.onCommand(command); + } + } finally { + commandReceived.set(true); + inReceive.set(false); } } public void oneway(Object o) throws IOException { - synchronized (writeChecker) { - // Disable inactivity monitoring while processing a command. - inSend.set(true); - commandSent.set(true); - try { - if (o.getClass() == WireFormatInfo.class) { - synchronized (this) { - localWireFormatInfo = (WireFormatInfo)o; - startMonitorThreads(); - } + // Disable inactivity monitoring while processing a command. + inSend.set(true); + try { + if (o.getClass() == WireFormatInfo.class) { + synchronized (this) { + localWireFormatInfo = (WireFormatInfo)o; + startMonitorThreads(); } - next.oneway(o); - } finally { - inSend.set(false); } + synchronized (writeChecker) { + next.oneway(o); + } + } finally { + commandSent.set(true); + inSend.set(false); } } @@ -152,7 +154,9 @@ public class InactivityMonitor extends TransportFilter { if (monitorStarted.get()) { stopMonitorThreads(); } - getTransportListener().onException(error); + synchronized (readChecker) { + transportListener.onException(error); + } } private synchronized void startMonitorThreads() throws IOException {