diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index a83d45167a..325ffcf789 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -126,7 +126,9 @@ public class InactivityMonitor extends TransportFilter { ASYNC_TASKS.execute(new Runnable() { public void run() { try { - oneway(new KeepAliveInfo()); + KeepAliveInfo info = new KeepAliveInfo(); + info.setResponseRequired(true); + oneway(info); } catch (IOException e) { onException(e); } @@ -173,22 +175,34 @@ public class InactivityMonitor extends TransportFilter { commandReceived.set(true); inReceive.set(true); try { - if (command.getClass() == WireFormatInfo.class) { - synchronized (this) { - IOException error=null; - remoteWireFormatInfo = (WireFormatInfo)command; + if (command.getClass() == KeepAliveInfo.class) { + KeepAliveInfo info = (KeepAliveInfo) command; + if (info.isResponseRequired()) { try { - startMonitorThreads(); + info.setResponseRequired(false); + oneway(info); } catch (IOException e) { - error = e; - } - if( error!=null ) { - onException(error); + onException(e); } } - } - synchronized (readChecker) { - transportListener.onCommand(command); + } else { + if (command.getClass() == WireFormatInfo.class) { + synchronized (this) { + IOException error = null; + remoteWireFormatInfo = (WireFormatInfo) command; + try { + startMonitorThreads(); + } catch (IOException e) { + error = e; + } + if (error != null) { + onException(error); + } + } + } + synchronized (readChecker) { + transportListener.onCommand(command); + } } } finally {