From a741642a489d36dadb972feec39afbc4176be1e3 Mon Sep 17 00:00:00 2001 From: bayern39 Date: Fri, 15 Jul 2016 15:33:42 +0800 Subject: [PATCH] [ARTEMIS-642] Disable slow client reconnecting with KILL slow client policy --- .../client/impl/ClientSessionFactoryImpl.java | 15 +++++ .../core/client/impl/ClientSessionImpl.java | 5 ++ .../client/impl/ClientSessionInternal.java | 3 + .../core/impl/ActiveMQSessionContext.java | 21 +++++++ .../core/protocol/core/impl/PacketImpl.java | 2 + .../core/impl/RemotingConnectionImpl.java | 12 ++++ .../DisconnectConsumerWithKillMessage.java | 60 +++++++++++++++++++ .../spi/core/protocol/RemotingConnection.java | 5 ++ .../resources/activemq-version.properties | 2 +- .../ActiveMQProtonRemotingConnection.java | 6 ++ .../core/protocol/mqtt/MQTTConnection.java | 6 ++ .../protocol/openwire/OpenWireConnection.java | 5 ++ .../core/protocol/stomp/StompConnection.java | 5 ++ .../artemis/core/server/impl/QueueImpl.java | 5 +- pom.xml | 2 +- .../integration/client/SlowConsumerTest.java | 38 ++++++++++++ 16 files changed, 189 insertions(+), 3 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 96b40592cb..c944fa1eee 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -528,6 +529,16 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C String scaleDownTargetNodeID) { ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me); + for (ClientSessionInternal session : sessions) { + SessionContext context = session.getSessionContext(); + if (context instanceof ActiveMQSessionContext) { + ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)context; + if (sessionContext.isKilled()) { + setReconnectAttempts(0); + } + } + } + Set sessionsToClose = null; if (!clientProtocolManager.isAlive()) return; @@ -1028,6 +1039,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C reconnectAttempts = attempts; } + public int getReconnectAttempts() { + return reconnectAttempts; + } + @Override public Object getConnector() { return connector; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index bec10fbeaa..8b0fc8ecba 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1787,4 +1787,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } }); } + + @Override + public SessionContext getSessionContext() { + return sessionContext; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index b25e1a869e..b082786054 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.activemq.artemis.spi.core.remoting.SessionContext; public interface ClientSessionInternal extends ClientSession { @@ -130,4 +131,6 @@ public interface ClientSessionInternal extends ClientSession { String getNodeId(); boolean isWritable(ReadyListener callback); + + SessionContext getSessionContext(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index f49a22a9d9..32f2d14a51 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; @@ -117,6 +118,7 @@ public class ActiveMQSessionContext extends SessionContext { private final int serverVersion; private int confirmationWindow; private String name; + private boolean killed; protected Channel getSessionChannel() { return sessionChannel; @@ -162,6 +164,14 @@ public class ActiveMQSessionContext extends SessionContext { return sessionChannel.getReconnectID(); } + public boolean isKilled() { + return killed; + } + + public void kill() { + this.killed = true; + } + private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() { @Override public void commandConfirmed(final Packet packet) { @@ -759,6 +769,12 @@ public class ActiveMQSessionContext extends SessionContext { handleReceiveProducerFailCredits(message.getAddress(), message.getCredits()); } + protected void handleReceiveSlowConsumerKillMessage(DisconnectConsumerWithKillMessage message) { + if (message.getNodeID() != null) { + kill(); + } + } + class ClientSessionPacketHandler implements ChannelHandler { @Override @@ -796,6 +812,11 @@ public class ActiveMQSessionContext extends SessionContext { break; } + case PacketImpl.DISCONNECT_CONSUMER_KILL: { + handleReceiveSlowConsumerKillMessage((DisconnectConsumerWithKillMessage) packet); + + break; + } case EXCEPTION: { // We can only log these exceptions // maybe we should cache it on SessionContext and throw an exception on any next calls diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index ac1edf7f2a..6dddf3bd4b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -46,6 +46,8 @@ public class PacketImpl implements Packet { public static final byte DISCONNECT_CONSUMER = 12; + public static final byte DISCONNECT_CONSUMER_KILL = 13; + // Miscellaneous public static final byte EXCEPTION = 20; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index b051519e00..f7dfa32281 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.CHANNEL_ID; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; @@ -387,4 +388,15 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement public String getClientID() { return clientID; } + + @Override + public void killMessage(SimpleString nodeID) { + if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) { + return; + } + Channel clientChannel = getChannel(1, -1); + DisconnectConsumerWithKillMessage response = new DisconnectConsumerWithKillMessage(nodeID); + + clientChannel.send(response, -1); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java new file mode 100644 index 0000000000..ce55bbdb0b --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; + +public class DisconnectConsumerWithKillMessage extends PacketImpl { + + private SimpleString nodeID; + + public static final int VERSION_INTRODUCED = 128; + + public DisconnectConsumerWithKillMessage(final SimpleString nodeID) { + super(DISCONNECT_CONSUMER_KILL); + this.nodeID = nodeID; + } + + public DisconnectConsumerWithKillMessage() { + super(DISCONNECT_CONSUMER_KILL); + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + buffer.writeNullableSimpleString(nodeID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + nodeID = buffer.readNullableSimpleString(); + } + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append(", nodeID=" + nodeID); + buff.append("]"); + return buff.toString(); + } + + public SimpleString getNodeID() { + return nodeID; + } + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java index 078e42e35f..fe1a087aee 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; @@ -184,4 +185,8 @@ public interface RemotingConnection extends BufferHandler { boolean isWritable(ReadyListener callback); + /** + *if slow consumer is killed,send the msessage to client. + */ + void killMessage(SimpleString nodeID); } diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties index 07685e1074..b6f4af27c2 100644 --- a/artemis-core-client/src/main/resources/activemq-version.properties +++ b/artemis-core-client/src/main/resources/activemq-version.properties @@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion} activemq.version.microVersion=${activemq.version.microVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.versionTag=${activemq.version.versionTag} -activemq.version.compatibleVersionList=121,122,123,124,125,126,127 +activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128 diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java index 0edd6b955c..670ca5b963 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; @@ -133,4 +134,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection // We close the underlying transport connection getTransportConnection().close(); } + + @Override + public void killMessage(SimpleString nodeID) { + //unsupported + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index 6126bb351d..f651d3d964 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -210,4 +211,9 @@ public class MQTTConnection implements RemotingConnection { public boolean getConnected() { return connected; } + + @Override + public void killMessage(SimpleString nodeID) { + //unsupported + } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index a6f00df785..7c52b279d0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1412,4 +1412,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return xaException; } + @Override + public void killMessage(SimpleString nodeID) { + //unsupported + } + } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 28d0de4813..475a34c0a5 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -728,4 +728,9 @@ public final class StompConnection implements RemotingConnection { return manager; } + @Override + public void killMessage(SimpleString nodeID) { + //unsupported + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index b8d917afb1..16b8880305 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.HandleStatus; @@ -2989,7 +2990,8 @@ public class QueueImpl implements Queue { } else if (consumerRate < threshold) { RemotingConnection connection = null; - RemotingService remotingService = ((PostOfficeImpl) postOffice).getServer().getRemotingService(); + ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer(); + RemotingService remotingService = server.getRemotingService(); for (RemotingConnection potentialConnection : remotingService.getConnections()) { if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) { @@ -3002,6 +3004,7 @@ public class QueueImpl implements Queue { if (connection != null) { ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate); if (policy.equals(SlowConsumerPolicy.KILL)) { + connection.killMessage(server.getNodeID()); remotingService.removeConnection(connection.getID()); connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())); } diff --git a/pom.xml b/pom.xml index 9607ead7b0..386a95ef34 100644 --- a/pom.xml +++ b/pom.xml @@ -91,7 +91,7 @@ 1 0 0 - 127,126,125,124,123,122 + 128,127,126,125,124,123,122 ${project.version} ${project.version}(${activemq.version.incrementingVersion}) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java index 79eac1c9dd..788a810811 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client; import java.util.Arrays; import java.util.Collection; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -36,9 +37,12 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; +import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Before; @@ -114,6 +118,40 @@ public class SlowConsumerTest extends ActiveMQTestBase { } } + @Test + public void testDisableSlowConsumerReconnectWithKilled() throws Exception { + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, true, true, false)); + + session.createQueue(QUEUE, QUEUE, null, false); + + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + + final int numMessages = 25; + + for (int i = 0; i < numMessages; i++) { + producer.send(createTextMessage(session, "m" + i)); + } + + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); + session.start(); + + Thread.sleep(3000); + + RemotingService service = server.getRemotingService(); + Set connections = service.getConnections(); + assertTrue(connections.isEmpty()); + + if (sf instanceof ClientSessionFactoryImpl) { + int reconnectAttemps = ((ClientSessionFactoryImpl)sf).getReconnectAttempts(); + assertEquals(0, reconnectAttemps); + } + else { + fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl"); + } + } + @Test public void testSlowConsumerNotification() throws Exception {