diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 0aa99081f3..8b312eee24 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -71,12 +71,16 @@ public class InactivityMonitor extends AbstractInactivityMonitor { long readCheckTime = getReadCheckTime(); if (readCheckTime > 0) { - setWriteCheckTime(readCheckTime>3 ? readCheckTime/3 : readCheckTime); + setWriteCheckTime(writeCheckValueFromReadCheck(readCheckTime)); } super.startMonitorThreads(); } + private long writeCheckValueFromReadCheck(long readCheckTime) { + return readCheckTime>3 ? readCheckTime/3 : readCheckTime; + } + @Override protected boolean configuredOk() throws IOException { boolean configured = false; @@ -89,7 +93,7 @@ public class InactivityMonitor extends AbstractInactivityMonitor { } long readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); - long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime; + long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime); setReadCheckTime(readCheckTime); setInitialDelayTime(Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay())); @@ -101,7 +105,7 @@ public class InactivityMonitor extends AbstractInactivityMonitor { } long readCheckTime = localWireFormatInfo.getMaxInactivityDuration(); - long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime; + long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime); setReadCheckTime(readCheckTime); setInitialDelayTime(localWireFormatInfo.getMaxInactivityDurationInitalDelay()); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 8dce4356ce..98f843d993 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -82,10 +82,6 @@ public class ProtocolConverter { private static final String BROKER_VERSION; private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE); - private static final long DEFAULT_OUTBOUND_HEARTBEAT = 100; - private static final long DEFAULT_INBOUND_HEARTBEAT = 1000; - private static final long DEFAULT_INITIAL_HEARTBEAT_DELAY = 1000; - static { InputStream in = null; String version = "5.6.0"; @@ -123,8 +119,9 @@ public class ProtocolConverter { private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); private final BrokerContext brokerContext; private String version = "1.0"; - private long hbReadInterval = DEFAULT_INBOUND_HEARTBEAT; - private long hbWriteInterval = DEFAULT_OUTBOUND_HEARTBEAT; + private long hbReadInterval; + private long hbWriteInterval; + private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT; public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) { this.stompTransport = stompTransport; @@ -620,7 +617,7 @@ public class ProtocolConverter { accepts = Stomp.DEFAULT_VERSION; } if (heartBeat == null) { - heartBeat = Stomp.DEFAULT_HEART_BEAT; + heartBeat = defaultHeartBeat; } HashSet acceptsVersions = new HashSet(Arrays.asList(accepts.split(Stomp.COMMA))); @@ -793,28 +790,27 @@ public class ProtocolConverter { return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); } + public String getDefaultHeartBeat() { + return defaultHeartBeat; + } + + public void setDefaultHeartBeat(String defaultHeartBeat) { + this.defaultHeartBeat = defaultHeartBeat; + } + protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException { String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA); if (keepAliveOpts == null || keepAliveOpts.length != 2) { - throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true); + throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); } else { try { hbReadInterval = Long.parseLong(keepAliveOpts[0]); hbWriteInterval = Long.parseLong(keepAliveOpts[1]); } catch(NumberFormatException e) { - throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true); - } - - if (hbReadInterval > 0) { - hbReadInterval = Math.max(DEFAULT_INBOUND_HEARTBEAT, hbReadInterval); - hbReadInterval += Math.min(hbReadInterval, 5000); - } - - if (hbWriteInterval > 0) { - hbWriteInterval = Math.max(DEFAULT_OUTBOUND_HEARTBEAT, hbWriteInterval); + throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); } try { @@ -822,7 +818,7 @@ public class ProtocolConverter { StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor(); monitor.setReadCheckTime(hbReadInterval); - monitor.setInitialDelayTime(DEFAULT_INITIAL_HEARTBEAT_DELAY); + monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval)); monitor.setWriteCheckTime(hbWriteInterval); monitor.startMonitoring(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java index 8b38e4c2ed..329ae16a90 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java @@ -130,4 +130,13 @@ public class StompTransportFilter extends TransportFilter implements StompTransp public StompWireFormat getWireFormat() { return this.wireFormat; } + + public String getDefaultHeartBeat() { + return protocolConverter != null ? protocolConverter.getDefaultHeartBeat() : null; + } + + public void setDefaultHeartBeat(String defaultHeartBeat) { + protocolConverter.setDefaultHeartBeat(defaultHeartBeat); + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java index f656afb2d7..bd2d33c014 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java @@ -136,4 +136,43 @@ public class ConnectTest { socket.close(); assertTrue("no exceptions", exceptions.isEmpty()); } + + @Test + public void testInactivityMonitor() throws Exception { + + brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0&transport.useKeepAlive=false"); + brokerService.start(); + + Thread t1 = new Thread() { + StompConnection connection = new StompConnection(); + + public void run() { + try { + connection.open("localhost", brokerService.getTransportConnectors().get(0).getConnectUri().getPort()); + connection.connect("system", "manager"); + } catch (Exception ex) { + LOG.error("unexpected exception on connect/disconnect", ex); + exceptions.add(ex); + } + } + }; + + t1.run(); + + assertTrue("one connection", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1 == brokerService.getTransportConnectors().get(0).connectionCount(); + } + })); + + // and it should be closed due to inactivity + assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == brokerService.getTransportConnectors().get(0).connectionCount(); + } + })); + assertTrue("no exceptions", exceptions.isEmpty()); + } } \ No newline at end of file