add transport options "?transport.hbGracePeriodMultiplier" which is used to add a user defined grace period to the read check interval indicated by the connecting STOMP client. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1514023 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-08-14 20:19:06 +00:00
parent 65034359a5
commit d0ab117a38
2 changed files with 46 additions and 29 deletions

View File

@ -34,33 +34,7 @@ import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerContextAware; import org.apache.activemq.broker.BrokerContextAware;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.*;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -124,6 +98,7 @@ public class ProtocolConverter {
private String version = "1.0"; private String version = "1.0";
private long hbReadInterval; private long hbReadInterval;
private long hbWriteInterval; private long hbWriteInterval;
private float hbGracePeriodMultiplier = 1.0f;
private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT; private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
private static class AckEntry { private static class AckEntry {
@ -928,6 +903,20 @@ public class ProtocolConverter {
this.defaultHeartBeat = defaultHeartBeat; this.defaultHeartBeat = defaultHeartBeat;
} }
/**
* @return the hbGracePeriodMultiplier
*/
public float getHbGracePeriodMultiplier() {
return hbGracePeriodMultiplier;
}
/**
* @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set
*/
public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) {
this.hbGracePeriodMultiplier = hbGracePeriodMultiplier;
}
protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException { protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA); String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
@ -937,7 +926,7 @@ public class ProtocolConverter {
} else { } else {
try { try {
hbReadInterval = Long.parseLong(keepAliveOpts[0]); hbReadInterval = (long) (Long.parseLong(keepAliveOpts[0]) * hbGracePeriodMultiplier);
hbWriteInterval = Long.parseLong(keepAliveOpts[1]); hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
} catch(NumberFormatException e) { } catch(NumberFormatException e) {
throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
@ -945,7 +934,7 @@ public class ProtocolConverter {
try { try {
StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor(); StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
monitor.setReadCheckTime(hbReadInterval); monitor.setReadCheckTime((long) (hbReadInterval * hbGracePeriodMultiplier));
monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval)); monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
monitor.setWriteCheckTime(hbWriteInterval); monitor.setWriteCheckTime(hbWriteInterval);
monitor.startMonitoring(); monitor.startMonitoring();

View File

@ -55,6 +55,7 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
} }
} }
@Override
public void oneway(Object o) throws IOException { public void oneway(Object o) throws IOException {
try { try {
final Command command = (Command) o; final Command command = (Command) o;
@ -64,6 +65,7 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
} }
} }
@Override
public void onCommand(Object command) { public void onCommand(Object command) {
try { try {
if (trace) { if (trace) {
@ -78,6 +80,7 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
} }
} }
@Override
public void sendToActiveMQ(Command command) { public void sendToActiveMQ(Command command) {
TransportListener l = transportListener; TransportListener l = transportListener;
if (l != null) { if (l != null) {
@ -85,6 +88,7 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
} }
} }
@Override
public void sendToStomp(StompFrame command) throws IOException { public void sendToStomp(StompFrame command) throws IOException {
if (trace) { if (trace) {
TRACE.trace("Sending: \n" + command); TRACE.trace("Sending: \n" + command);
@ -125,4 +129,28 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
protocolConverter.setDefaultHeartBeat(defaultHeartBeat); protocolConverter.setDefaultHeartBeat(defaultHeartBeat);
} }
/**
* Returns the currently configured Read check grace period multiplier.
*
* @return the hbGracePeriodMultiplier
*/
public float getHbGracePeriodMultiplier() {
return protocolConverter != null ? protocolConverter.getHbGracePeriodMultiplier() : null;
}
/**
* Sets the read check grace period multiplier. New CONNECT frames that indicate a heart beat
* value with a read check interval will have that value multiplied by this value to add a
* grace period before the connection is considered invalid. By default this value is set to
* zero and no grace period is given. When set the value must be larger than 1.0 or it will
* be ignored.
*
* @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set
*/
public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) {
if (hbGracePeriodMultiplier > 1.0f) {
protocolConverter.setHbGracePeriodMultiplier(hbGracePeriodMultiplier);
}
}
} }