[ARTEMIS-642] Disable slow client reconnecting with KILL slow client policy
This commit is contained in:
parent
c13a9764c2
commit
a741642a48
|
@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
|
||||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil;
|
import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
|
@ -528,6 +529,16 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
String scaleDownTargetNodeID) {
|
String scaleDownTargetNodeID) {
|
||||||
ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);
|
ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);
|
||||||
|
|
||||||
|
for (ClientSessionInternal session : sessions) {
|
||||||
|
SessionContext context = session.getSessionContext();
|
||||||
|
if (context instanceof ActiveMQSessionContext) {
|
||||||
|
ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)context;
|
||||||
|
if (sessionContext.isKilled()) {
|
||||||
|
setReconnectAttempts(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Set<ClientSessionInternal> sessionsToClose = null;
|
Set<ClientSessionInternal> sessionsToClose = null;
|
||||||
if (!clientProtocolManager.isAlive())
|
if (!clientProtocolManager.isAlive())
|
||||||
return;
|
return;
|
||||||
|
@ -1028,6 +1039,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
reconnectAttempts = attempts;
|
reconnectAttempts = attempts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getReconnectAttempts() {
|
||||||
|
return reconnectAttempts;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object getConnector() {
|
public Object getConnector() {
|
||||||
return connector;
|
return connector;
|
||||||
|
|
|
@ -1787,4 +1787,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SessionContext getSessionContext() {
|
||||||
|
return sessionContext;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
|
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||||
|
|
||||||
public interface ClientSessionInternal extends ClientSession {
|
public interface ClientSessionInternal extends ClientSession {
|
||||||
|
|
||||||
|
@ -130,4 +131,6 @@ public interface ClientSessionInternal extends ClientSession {
|
||||||
String getNodeId();
|
String getNodeId();
|
||||||
|
|
||||||
boolean isWritable(ReadyListener callback);
|
boolean isWritable(ReadyListener callback);
|
||||||
|
|
||||||
|
SessionContext getSessionContext();
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
|
||||||
|
@ -117,6 +118,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
private final int serverVersion;
|
private final int serverVersion;
|
||||||
private int confirmationWindow;
|
private int confirmationWindow;
|
||||||
private String name;
|
private String name;
|
||||||
|
private boolean killed;
|
||||||
|
|
||||||
protected Channel getSessionChannel() {
|
protected Channel getSessionChannel() {
|
||||||
return sessionChannel;
|
return sessionChannel;
|
||||||
|
@ -162,6 +164,14 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
return sessionChannel.getReconnectID();
|
return sessionChannel.getReconnectID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isKilled() {
|
||||||
|
return killed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void kill() {
|
||||||
|
this.killed = true;
|
||||||
|
}
|
||||||
|
|
||||||
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
|
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
|
||||||
@Override
|
@Override
|
||||||
public void commandConfirmed(final Packet packet) {
|
public void commandConfirmed(final Packet packet) {
|
||||||
|
@ -759,6 +769,12 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
handleReceiveProducerFailCredits(message.getAddress(), message.getCredits());
|
handleReceiveProducerFailCredits(message.getAddress(), message.getCredits());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void handleReceiveSlowConsumerKillMessage(DisconnectConsumerWithKillMessage message) {
|
||||||
|
if (message.getNodeID() != null) {
|
||||||
|
kill();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class ClientSessionPacketHandler implements ChannelHandler {
|
class ClientSessionPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -796,6 +812,11 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case PacketImpl.DISCONNECT_CONSUMER_KILL: {
|
||||||
|
handleReceiveSlowConsumerKillMessage((DisconnectConsumerWithKillMessage) packet);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
case EXCEPTION: {
|
case EXCEPTION: {
|
||||||
// We can only log these exceptions
|
// We can only log these exceptions
|
||||||
// maybe we should cache it on SessionContext and throw an exception on any next calls
|
// maybe we should cache it on SessionContext and throw an exception on any next calls
|
||||||
|
|
|
@ -46,6 +46,8 @@ public class PacketImpl implements Packet {
|
||||||
|
|
||||||
public static final byte DISCONNECT_CONSUMER = 12;
|
public static final byte DISCONNECT_CONSUMER = 12;
|
||||||
|
|
||||||
|
public static final byte DISCONNECT_CONSUMER_KILL = 13;
|
||||||
|
|
||||||
// Miscellaneous
|
// Miscellaneous
|
||||||
public static final byte EXCEPTION = 20;
|
public static final byte EXCEPTION = 20;
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.protocol.core.Channel;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
|
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
|
||||||
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
||||||
|
@ -387,4 +388,15 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
||||||
public String getClientID() {
|
public String getClientID() {
|
||||||
return clientID;
|
return clientID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void killMessage(SimpleString nodeID) {
|
||||||
|
if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Channel clientChannel = getChannel(1, -1);
|
||||||
|
DisconnectConsumerWithKillMessage response = new DisconnectConsumerWithKillMessage(nodeID);
|
||||||
|
|
||||||
|
clientChannel.send(response, -1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
|
||||||
|
public class DisconnectConsumerWithKillMessage extends PacketImpl {
|
||||||
|
|
||||||
|
private SimpleString nodeID;
|
||||||
|
|
||||||
|
public static final int VERSION_INTRODUCED = 128;
|
||||||
|
|
||||||
|
public DisconnectConsumerWithKillMessage(final SimpleString nodeID) {
|
||||||
|
super(DISCONNECT_CONSUMER_KILL);
|
||||||
|
this.nodeID = nodeID;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DisconnectConsumerWithKillMessage() {
|
||||||
|
super(DISCONNECT_CONSUMER_KILL);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||||
|
buffer.writeNullableSimpleString(nodeID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||||
|
nodeID = buffer.readNullableSimpleString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuffer buff = new StringBuffer(getParentString());
|
||||||
|
buff.append(", nodeID=" + nodeID);
|
||||||
|
buff.append("]");
|
||||||
|
return buff.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public SimpleString getNodeID() {
|
||||||
|
return nodeID;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
||||||
|
@ -184,4 +185,8 @@ public interface RemotingConnection extends BufferHandler {
|
||||||
|
|
||||||
boolean isWritable(ReadyListener callback);
|
boolean isWritable(ReadyListener callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
*if slow consumer is killed,send the msessage to client.
|
||||||
|
*/
|
||||||
|
void killMessage(SimpleString nodeID);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
|
||||||
activemq.version.microVersion=${activemq.version.microVersion}
|
activemq.version.microVersion=${activemq.version.microVersion}
|
||||||
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
|
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
|
||||||
activemq.version.versionTag=${activemq.version.versionTag}
|
activemq.version.versionTag=${activemq.version.versionTag}
|
||||||
activemq.version.compatibleVersionList=121,122,123,124,125,126,127
|
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
@ -133,4 +134,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
|
||||||
// We close the underlying transport connection
|
// We close the underlying transport connection
|
||||||
getTransportConnection().close();
|
getTransportConnection().close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void killMessage(SimpleString nodeID) {
|
||||||
|
//unsupported
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
@ -210,4 +211,9 @@ public class MQTTConnection implements RemotingConnection {
|
||||||
public boolean getConnected() {
|
public boolean getConnected() {
|
||||||
return connected;
|
return connected;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void killMessage(SimpleString nodeID) {
|
||||||
|
//unsupported
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1412,4 +1412,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
return xaException;
|
return xaException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void killMessage(SimpleString nodeID) {
|
||||||
|
//unsupported
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -728,4 +728,9 @@ public final class StompConnection implements RemotingConnection {
|
||||||
return manager;
|
return manager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void killMessage(SimpleString nodeID) {
|
||||||
|
//unsupported
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||||
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.Consumer;
|
import org.apache.activemq.artemis.core.server.Consumer;
|
||||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||||
|
@ -2989,7 +2990,8 @@ public class QueueImpl implements Queue {
|
||||||
}
|
}
|
||||||
else if (consumerRate < threshold) {
|
else if (consumerRate < threshold) {
|
||||||
RemotingConnection connection = null;
|
RemotingConnection connection = null;
|
||||||
RemotingService remotingService = ((PostOfficeImpl) postOffice).getServer().getRemotingService();
|
ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
|
||||||
|
RemotingService remotingService = server.getRemotingService();
|
||||||
|
|
||||||
for (RemotingConnection potentialConnection : remotingService.getConnections()) {
|
for (RemotingConnection potentialConnection : remotingService.getConnections()) {
|
||||||
if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
|
if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
|
||||||
|
@ -3002,6 +3004,7 @@ public class QueueImpl implements Queue {
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
|
ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
|
||||||
if (policy.equals(SlowConsumerPolicy.KILL)) {
|
if (policy.equals(SlowConsumerPolicy.KILL)) {
|
||||||
|
connection.killMessage(server.getNodeID());
|
||||||
remotingService.removeConnection(connection.getID());
|
remotingService.removeConnection(connection.getID());
|
||||||
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
|
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
|
||||||
}
|
}
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -91,7 +91,7 @@
|
||||||
<activemq.version.majorVersion>1</activemq.version.majorVersion>
|
<activemq.version.majorVersion>1</activemq.version.majorVersion>
|
||||||
<activemq.version.minorVersion>0</activemq.version.minorVersion>
|
<activemq.version.minorVersion>0</activemq.version.minorVersion>
|
||||||
<activemq.version.microVersion>0</activemq.version.microVersion>
|
<activemq.version.microVersion>0</activemq.version.microVersion>
|
||||||
<activemq.version.incrementingVersion>127,126,125,124,123,122</activemq.version.incrementingVersion>
|
<activemq.version.incrementingVersion>128,127,126,125,124,123,122</activemq.version.incrementingVersion>
|
||||||
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
|
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
|
||||||
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
|
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -36,9 +37,12 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
||||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||||
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
|
||||||
|
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -114,6 +118,40 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDisableSlowConsumerReconnectWithKilled() throws Exception {
|
||||||
|
ClientSessionFactory sf = createSessionFactory(locator);
|
||||||
|
|
||||||
|
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
|
||||||
|
|
||||||
|
session.createQueue(QUEUE, QUEUE, null, false);
|
||||||
|
|
||||||
|
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
|
||||||
|
|
||||||
|
final int numMessages = 25;
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
producer.send(createTextMessage(session, "m" + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
Thread.sleep(3000);
|
||||||
|
|
||||||
|
RemotingService service = server.getRemotingService();
|
||||||
|
Set<RemotingConnection> connections = service.getConnections();
|
||||||
|
assertTrue(connections.isEmpty());
|
||||||
|
|
||||||
|
if (sf instanceof ClientSessionFactoryImpl) {
|
||||||
|
int reconnectAttemps = ((ClientSessionFactoryImpl)sf).getReconnectAttempts();
|
||||||
|
assertEquals(0, reconnectAttemps);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSlowConsumerNotification() throws Exception {
|
public void testSlowConsumerNotification() throws Exception {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue