This closes #650
This commit is contained in:
commit
cfaaba41db
|
@ -286,6 +286,12 @@ public final class StompConnection implements RemotingConnection {
|
|||
return;
|
||||
}
|
||||
|
||||
if (me != null) {
|
||||
StompFrame frame = frameHandler.createStompFrame(Stomp.Responses.ERROR);
|
||||
frame.addHeader(Stomp.Headers.Error.MESSAGE, me.getMessage());
|
||||
sendFrame(frame);
|
||||
}
|
||||
|
||||
destroyed = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -269,7 +269,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
|
|||
long ttlMax = ttlMaxStr == null ? Long.MAX_VALUE : Long.valueOf(ttlMaxStr);
|
||||
|
||||
String ttlMinStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.CONNECTION_TTL_MIN);
|
||||
long ttlMin = ttlMinStr == null ? 500 : Long.valueOf(ttlMinStr);
|
||||
long ttlMin = ttlMinStr == null ? 1000 : Long.valueOf(ttlMinStr);
|
||||
|
||||
String heartBeatToTtlModifierStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER);
|
||||
double heartBeatToTtlModifier = heartBeatToTtlModifierStr == null ? 2 : Double.valueOf(heartBeatToTtlModifierStr);
|
||||
|
@ -284,7 +284,9 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
|
|||
connectionTtl = ttlMax;
|
||||
clientPingResponse = (long) (ttlMax / heartBeatToTtlModifier);
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.info("Setting TTL to: " + connectionTtl);
|
||||
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQServerLogger.LOGGER.debug("Setting STOMP client TTL to: " + connectionTtl);
|
||||
}
|
||||
connectionEntry.ttl = connectionTtl;
|
||||
|
||||
if (clientAcceptPing != 0) {
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
|
@ -688,11 +689,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
@Override
|
||||
public void run() {
|
||||
while (!closed) {
|
||||
ActiveMQServerLogger.LOGGER.info("Checking...");
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
Set<Object> idsToRemove = new HashSet<>();
|
||||
Set<Pair<Object, Long>> toRemove = new HashSet<>();
|
||||
|
||||
for (ConnectionEntry entry : connections.values()) {
|
||||
final RemotingConnection conn = entry.connection;
|
||||
|
@ -702,7 +702,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
if (entry.ttl != -1) {
|
||||
if (!conn.checkDataReceived()) {
|
||||
if (now >= entry.lastCheck + entry.ttl) {
|
||||
idsToRemove.add(conn.getID());
|
||||
toRemove.add(new Pair<>(conn.getID(), entry.ttl));
|
||||
|
||||
flush = false;
|
||||
}
|
||||
|
@ -731,8 +731,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
}
|
||||
}
|
||||
|
||||
for (Object id : idsToRemove) {
|
||||
final RemotingConnection conn = getConnection(id);
|
||||
for (final Pair<Object, Long> pair : toRemove) {
|
||||
final RemotingConnection conn = getConnection(pair.getA());
|
||||
if (conn != null) {
|
||||
// In certain cases (replicationManager for instance) calling fail could take some time
|
||||
// We can't pause the FailureCheckAndFlushThread as that would lead other clients to fail for
|
||||
|
@ -740,10 +740,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
flushExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress()));
|
||||
conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress(), pair.getB()));
|
||||
}
|
||||
});
|
||||
removeConnection(id);
|
||||
removeConnection(pair.getA());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -100,12 +100,8 @@ public interface ActiveMQMessageBundle {
|
|||
ActiveMQInternalErrorException bindingNotDivert(SimpleString name);
|
||||
|
||||
@Message(id = 119014,
|
||||
value = "Did not receive data from {0}. It is likely the client has exited or crashed without " +
|
||||
"closing its connection, or the network between the server and client has failed. " +
|
||||
"You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
|
||||
"Please check user manual for more information." +
|
||||
" The connection will now be closed.", format = Message.Format.MESSAGE_FORMAT)
|
||||
ActiveMQConnectionTimedOutException clientExited(String remoteAddress);
|
||||
value = "Did not receive data from {0} within the {1}ms connection TTL. The connection will now be closed.", format = Message.Format.MESSAGE_FORMAT)
|
||||
ActiveMQConnectionTimedOutException clientExited(String remoteAddress, long ttl);
|
||||
|
||||
@Message(id = 119017, value = "Queue {0} does not exist", format = Message.Format.MESSAGE_FORMAT)
|
||||
ActiveMQNonExistentQueueException noSuchQueue(SimpleString queueName);
|
||||
|
|
|
@ -320,7 +320,7 @@ not to be considered late and trigger a disconnect.
|
|||
|
||||
The minimum and maximum connection TTL allowed can also be specified on the
|
||||
acceptor via the `connectionTtlMin` and `connectionTtlMax` properties respectively.
|
||||
The default `connectionTtlMin` is 500 and the default `connectionTtlMax` is Java's
|
||||
The default `connectionTtlMin` is 1000 and the default `connectionTtlMax` is Java's
|
||||
`Long.MAX_VALUE` meaning there essentially is no max connection TTL by default.
|
||||
Keep in mind that the `heartBeatConnectionTtlModifer` is relevant here. For
|
||||
example, if a client sends a `heart-beat` header of `20000,0` and the acceptor
|
||||
|
|
|
@ -66,6 +66,8 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
Thread.sleep(5000);
|
||||
|
||||
assertTrue(receiveFrame(index, 10000).indexOf(Stomp.Responses.ERROR) != -1);
|
||||
|
||||
assertChannelClosed(index);
|
||||
}
|
||||
|
||||
|
|
|
@ -761,6 +761,8 @@ public class StompV11Test extends StompV11TestBase {
|
|||
|
||||
Thread.sleep(3000);
|
||||
|
||||
assertEquals(Stomp.Responses.ERROR, connection.receiveFrame(1000).getCommand());
|
||||
|
||||
assertEquals(0, connection.getFrameQueueSize());
|
||||
|
||||
try {
|
||||
|
@ -790,6 +792,8 @@ public class StompV11Test extends StompV11TestBase {
|
|||
|
||||
Thread.sleep(3000);
|
||||
|
||||
assertEquals(Stomp.Responses.ERROR, connection.receiveFrame(1000).getCommand());
|
||||
|
||||
assertEquals(0, connection.getFrameQueueSize());
|
||||
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue