diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 7f14aba23e..237232ce69 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -62,7 +62,7 @@ public class InactivityMonitor extends TransportFilter { private long readCheckTime; private long writeCheckTime; private long initialDelayTime; - + private boolean keepAliveResponseRequired; private WireFormat wireFormat; private final Runnable readChecker = new Runnable() { @@ -126,7 +126,7 @@ public class InactivityMonitor extends TransportFilter { } if (!commandSent.get()) { - if(LOG.isTraceEnabled()) { + if (LOG.isTraceEnabled()) { LOG.trace("No message sent since last write check, sending a KeepAliveInfo"); } ASYNC_TASKS.execute(new Runnable() { @@ -135,7 +135,7 @@ public class InactivityMonitor extends TransportFilter { try { KeepAliveInfo info = new KeepAliveInfo(); - info.setResponseRequired(true); + info.setResponseRequired(keepAliveResponseRequired); oneway(info); } catch (IOException e) { onException(e); @@ -247,7 +247,11 @@ public class InactivityMonitor extends TransportFilter { stopMonitorThreads(); transportListener.onException(error); } - } + } + + public void setKeepAliveResponseRequired(boolean val) { + keepAliveResponseRequired = val; + } private synchronized void startMonitorThreads() throws IOException { if (monitorStarted.get()) { @@ -266,7 +270,7 @@ public class InactivityMonitor extends TransportFilter { monitorStarted.set(true); writeCheckerTask = new SchedulerTimerTask(writeChecker); readCheckerTask = new SchedulerTimerTask(readChecker); - writeCheckTime = readCheckTime/3; + writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime; synchronized( InactivityMonitor.class ) { if( CHECKER_COUNTER == 0 ) { READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index 7d2ebbe2ea..ddfdb5dd3a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -99,7 +99,9 @@ public class TcpTransportFactory extends TransportFactory { boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true")); if (useInactivityMonitor && isUseInactivityMonitor(transport)) { transport = new InactivityMonitor(transport, format); + IntrospectionSupport.setProperties(transport, options); } + // Only need the WireFormatNegotiator if using openwire if (format instanceof OpenWireFormat) {