https://issues.apache.org/jira/browse/AMQ-3603 - STOMP 1.1 introduced the heartBeat header implemented by the inactivity monitor, would be nice to have this option for stomp 1.0. Implement support for ?transport.defaultHeartBeat=5000,0 with test

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1205011 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-11-22 14:20:04 +00:00
parent 7c2d5e492f
commit 33edc9902e
4 changed files with 70 additions and 22 deletions

View File

@ -71,12 +71,16 @@ public class InactivityMonitor extends AbstractInactivityMonitor {
long readCheckTime = getReadCheckTime();
if (readCheckTime > 0) {
setWriteCheckTime(readCheckTime>3 ? readCheckTime/3 : readCheckTime);
setWriteCheckTime(writeCheckValueFromReadCheck(readCheckTime));
}
super.startMonitorThreads();
}
private long writeCheckValueFromReadCheck(long readCheckTime) {
return readCheckTime>3 ? readCheckTime/3 : readCheckTime;
}
@Override
protected boolean configuredOk() throws IOException {
boolean configured = false;
@ -89,7 +93,7 @@ public class InactivityMonitor extends AbstractInactivityMonitor {
}
long readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
setReadCheckTime(readCheckTime);
setInitialDelayTime(Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()));
@ -101,7 +105,7 @@ public class InactivityMonitor extends AbstractInactivityMonitor {
}
long readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
setReadCheckTime(readCheckTime);
setInitialDelayTime(localWireFormatInfo.getMaxInactivityDurationInitalDelay());

View File

@ -82,10 +82,6 @@ public class ProtocolConverter {
private static final String BROKER_VERSION;
private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
private static final long DEFAULT_OUTBOUND_HEARTBEAT = 100;
private static final long DEFAULT_INBOUND_HEARTBEAT = 1000;
private static final long DEFAULT_INITIAL_HEARTBEAT_DELAY = 1000;
static {
InputStream in = null;
String version = "5.6.0";
@ -123,8 +119,9 @@ public class ProtocolConverter {
private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
private final BrokerContext brokerContext;
private String version = "1.0";
private long hbReadInterval = DEFAULT_INBOUND_HEARTBEAT;
private long hbWriteInterval = DEFAULT_OUTBOUND_HEARTBEAT;
private long hbReadInterval;
private long hbWriteInterval;
private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
this.stompTransport = stompTransport;
@ -620,7 +617,7 @@ public class ProtocolConverter {
accepts = Stomp.DEFAULT_VERSION;
}
if (heartBeat == null) {
heartBeat = Stomp.DEFAULT_HEART_BEAT;
heartBeat = defaultHeartBeat;
}
HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
@ -793,28 +790,27 @@ public class ProtocolConverter {
return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
}
public String getDefaultHeartBeat() {
return defaultHeartBeat;
}
public void setDefaultHeartBeat(String defaultHeartBeat) {
this.defaultHeartBeat = defaultHeartBeat;
}
protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
if (keepAliveOpts == null || keepAliveOpts.length != 2) {
throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true);
throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
} else {
try {
hbReadInterval = Long.parseLong(keepAliveOpts[0]);
hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
} catch(NumberFormatException e) {
throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true);
}
if (hbReadInterval > 0) {
hbReadInterval = Math.max(DEFAULT_INBOUND_HEARTBEAT, hbReadInterval);
hbReadInterval += Math.min(hbReadInterval, 5000);
}
if (hbWriteInterval > 0) {
hbWriteInterval = Math.max(DEFAULT_OUTBOUND_HEARTBEAT, hbWriteInterval);
throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
}
try {
@ -822,7 +818,7 @@ public class ProtocolConverter {
StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
monitor.setReadCheckTime(hbReadInterval);
monitor.setInitialDelayTime(DEFAULT_INITIAL_HEARTBEAT_DELAY);
monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
monitor.setWriteCheckTime(hbWriteInterval);
monitor.startMonitoring();

View File

@ -130,4 +130,13 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
public String getDefaultHeartBeat() {
return protocolConverter != null ? protocolConverter.getDefaultHeartBeat() : null;
}
public void setDefaultHeartBeat(String defaultHeartBeat) {
protocolConverter.setDefaultHeartBeat(defaultHeartBeat);
}
}

View File

@ -136,4 +136,43 @@ public class ConnectTest {
socket.close();
assertTrue("no exceptions", exceptions.isEmpty());
}
@Test
public void testInactivityMonitor() throws Exception {
brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0&transport.useKeepAlive=false");
brokerService.start();
Thread t1 = new Thread() {
StompConnection connection = new StompConnection();
public void run() {
try {
connection.open("localhost", brokerService.getTransportConnectors().get(0).getConnectUri().getPort());
connection.connect("system", "manager");
} catch (Exception ex) {
LOG.error("unexpected exception on connect/disconnect", ex);
exceptions.add(ex);
}
}
};
t1.run();
assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
}
}));
// and it should be closed due to inactivity
assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
}
}));
assertTrue("no exceptions", exceptions.isEmpty());
}
}