diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 58b9eee098..ec2dc79d8a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1377,7 +1377,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi sessionContext.resetName(name); - for (ClientConsumerInternal consumer : cloneConsumers()) { + Map clonedConsumerEntries = cloneConsumerEntries(); + + for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) { consumer.clearAtFailover(); } @@ -1395,11 +1397,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi if (!inClose && mayAttemptToFailover) { sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge); - for (Map.Entry entryx : consumers.entrySet()) { + for (Map.Entry entryx : clonedConsumerEntries.entrySet()) { ClientConsumerInternal consumerInternal = entryx.getValue(); - - sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started); + synchronized (consumerInternal) { + if (!consumerInternal.isClosed()) { + sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started); + } + } } if ((!autoCommitAcks || !autoCommitSends) && workDone) { @@ -1414,7 +1419,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // Now start the session if it was already started if (started) { - for (ClientConsumerInternal consumer : cloneConsumers()) { + for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) { consumer.clearAtFailover(); consumer.start(); } @@ -2082,6 +2087,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + public Map cloneConsumerEntries() { + synchronized (consumers) { + return new HashMap<>(consumers); + } + } + private void closeChildren() throws ActiveMQException { Set consumersClone = cloneConsumers(); diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnClosingConsumerWhileReconnecting.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnClosingConsumerWhileReconnecting.java new file mode 100644 index 0000000000..db27c0a5c7 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnClosingConsumerWhileReconnecting.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.extras.byteman; + +import java.util.Set; + +import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class RaceOnClosingConsumerWhileReconnecting extends ActiveMQTestBase { + static RemotingConnection conn; + + static ClientConsumer consumer; + + protected ActiveMQServer server = null; + + protected ClientSessionFactoryInternal sf = null; + + protected ClientSessionInternal session = null; + + protected final SimpleString queueName1 = new SimpleString("my_queue_one"); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + conn = null; + consumer = null; + server = createServer(true, true); + server.start(); + + SimpleString addressName1 = new SimpleString("my_address_one"); + + server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST)); + server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, true, false); + + final long retryInterval = 500; + final double retryMultiplier = 1d; + final int reconnectAttempts = 10; + ServerLocator locator = createFactory(true).setCallFailoverTimeout(0).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); + sf = (ClientSessionFactoryInternal) createSessionFactory(locator); + session = (ClientSessionInternal)sf.createSession(false, true, true); + + } + + @Override + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + if (sf != null) { + sf.close(); + } + if (server != null) { + server.stop(); + } + conn = null; + consumer = null; + super.tearDown(); + } + + @Test + @BMRules( + rules = {@BMRule( + name = "session.removeConsumer wait", + targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl", + targetMethod = "removeConsumer(org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal)", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnClosingConsumerWhileReconnecting.waitForReconnection();")}) + public void testClosingConsumerBeforeReconnecting() throws Exception { + conn = session.getConnection(); + + ClientConsumer clientConsumer1 = session.createConsumer(queueName1); + ClientConsumer clientConsumer2 = session.createConsumer(queueName1); + clientConsumer1.close(); + + Thread.sleep(500); + Set serverConsumers = server.getSessionByID(session.getName()).getServerConsumers(); + ServerConsumer serverConsumer = serverConsumers.iterator().next(); + assertEquals(1, serverConsumers.size()); + assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID()); + } + + @Test + @BMRules( + rules = {@BMRule( + name = "session.closeConsumer before recreating consumer", + targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl", + targetMethod = "handleFailover", + targetLocation = "AFTER WRITE $consumerInternal 1", + action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnClosingConsumerWhileReconnecting.closeConsumer();")}) + public void testClosingConsumerBeforeRecreatingOneConsumer() throws Exception { + RemotingConnection conn = session.getConnection(); + + ClientConsumer clientConsumer1 = session.createConsumer(queueName1); + consumer = clientConsumer1; + conn.fail(new ActiveMQNotConnectedException()); + + Thread.sleep(500); + Set serverConsumers = server.getSessionByID(session.getName()).getServerConsumers(); + assertEquals(0, serverConsumers.size()); + } + + @Test + @BMRules( + rules = {@BMRule( + name = "session.closeConsumer before recreating consumer", + targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl", + targetMethod = "handleFailover", + targetLocation = "AFTER WRITE $consumerInternal 1", + action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnClosingConsumerWhileReconnecting.closeConsumer();")}) + public void testClosingConsumerBeforeRecreatingTwoConsumers() throws Exception { + RemotingConnection conn = session.getConnection(); + + ClientConsumer clientConsumer1 = session.createConsumer(queueName1); + ClientConsumer clientConsumer2 = session.createConsumer(queueName1); + consumer = clientConsumer1; + conn.fail(new ActiveMQNotConnectedException()); + + Thread.sleep(500); + ServerSession serverSession = server.getSessionByID(session.getName()); + assertNotNull(serverSession); + Set serverConsumers = serverSession.getServerConsumers(); + ServerConsumer serverConsumer = serverConsumers.iterator().next(); + assertEquals(1, serverConsumers.size()); + assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID()); + } + + public static void closeConsumer() { + if (consumer != null) { + try { + consumer.close(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + consumer = null; + } + } + } + + public static void waitForReconnection() { + if (conn != null) { + try { + conn.fail(new ActiveMQNotConnectedException()); + } catch (Exception e) { + e.printStackTrace(); + } finally { + conn = null; + } + } + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java index c15b175737..8c9ff19e55 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.remoting; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -26,6 +27,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.FailoverEventListener; @@ -37,7 +41,9 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; @@ -370,6 +376,54 @@ public class ReconnectTest extends ActiveMQTestBase { server.stop(); } + @Test + public void testClosingConsumerTimeout() throws Exception { + ActiveMQServer server = createServer(true, true); + server.start(); + + // imitate consumer close timeout + Interceptor reattachInterceptor = new Interceptor() { + boolean consumerClosed; + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + if (!consumerClosed && packet.getType() == PacketImpl.SESS_CONSUMER_CLOSE) { + consumerClosed = true; + return false; + } else { + return true; + } + + } + }; + server.getRemotingService().addIncomingInterceptor(reattachInterceptor); + + final long retryInterval = 500; + final double retryMultiplier = 1d; + final int reconnectAttempts = 10; + ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); + ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); + + ClientSessionInternal session = (ClientSessionInternal)sf.createSession(false, true, true); + SimpleString queueName1 = new SimpleString("my_queue_one"); + SimpleString addressName1 = new SimpleString("my_address_one"); + + server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST)); + server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, true, false); + ClientConsumer clientConsumer1 = session.createConsumer(queueName1); + ClientConsumer clientConsumer2 = session.createConsumer(queueName1); + clientConsumer1.close(); + + Set serverConsumers = server.getSessionByID(session.getName()).getServerConsumers(); + ServerConsumer serverConsumer = serverConsumers.iterator().next(); + assertEquals(1, serverConsumers.size()); + assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID()); + + session.close(); + sf.close(); + server.stop(); + } + // Package protected --------------------------------------------- // Protected -----------------------------------------------------