ARTEMIS-611 refactor STOMP cxn TTL + heart-beat

Adds 3 new URI properties for STOMP acceptors to allow finer grained
configuration of heart-beat / connection-TTL behavior.
This commit is contained in:
jbertram 2016-07-15 08:36:24 -05:00
parent dc76e2a6a0
commit 89e0c461e5
13 changed files with 511 additions and 181 deletions

View File

@ -187,6 +187,12 @@ public class TransportConstants {
public static final String CONNECTION_TTL = "connectionTtl";
public static final String CONNECTION_TTL_MAX = "connectionTtlMax";
public static final String CONNECTION_TTL_MIN = "connectionTtlMin";
public static final String HEART_BEAT_TO_CONNECTION_TTL_MODIFIER = "heartBeatToConnectionTtlModifier";
public static final String STOMP_ENABLE_MESSAGE_ID = "stomp-enable-message-id";
public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size";
@ -230,6 +236,9 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN);
allowableAcceptorKeys.add(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER);
allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID);
allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());

View File

@ -269,7 +269,7 @@ public final class StompConnection implements RemotingConnection {
}
}
Acceptor getAcceptorUsed() {
public Acceptor getAcceptorUsed() {
return acceptorUsed;
}
@ -720,4 +720,8 @@ public final class StompConnection implements RemotingConnection {
return minLargeMessageSize;
}
public StompProtocolManager getManager() {
return manager;
}
}

View File

@ -55,7 +55,7 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
/**
* StompProtocolManager
*/
class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
public class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------

View File

@ -29,7 +29,10 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompDecoder;
import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.utils.CertificateUtil;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@ -92,7 +95,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "0,0");
}
else {
response.addHeader(Stomp.Headers.Connected.HEART_BEAT, heartBeater.getServerHeartBeatValue());
response.addHeader(Stomp.Headers.Connected.HEART_BEAT, Long.toString(heartBeater.serverPingPeriod) + "," + Long.toString(heartBeater.clientPingResponse));
}
}
}
@ -231,7 +234,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
}
private void startHeartBeat() {
if (heartBeater != null) {
if (heartBeater != null && heartBeater.serverPingPeriod != 0) {
heartBeater.start();
}
}
@ -242,31 +245,50 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
return frame;
}
//server heart beat
//algorithm:
//(a) server ping: if server hasn't sent any frame within serverPing
//interval, send a ping.
//(b) accept ping: if server hasn't received any frame within
// 2*serverAcceptPing, disconnect!
/*
* HeartBeater functions:
* (a) server ping: if server hasn't sent any frame within serverPingPeriod interval, send a ping
* (b) configure connection ttl so that org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.FailureCheckAndFlushThread
* can deal with closing connections which go stale
*/
private class HeartBeater extends Thread {
private static final int MIN_SERVER_PING = 500;
private static final int MIN_CLIENT_PING = 500;
long serverPing = 0;
long serverAcceptPing = 0;
long serverPingPeriod = 0;
long clientPingResponse;
volatile boolean shutdown = false;
AtomicLong lastPingTime = new AtomicLong(0);
AtomicLong lastAccepted = new AtomicLong(0);
StompFrame pingFrame;
AtomicLong lastPingTimestamp = new AtomicLong(0);
ConnectionEntry connectionEntry;
private HeartBeater(long clientPing, long clientAcceptPing) {
if (clientPing != 0) {
serverAcceptPing = clientPing > MIN_CLIENT_PING ? clientPing : MIN_CLIENT_PING;
private HeartBeater(final long clientPing, final long clientAcceptPing) {
connectionEntry = ((RemotingServiceImpl)connection.getManager().getServer().getRemotingService()).getConnectionEntry(connection.getID());
clientPingResponse = clientPing;
String ttlMaxStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.CONNECTION_TTL_MAX);
long ttlMax = ttlMaxStr == null ? Long.MAX_VALUE : Long.valueOf(ttlMaxStr);
String ttlMinStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.CONNECTION_TTL_MIN);
long ttlMin = ttlMinStr == null ? 500 : Long.valueOf(ttlMinStr);
String heartBeatToTtlModifierStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER);
double heartBeatToTtlModifier = heartBeatToTtlModifierStr == null ? 2 : Double.valueOf(heartBeatToTtlModifierStr);
// The connection's TTL should be clientPing * 2, MIN_CLIENT_PING, or ttlMax set on the acceptor
long connectionTtl = (long) (clientPing * heartBeatToTtlModifier);
if (connectionTtl < ttlMin) {
connectionTtl = ttlMin;
clientPingResponse = (long) (ttlMin / heartBeatToTtlModifier);
}
else if (connectionTtl > ttlMax) {
connectionTtl = ttlMax;
clientPingResponse = (long) (ttlMax / heartBeatToTtlModifier);
}
ActiveMQServerLogger.LOGGER.info("Setting TTL to: " + connectionTtl);
connectionEntry.ttl = connectionTtl;
if (clientAcceptPing != 0) {
serverPing = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
serverPingPeriod = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
}
}
@ -275,85 +297,32 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
this.notify();
}
public String getServerHeartBeatValue() {
return String.valueOf(serverPing) + "," + String.valueOf(serverAcceptPing);
}
public void pinged() {
lastPingTime.set(System.currentTimeMillis());
lastPingTimestamp.set(System.currentTimeMillis());
}
@Override
public void run() {
lastAccepted.set(System.currentTimeMillis());
pingFrame = createPingFrame();
synchronized (this) {
while (!shutdown) {
long dur1 = 0;
long dur2 = 0;
if (serverPing != 0) {
dur1 = System.currentTimeMillis() - lastPingTime.get();
if (dur1 >= serverPing) {
lastPingTime.set(System.currentTimeMillis());
connection.ping(pingFrame);
dur1 = 0;
}
long lastPingPeriod = System.currentTimeMillis() - lastPingTimestamp.get();
if (lastPingPeriod >= serverPingPeriod) {
lastPingTimestamp.set(System.currentTimeMillis());
connection.ping(createPingFrame());
lastPingPeriod = 0;
}
if (serverAcceptPing != 0) {
dur2 = System.currentTimeMillis() - lastAccepted.get();
if (dur2 > (2 * serverAcceptPing)) {
connection.disconnect(false);
shutdown = true;
break;
}
}
long waitTime1 = 0;
long waitTime2 = 0;
if (serverPing > 0) {
waitTime1 = serverPing - dur1;
}
if (serverAcceptPing > 0) {
waitTime2 = serverAcceptPing * 2 - dur2;
}
long waitTime = 10L;
if ((waitTime1 > 0) && (waitTime2 > 0)) {
waitTime = Math.min(waitTime1, waitTime2);
}
else if (waitTime1 > 0) {
waitTime = waitTime1;
}
else if (waitTime2 > 0) {
waitTime = waitTime2;
}
try {
this.wait(waitTime);
this.wait(serverPingPeriod - lastPingPeriod);
}
catch (InterruptedException e) {
}
}
}
}
public void pingAccepted() {
this.lastAccepted.set(System.currentTimeMillis());
}
}
@Override
public void requestAccepted(StompFrame request) {
if (heartBeater != null) {
heartBeater.pingAccepted();
}
}
@Override
@ -403,10 +372,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
// either "[\r]\n"s or "\n"s)
while (true) {
if (workingBuffer[offset] == NEW_LINE) {
if (heartBeater != null) {
//client ping
heartBeater.pingAccepted();
}
//client ping
nextChar = false;
}
else if (workingBuffer[offset] == CR) {

View File

@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -99,4 +100,9 @@ public interface RemotingService {
*/
Acceptor getAcceptor(String name);
Acceptor createAcceptor(String name, String uri) throws Exception;
Acceptor createAcceptor(TransportConfiguration transportConfiguration);
void destroyAcceptor(String name) throws Exception;
}

View File

@ -67,6 +67,7 @@ import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ReusableLatch;
@ -77,8 +78,6 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
private static final Logger logger = Logger.getLogger(RemotingServiceImpl.class);
public static final long CONNECTION_TTL_CHECK_INTERVAL = 2000;
// Attributes ----------------------------------------------------
private volatile boolean started = false;
@ -119,6 +118,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
private AtomicLong totalConnectionCount = new AtomicLong(0);
private long connectionTtlCheckInterval;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -163,6 +164,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
if (protocolManagerFactories != null) {
loadProtocolManagerFactories(protocolManagerFactories);
}
this.connectionTtlCheckInterval = config.getConnectionTtlCheckInterval();
}
private void setInterceptors(Configuration configuration) {
@ -198,54 +201,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
threadPool = Executors.newCachedThreadPool(tFactory);
for (TransportConfiguration info : acceptorsConfig) {
try {
AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName());
Map<String, ProtocolManagerFactory> selectedProtocolFactories = new ConcurrentHashMap<>();
@SuppressWarnings("deprecation")
String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams());
if (protocol != null) {
ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol();
locateProtocols(protocol, info, selectedProtocolFactories);
}
String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams());
if (protocols != null) {
locateProtocols(protocols, info, selectedProtocolFactories);
}
ClusterConnection clusterConnection = lookupClusterConnection(info);
// If empty: we get the default list
if (selectedProtocolFactories.isEmpty()) {
selectedProtocolFactories = protocolMap;
}
Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap<>();
for (Map.Entry<String, ProtocolManagerFactory> entry: selectedProtocolFactories.entrySet()) {
selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors));
}
Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols);
if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) {
acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal);
}
acceptors.put(info.getName(), acceptor);
if (managementService != null) {
acceptor.setNotificationService(managementService);
managementService.registerAcceptor(acceptor, info);
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(e, info.getFactoryClassName());
}
createAcceptor(info);
}
/**
@ -254,13 +210,87 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
*/
// This thread checks connections that need to be closed, and also flushes confirmations
failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
failureCheckAndFlushThread = new FailureCheckAndFlushThread(connectionTtlCheckInterval);
failureCheckAndFlushThread.start();
started = true;
}
@Override
public Acceptor createAcceptor(String name, String uri) throws Exception {
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
return createAcceptor(configurations.get(0));
}
@Override
public Acceptor createAcceptor(TransportConfiguration info) {
Acceptor acceptor = null;
try {
AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName());
Map<String, ProtocolManagerFactory> selectedProtocolFactories = new ConcurrentHashMap<>();
@SuppressWarnings("deprecation")
String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams());
if (protocol != null) {
ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol();
locateProtocols(protocol, info, selectedProtocolFactories);
}
String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams());
if (protocols != null) {
locateProtocols(protocols, info, selectedProtocolFactories);
}
ClusterConnection clusterConnection = lookupClusterConnection(info);
// If empty: we get the default list
if (selectedProtocolFactories.isEmpty()) {
selectedProtocolFactories = protocolMap;
}
Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap<>();
for (Entry<String, ProtocolManagerFactory> entry: selectedProtocolFactories.entrySet()) {
selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors));
}
acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols);
if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) {
acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal);
}
acceptors.put(info.getName(), acceptor);
if (managementService != null) {
acceptor.setNotificationService(managementService);
managementService.registerAcceptor(acceptor, info);
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(e, info.getFactoryClassName());
}
return acceptor;
}
@Override
public void destroyAcceptor(String name) throws Exception {
Acceptor acceptor = acceptors.get(name);
if (acceptor != null) {
acceptor.stop();
acceptors.remove(name);
}
}
@Override
public synchronized void startAcceptors() throws Exception {
if (isStarted()) {
@ -423,6 +453,17 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
}
}
public ConnectionEntry getConnectionEntry(final Object remotingConnectionID) {
ConnectionEntry entry = connections.get(remotingConnectionID);
if (entry != null) {
return entry;
}
else {
return null;
}
}
@Override
public RemotingConnection removeConnection(final Object remotingConnectionID) {
ConnectionEntry entry = connections.remove(remotingConnectionID);
@ -647,6 +688,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
@Override
public void run() {
while (!closed) {
ActiveMQServerLogger.LOGGER.info("Checking...");
try {
long now = System.currentTimeMillis();

View File

@ -256,15 +256,6 @@ set).
Apache ActiveMQ Artemis currently doesn't support virtual hosting, which means the
'host' header in CONNECT fram will be ignored.
#### Heart-beating
Apache ActiveMQ Artemis specifies a minimum value for both client and server heart-beat
intervals. The minimum interval for both client and server heartbeats is
500 milliseconds. That means if a client sends a CONNECT frame with
heartbeat values lower than 500, the server will defaults the value to
500 milliseconds regardless the values of the 'heart-beat' header in the
frame.
### Mapping Stomp destinations to Apache ActiveMQ Artemis addresses and queues
Stomp clients deals with *destinations* when sending messages and
@ -278,7 +269,14 @@ specified destination is mapped to an address. When a Stomp client
subscribes (or unsubscribes) for a destination (using a `SUBSCRIBE` or
`UNSUBSCRIBE` frame), the destination is mapped to an Apache ActiveMQ Artemis queue.
### STOMP and connection-ttl
### STOMP heart-beating and connection-ttl
Apache ActiveMQ Artemis specifies a minimum value for both client and server heart-beat
intervals. The minimum interval for both client and server heartbeats is
500 milliseconds. That means if a client sends a CONNECT frame with
heartbeat values lower than 500, the server will defaults the value to
500 milliseconds regardless the values of the 'heart-beat' header in the
frame.
Well behaved STOMP clients will always send a DISCONNECT frame before
closing their connections. In this case the server will clear up any
@ -288,19 +286,50 @@ they crash the server will have no way of knowing immediately whether
the client is still alive or not. STOMP connections therefore default to
a connection-ttl value of 1 minute (see chapter on
[connection-ttl](#connection-ttl) for more information. This value can
be overridden using connection-ttl-override.
If you need a specific connectionTtl for your stomp connections without
affecting the connectionTtlOverride setting, you can configure your
stomp acceptor with the "connectionTtl" property, which is used to set
the ttl for connections that are created from that acceptor. For
example:
be overridden using the `connection-ttl-override` property or if you
need a specific connectionTtl for your stomp connections without
affecting the broker-wide `connection-ttl-override` setting, you can
configure your stomp acceptor with the "connectionTtl" property, which
is used to set the ttl for connections that are created from that acceptor.
For example:
<acceptor name="stomp-acceptor">tcp://localhost:61613?protocols=STOMP;connectionTtl=20000</acceptor>
The above configuration will make sure that any stomp connection that is
created from that acceptor will have its connection-ttl set to 20
seconds.
seconds. The `connectionTtl` set on an acceptor will take precedence over
`connection-ttl-override`.
Since Stomp 1.0 doesn't support heart-beating then all connections from
Stomp 1.0 clients will have a connection TTL imposed upon them by the broker
based on the aforementioned configuration options. Likewise, any Stomp 1.1
or 1.2 clients that don't specify a heart-beat or disable heart-beating
(e.g. by sending `0,0` in the `heart-beat` header) will have a connection
TTL imposed upon them by the broker.
For Stomp 1.1 and 1.2 clients which send a valid `heart-beat` header then
their connection TTL will be set accordingly. However, the broker will not
set the connection TTL to the same value as the specified in the `heart-beat`
since even small network delays could then cause spurious disconnects. Instead,
the value in the heart-beat will be multiplied by the `heartBeatConnectionTtlModifer`
specified on the acceptor. The `heartBeatConnectionTtlModifer` is a decimal
value that defaults to 2.0 so for example, if a client sends a `heart-beat`
frame of `1000,0` the the connection TTL will be set to `2000` so that the
ping frames sent every 1000 milliseconds will have a sufficient cushion so as
not to be considered late and trigger a disconnect.
The minimum and maximum connection TTL allowed can also be specified on the
acceptor via the `connectionTtlMin` and `connectionTtlMax` properties respectively.
The default `connectionTtlMin` is 500 and the default `connectionTtlMax` is Java's
`Long.MAX_VALUE` meaning there essentially is no max connection TTL by default.
Keep in mind that the `heartBeatConnectionTtlModifer` is relevant here. For
example, if a client sends a `heart-beat` header of `20000,0` and the acceptor
is using a `connectionTtlMax` of `30000` and a default `heartBeatConnectionTtlModifer`
of `2.0` then the connection TTL would be `40000` (i.e. `20000` * `2.0`) which would
exceed the `connectionTtlMax`. In this case the server would respond to the client
with a `heart-beat` header of `0,15000` (i.e. `30000` / `2.0`). As described
previously, this is to make sure there is a sufficient cushion for the client
heart-beats. The same kind of calculation is done for `connectionTtlMin`.
> **Note**
>

View File

@ -37,13 +37,13 @@ import io.netty.handler.codec.string.StringEncoder;
public class StompOverHttpTest extends StompTest {
@Override
protected void addChannelHandlers(SocketChannel ch) {
protected void addChannelHandlers(int index, SocketChannel ch) {
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpHandler());
ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
ch.pipeline().addLast(new StompClientHandler());
ch.pipeline().addLast(new StompClientHandler(index));
}
@Override

View File

@ -63,12 +63,12 @@ public class StompOverWebsocketTest extends StompTest {
}
@Override
protected void addChannelHandlers(SocketChannel ch) throws URISyntaxException {
protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException {
ch.pipeline().addLast("http-codec", new HttpClientCodec());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192));
ch.pipeline().addLast(new WebsocketHandler(WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:8080/websocket"), WebSocketVersion.V13, null, false, null)));
ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
ch.pipeline().addLast(new StompClientHandler());
ch.pipeline().addLast(new StompClientHandler(index));
}
@Override

View File

@ -51,6 +51,24 @@ public class StompTest extends StompTestBase {
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@Test
public void testConnectionTTL() throws Exception {
int index = 1;
int port = 61614;
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000").start();
createBootstrap(index, port);
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(index, frame);
frame = receiveFrame(index, 10000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
Thread.sleep(5000);
assertChannelClosed(index);
}
@Test
public void testSendManyMessages() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);

View File

@ -29,8 +29,10 @@ import java.io.IOException;
import java.net.Socket;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -97,13 +99,15 @@ public abstract class StompTestBase extends ActiveMQTestBase {
protected boolean autoCreateServer = true;
private Bootstrap bootstrap;
private List<Bootstrap> bootstraps = new ArrayList<>();
private Channel channel;
// private Channel channel;
private BlockingQueue<String> priorityQueue;
private List<BlockingQueue<String>> priorityQueues = new ArrayList<>();
private EventLoopGroup group;
private List<EventLoopGroup> groups = new ArrayList<>();
private List<Channel> channels = new ArrayList<>();
// Implementation methods
// -------------------------------------------------------------------------
@ -111,7 +115,6 @@ public abstract class StompTestBase extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
priorityQueue = new ArrayBlockingQueue<>(1000);
if (autoCreateServer) {
server = createServer();
addServer(server.getActiveMQServer());
@ -133,18 +136,27 @@ public abstract class StompTestBase extends ActiveMQTestBase {
}
private void createBootstrap() {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
createBootstrap(0, port);
}
protected void createBootstrap(int port) {
createBootstrap(0, port);
}
protected void createBootstrap(final int index, int port) {
priorityQueues.add(index, new ArrayBlockingQueue<String>(1000));
groups.add(index, new NioEventLoopGroup());
bootstraps.add(index, new Bootstrap());
bootstraps.get(index).group(groups.get(index)).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
addChannelHandlers(ch);
addChannelHandlers(index, ch);
}
});
// Start the client.
try {
channel = bootstrap.connect("localhost", port).sync().channel();
channels.add(index, bootstraps.get(index).connect("localhost", port).sync().channel());
handshake();
}
catch (InterruptedException e) {
@ -156,10 +168,10 @@ public abstract class StompTestBase extends ActiveMQTestBase {
protected void handshake() throws InterruptedException {
}
protected void addChannelHandlers(SocketChannel ch) throws URISyntaxException {
protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException {
ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
ch.pipeline().addLast(new StompClientHandler());
ch.pipeline().addLast(new StompClientHandler(index));
}
protected void setUpAfterServer() throws Exception {
@ -224,9 +236,13 @@ public abstract class StompTestBase extends ActiveMQTestBase {
if (autoCreateServer) {
connection.close();
if (group != null) {
channel.close();
group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS);
for (EventLoopGroup group : groups) {
if (group != null) {
for (Channel channel : channels) {
channel.close();
}
group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS);
}
}
}
super.tearDown();
@ -234,8 +250,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
protected void cleanUp() throws Exception {
connection.close();
if (group != null) {
group.shutdown();
if (groups.get(0) != null) {
groups.get(0).shutdown();
}
}
@ -244,7 +260,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
}
protected void reconnect(long sleep) throws Exception {
group.shutdown();
groups.get(0).shutdown();
if (sleep > 0) {
Thread.sleep(sleep);
@ -278,22 +294,38 @@ public abstract class StompTestBase extends ActiveMQTestBase {
}
protected void assertChannelClosed() throws InterruptedException {
boolean closed = channel.closeFuture().await(5000);
assertChannelClosed(0);
}
protected void assertChannelClosed(int index) throws InterruptedException {
boolean closed = channels.get(index).closeFuture().await(5000);
assertTrue("channel not closed", closed);
}
public void sendFrame(String data) throws Exception {
channel.writeAndFlush(data);
sendFrame(0, data);
}
public void sendFrame(int index, String data) throws Exception {
channels.get(index).writeAndFlush(data);
}
public void sendFrame(byte[] data) throws Exception {
sendFrame(0, data);
}
public void sendFrame(int index, byte[] data) throws Exception {
ByteBuf buffer = Unpooled.buffer(data.length);
buffer.writeBytes(data);
channel.writeAndFlush(buffer);
channels.get(index).writeAndFlush(buffer);
}
public String receiveFrame(long timeOut) throws Exception {
String msg = priorityQueue.poll(timeOut, TimeUnit.MILLISECONDS);
return receiveFrame(0, timeOut);
}
public String receiveFrame(int index, long timeOut) throws Exception {
String msg = priorityQueues.get(index).poll(timeOut, TimeUnit.MILLISECONDS);
return msg;
}
@ -344,6 +376,11 @@ public abstract class StompTestBase extends ActiveMQTestBase {
}
class StompClientHandler extends SimpleChannelInboundHandler<String> {
int index = 0;
StompClientHandler(int index) {
this.index = index;
}
StringBuffer currentMessage = new StringBuffer("");
@ -356,7 +393,12 @@ public abstract class StompTestBase extends ActiveMQTestBase {
String actualMessage = fullMessage.substring(0, messageEnd);
fullMessage = fullMessage.substring(messageEnd + 2);
currentMessage = new StringBuffer("");
priorityQueue.add(actualMessage);
BlockingQueue queue = priorityQueues.get(index);
if (queue == null) {
queue = new ArrayBlockingQueue(1000);
priorityQueues.add(index, queue);
}
queue.add(actualMessage);
if (fullMessage.length() > 0) {
channelRead(ctx, fullMessage);
}

View File

@ -740,6 +740,220 @@ public class StompV11Test extends StompV11TestBase {
}
}
@Test
public void testHeartBeatToTTL() throws Exception {
ClientStompFrame frame;
ClientStompFrame reply;
int port = 61614;
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000&connectionTtlMin=5000&connectionTtlMax=10000").start();
StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
//no heart beat at all if heat-beat absent
frame = connection.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
reply = connection.sendFrame(frame);
assertEquals("CONNECTED", reply.getCommand());
Thread.sleep(3000);
assertEquals(0, connection.getFrameQueueSize());
try {
connection.disconnect();
fail("Channel should be closed here already due to TTL");
}
catch (Exception e) {
// ignore
}
//no heart beat for (0,0)
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
frame = connection.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "0,0");
frame.addHeader("accept-version", "1.0,1.1");
reply = connection.sendFrame(frame);
IntegrationTestLogger.LOGGER.info("Reply: " + reply);
assertEquals("CONNECTED", reply.getCommand());
assertEquals("0,0", reply.getHeader("heart-beat"));
Thread.sleep(3000);
assertEquals(0, connection.getFrameQueueSize());
try {
connection.disconnect();
fail("Channel should be closed here already due to TTL");
}
catch (Exception e) {
// ignore
}
//heart-beat (1,0), should receive a min client ping accepted by server
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
frame = connection.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "1,0");
frame.addHeader("accept-version", "1.0,1.1");
reply = connection.sendFrame(frame);
assertEquals("CONNECTED", reply.getCommand());
assertEquals("0,2500", reply.getHeader("heart-beat"));
Thread.sleep(7000);
//now server side should be disconnected because we didn't send ping for 2 sec
frame = connection.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("content-type", "text/plain");
frame.setBody("Hello World");
//send will fail
try {
connection.sendFrame(frame);
fail("connection should have been destroyed by now");
}
catch (IOException e) {
//ignore
}
//heart-beat (1,0), start a ping, then send a message, should be ok.
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
frame = connection.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "1,0");
frame.addHeader("accept-version", "1.0,1.1");
reply = connection.sendFrame(frame);
assertEquals("CONNECTED", reply.getCommand());
assertEquals("0,2500", reply.getHeader("heart-beat"));
System.out.println("========== start pinger!");
connection.startPinger(2500);
Thread.sleep(7000);
//now server side should be disconnected because we didn't send ping for 2 sec
frame = connection.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("content-type", "text/plain");
frame.setBody("Hello World");
//send will be ok
connection.sendFrame(frame);
connection.stopPinger();
connection.disconnect();
//heart-beat (20000,0), should receive a max client ping accepted by server
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
frame = connection.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "20000,0");
frame.addHeader("accept-version", "1.0,1.1");
reply = connection.sendFrame(frame);
assertEquals("CONNECTED", reply.getCommand());
assertEquals("0,5000", reply.getHeader("heart-beat"));
Thread.sleep(12000);
//now server side should be disconnected because we didn't send ping for 2 sec
frame = connection.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("content-type", "text/plain");
frame.setBody("Hello World");
//send will fail
try {
connection.sendFrame(frame);
fail("connection should have been destroyed by now");
}
catch (IOException e) {
//ignore
}
}
@Test
public void testHeartBeatToConnectionTTLModifier() throws Exception {
ClientStompFrame frame;
ClientStompFrame reply;
StompClientConnection connection;
int port = 61614;
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start();
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
frame = connection.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "5000,0");
frame.addHeader("accept-version", "1.0,1.1");
reply = connection.sendFrame(frame);
assertEquals("CONNECTED", reply.getCommand());
assertEquals("0,5000", reply.getHeader("heart-beat"));
Thread.sleep(6000);
try {
connection.disconnect();
fail("Connection should be closed here already due to TTL");
}
catch (Exception e) {
// ignore
}
server.getActiveMQServer().getRemotingService().destroyAcceptor("test");
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start();
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
frame = connection.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "5000,0");
frame.addHeader("accept-version", "1.0,1.1");
reply = connection.sendFrame(frame);
assertEquals("CONNECTED", reply.getCommand());
assertEquals("0,5000", reply.getHeader("heart-beat"));
Thread.sleep(6000);
connection.disconnect();
}
@Test
public void testNack() throws Exception {
connV11.connect(defUser, defPass);

View File

@ -103,7 +103,7 @@ public abstract class StompV11TestBase extends ActiveMQTestBase {
params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
Configuration config = createBasicConfig().setPersistenceEnabled(persistenceEnabled).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
Configuration config = createBasicConfig().setPersistenceEnabled(persistenceEnabled).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())).setConnectionTtlCheckInterval(500);
ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));