From 65782cb4578aaf85c47c2e4774203b133ebd7713 Mon Sep 17 00:00:00 2001 From: Domenico Francesco Bruscino Date: Mon, 14 Jun 2021 21:48:51 +0200 Subject: [PATCH] ARTEMIS-3337 Add test on multiple connection failures --- .../cluster/failover/FailoverTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index e0b72662ef..b79c418329 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException; import org.apache.activemq.artemis.api.core.ActiveMQTransactionOutcomeUnknownException; import org.apache.activemq.artemis.api.core.ActiveMQTransactionRolledBackException; @@ -49,6 +50,12 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy; @@ -59,6 +66,7 @@ import org.apache.activemq.artemis.core.server.files.FileMoveManager; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener; import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; @@ -1911,6 +1919,49 @@ public class FailoverTest extends FailoverTestBase { receiveMessages(consumer); } + @Test(timeout = 120000) + public void testMultipleSessionFailover() throws Exception { + final String address = "TEST"; + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100); + + sf = createSessionFactoryAndWaitForTopology(locator, 2); + + ClientSession session1 = createSession(sf, true, true, 0); + ClientSession session2 = createSession(sf, true, true, 0); + + backupServer.addInterceptor( + new Interceptor() { + private int index = 0; + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + if (packet.getType() == PacketImpl.CREATESESSION) { + index++; + if (index == 2 || index == 3) { + Channel sessionChannel = ((RemotingConnectionImpl) connection).getChannel(ChannelImpl.CHANNEL_ID.SESSION.id, -1); + sessionChannel.send(new ActiveMQExceptionMessage(new ActiveMQInternalErrorException())); + return false; + } + } + return true; + } + }); + + session1.start(); + session2.start(); + + crash(session1, session2); + + session1.createQueue(new QueueConfiguration(address).setAddress(address)); + + ClientProducer clientProducer = session1.createProducer(address); + clientProducer.send(session1.createMessage(false)); + + ClientConsumer clientConsumer = session2.createConsumer(address); + ClientMessage message = clientConsumer.receive(3000); + Assert.assertNotNull(message); + } + @Test(timeout = 120000) public void testForceBlockingReturn() throws Exception { locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100);