diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index c0fc66bce5..88e16cf52b 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -34,33 +34,7 @@ import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerContextAware; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTempQueue; -import org.apache.activemq.command.ActiveMQTempTopic; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.CommandTypes; -import org.apache.activemq.command.ConnectionError; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerControl; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.DestinationInfo; -import org.apache.activemq.command.ExceptionResponse; -import org.apache.activemq.command.LocalTransactionId; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.RemoveSubscriptionInfo; -import org.apache.activemq.command.Response; -import org.apache.activemq.command.SessionId; -import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.command.ShutdownInfo; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.*; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.IOExceptionSupport; @@ -124,6 +98,7 @@ public class ProtocolConverter { private String version = "1.0"; private long hbReadInterval; private long hbWriteInterval; + private float hbGracePeriodMultiplier = 1.0f; private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT; private static class AckEntry { @@ -928,6 +903,20 @@ public class ProtocolConverter { this.defaultHeartBeat = defaultHeartBeat; } + /** + * @return the hbGracePeriodMultiplier + */ + public float getHbGracePeriodMultiplier() { + return hbGracePeriodMultiplier; + } + + /** + * @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set + */ + public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) { + this.hbGracePeriodMultiplier = hbGracePeriodMultiplier; + } + protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException { String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA); @@ -937,7 +926,7 @@ public class ProtocolConverter { } else { try { - hbReadInterval = Long.parseLong(keepAliveOpts[0]); + hbReadInterval = (long) (Long.parseLong(keepAliveOpts[0]) * hbGracePeriodMultiplier); hbWriteInterval = Long.parseLong(keepAliveOpts[1]); } catch(NumberFormatException e) { throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); @@ -945,7 +934,7 @@ public class ProtocolConverter { try { StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor(); - monitor.setReadCheckTime(hbReadInterval); + monitor.setReadCheckTime((long) (hbReadInterval * hbGracePeriodMultiplier)); monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval)); monitor.setWriteCheckTime(hbWriteInterval); monitor.startMonitoring(); diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java index 311a28a827..14c8122266 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java @@ -55,6 +55,7 @@ public class StompTransportFilter extends TransportFilter implements StompTransp } } + @Override public void oneway(Object o) throws IOException { try { final Command command = (Command) o; @@ -64,6 +65,7 @@ public class StompTransportFilter extends TransportFilter implements StompTransp } } + @Override public void onCommand(Object command) { try { if (trace) { @@ -78,6 +80,7 @@ public class StompTransportFilter extends TransportFilter implements StompTransp } } + @Override public void sendToActiveMQ(Command command) { TransportListener l = transportListener; if (l != null) { @@ -85,6 +88,7 @@ public class StompTransportFilter extends TransportFilter implements StompTransp } } + @Override public void sendToStomp(StompFrame command) throws IOException { if (trace) { TRACE.trace("Sending: \n" + command); @@ -125,4 +129,28 @@ public class StompTransportFilter extends TransportFilter implements StompTransp protocolConverter.setDefaultHeartBeat(defaultHeartBeat); } + /** + * Returns the currently configured Read check grace period multiplier. + * + * @return the hbGracePeriodMultiplier + */ + public float getHbGracePeriodMultiplier() { + return protocolConverter != null ? protocolConverter.getHbGracePeriodMultiplier() : null; + } + + /** + * Sets the read check grace period multiplier. New CONNECT frames that indicate a heart beat + * value with a read check interval will have that value multiplied by this value to add a + * grace period before the connection is considered invalid. By default this value is set to + * zero and no grace period is given. When set the value must be larger than 1.0 or it will + * be ignored. + * + * @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set + */ + public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) { + if (hbGracePeriodMultiplier > 1.0f) { + protocolConverter.setHbGracePeriodMultiplier(hbGracePeriodMultiplier); + } + } + }