This closes #672

This commit is contained in:
Clebert Suconic 2016-08-03 18:39:58 -04:00
commit d871dfe622
16 changed files with 189 additions and 3 deletions

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@ -528,6 +529,16 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
String scaleDownTargetNodeID) {
ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);
for (ClientSessionInternal session : sessions) {
SessionContext context = session.getSessionContext();
if (context instanceof ActiveMQSessionContext) {
ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)context;
if (sessionContext.isKilled()) {
setReconnectAttempts(0);
}
}
}
Set<ClientSessionInternal> sessionsToClose = null;
if (!clientProtocolManager.isAlive())
return;
@ -1028,6 +1039,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
reconnectAttempts = attempts;
}
public int getReconnectAttempts() {
return reconnectAttempts;
}
@Override
public Object getConnector() {
return connector;

View File

@ -1787,4 +1787,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
}
});
}
@Override
public SessionContext getSessionContext() {
return sessionContext;
}
}

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
public interface ClientSessionInternal extends ClientSession {
@ -130,4 +131,6 @@ public interface ClientSessionInternal extends ClientSession {
String getNodeId();
boolean isWritable(ReadyListener callback);
SessionContext getSessionContext();
}

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
@ -117,6 +118,7 @@ public class ActiveMQSessionContext extends SessionContext {
private final int serverVersion;
private int confirmationWindow;
private String name;
private boolean killed;
protected Channel getSessionChannel() {
return sessionChannel;
@ -162,6 +164,14 @@ public class ActiveMQSessionContext extends SessionContext {
return sessionChannel.getReconnectID();
}
public boolean isKilled() {
return killed;
}
public void kill() {
this.killed = true;
}
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
@Override
public void commandConfirmed(final Packet packet) {
@ -759,6 +769,12 @@ public class ActiveMQSessionContext extends SessionContext {
handleReceiveProducerFailCredits(message.getAddress(), message.getCredits());
}
protected void handleReceiveSlowConsumerKillMessage(DisconnectConsumerWithKillMessage message) {
if (message.getNodeID() != null) {
kill();
}
}
class ClientSessionPacketHandler implements ChannelHandler {
@Override
@ -796,6 +812,11 @@ public class ActiveMQSessionContext extends SessionContext {
break;
}
case PacketImpl.DISCONNECT_CONSUMER_KILL: {
handleReceiveSlowConsumerKillMessage((DisconnectConsumerWithKillMessage) packet);
break;
}
case EXCEPTION: {
// We can only log these exceptions
// maybe we should cache it on SessionContext and throw an exception on any next calls

View File

@ -46,6 +46,8 @@ public class PacketImpl implements Packet {
public static final byte DISCONNECT_CONSUMER = 12;
public static final byte DISCONNECT_CONSUMER_KILL = 13;
// Miscellaneous
public static final byte EXCEPTION = 20;

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
@ -387,4 +388,15 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
public String getClientID() {
return clientID;
}
@Override
public void killMessage(SimpleString nodeID) {
if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
return;
}
Channel clientChannel = getChannel(1, -1);
DisconnectConsumerWithKillMessage response = new DisconnectConsumerWithKillMessage(nodeID);
clientChannel.send(response, -1);
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class DisconnectConsumerWithKillMessage extends PacketImpl {
private SimpleString nodeID;
public static final int VERSION_INTRODUCED = 128;
public DisconnectConsumerWithKillMessage(final SimpleString nodeID) {
super(DISCONNECT_CONSUMER_KILL);
this.nodeID = nodeID;
}
public DisconnectConsumerWithKillMessage() {
super(DISCONNECT_CONSUMER_KILL);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeNullableSimpleString(nodeID);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
nodeID = buffer.readNullableSimpleString();
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", nodeID=" + nodeID);
buff.append("]");
return buff.toString();
}
public SimpleString getNodeID() {
return nodeID;
}
}

View File

@ -20,6 +20,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@ -184,4 +185,8 @@ public interface RemotingConnection extends BufferHandler {
boolean isWritable(ReadyListener callback);
/**
*if slow consumer is killed,send the msessage to client.
*/
void killMessage(SimpleString nodeID);
}

View File

@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
activemq.version.compatibleVersionList=121,122,123,124,125,126,127
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128

View File

@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@ -133,4 +134,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
// We close the underlying transport connection
getTransportConnection().close();
}
@Override
public void killMessage(SimpleString nodeID) {
//unsupported
}
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -210,4 +211,9 @@ public class MQTTConnection implements RemotingConnection {
public boolean getConnected() {
return connected;
}
@Override
public void killMessage(SimpleString nodeID) {
//unsupported
}
}

View File

@ -1412,4 +1412,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return xaException;
}
@Override
public void killMessage(SimpleString nodeID) {
//unsupported
}
}

View File

@ -728,4 +728,9 @@ public final class StompConnection implements RemotingConnection {
return manager;
}
@Override
public void killMessage(SimpleString nodeID) {
//unsupported
}
}

View File

@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
@ -2989,7 +2990,8 @@ public class QueueImpl implements Queue {
}
else if (consumerRate < threshold) {
RemotingConnection connection = null;
RemotingService remotingService = ((PostOfficeImpl) postOffice).getServer().getRemotingService();
ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
RemotingService remotingService = server.getRemotingService();
for (RemotingConnection potentialConnection : remotingService.getConnections()) {
if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
@ -3002,6 +3004,7 @@ public class QueueImpl implements Queue {
if (connection != null) {
ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
if (policy.equals(SlowConsumerPolicy.KILL)) {
connection.killMessage(server.getNodeID());
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
}

View File

@ -91,7 +91,7 @@
<activemq.version.majorVersion>1</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.incrementingVersion>128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -36,9 +37,12 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Before;
@ -114,6 +118,40 @@ public class SlowConsumerTest extends ActiveMQTestBase {
}
}
@Test
public void testDisableSlowConsumerReconnectWithKilled() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
session.createQueue(QUEUE, QUEUE, null, false);
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
final int numMessages = 25;
for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
}
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
Thread.sleep(3000);
RemotingService service = server.getRemotingService();
Set<RemotingConnection> connections = service.getConnections();
assertTrue(connections.isEmpty());
if (sf instanceof ClientSessionFactoryImpl) {
int reconnectAttemps = ((ClientSessionFactoryImpl)sf).getReconnectAttempts();
assertEquals(0, reconnectAttemps);
}
else {
fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl");
}
}
@Test
public void testSlowConsumerNotification() throws Exception {