From 6d2b96c79e75fb230ec0fd9f34d7910d7644593f Mon Sep 17 00:00:00 2001 From: Domenico Francesco Bruscino Date: Tue, 22 Jun 2021 09:52:48 +0200 Subject: [PATCH] ARTEMIS-3275 Lock CORE client communication during failover retries --- .../client/impl/ClientSessionFactoryImpl.java | 38 ++++++++----- .../core/client/impl/ClientSessionImpl.java | 54 +++++++++++-------- .../client/impl/ClientSessionInternal.java | 2 + .../artemis/core/protocol/core/Channel.java | 7 +++ .../core/impl/ActiveMQSessionContext.java | 2 +- .../core/protocol/core/impl/ChannelImpl.java | 5 ++ .../cluster/failover/FailoverTest.java | 41 ++++++++++++++ .../cluster/util/BackupSyncDelay.java | 5 ++ 8 files changed, 118 insertions(+), 36 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 40fb156777..92236f4aa4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -632,10 +632,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C connector = null; - boolean allSessionReconnected; + HashSet sessionsToFailover; + synchronized (sessions) { + sessionsToFailover = new HashSet<>(sessions); + } + + for (ClientSessionInternal session : sessionsToFailover) { + session.preHandleFailover(connection); + } + + boolean allSessionReconnected = false; int failedReconnectSessionsCounter = 0; do { - allSessionReconnected = reconnectSessions(oldConnection, reconnectAttempts, me); + allSessionReconnected = reconnectSessions(sessionsToFailover, oldConnection, reconnectAttempts, me); if (oldConnection != null) { oldConnection.destroy(); } @@ -644,10 +653,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C failedReconnectSessionsCounter++; oldConnection = connection; connection = null; + + // Wait for retry when the connection is established but not all session are reconnected. + if ((reconnectAttempts == -1 || failedReconnectSessionsCounter < reconnectAttempts) && oldConnection != null) { + waitForRetry(retryInterval); + } } } while ((reconnectAttempts == -1 || failedReconnectSessionsCounter < reconnectAttempts) && !allSessionReconnected); + for (ClientSessionInternal session : sessionsToFailover) { + session.postHandleFailover(connection, allSessionReconnected); + } + if (oldConnection != null) { oldConnection.destroy(); } @@ -764,18 +782,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C /* * Re-attach sessions all pre-existing sessions to the new remoting connection */ - private boolean reconnectSessions(final RemotingConnection oldConnection, - final int reconnectAttempts, - final ActiveMQException cause) { - HashSet sessionsToFailover; - synchronized (sessions) { - sessionsToFailover = new HashSet<>(sessions); - } - - for (ClientSessionInternal session : sessionsToFailover) { - session.preHandleFailover(connection); - } - + private boolean reconnectSessions(final Set sessionsToFailover, + final RemotingConnection oldConnection, + final int reconnectAttempts, + final ActiveMQException cause) { getConnectionWithRetry(reconnectAttempts, oldConnection); if (connection == null) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index d3a66a5682..33e5a77f6c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -141,6 +141,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private volatile boolean mayAttemptToFailover = true; + private volatile boolean resetCreditManager = false; + /** * Current XID. this will be used in case of failover */ @@ -1387,8 +1389,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return true; } - boolean resetCreditManager = false; - try { // TODO remove this and encapsulate it @@ -1463,30 +1463,42 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } catch (Throwable t) { ActiveMQClientLogger.LOGGER.failedToHandleFailover(t); suc = false; - } finally { - sessionContext.releaseCommunications(); - } - - if (resetCreditManager) { - synchronized (producerCreditManager) { - producerCreditManager.reset(); - } - - // Also need to send more credits for consumers, otherwise the system could hand with the server - // not having any credits to send } } - HashMap metaDataToSend; - - synchronized (metadata) { - metaDataToSend = new HashMap<>(metadata); - } - - sessionContext.resetMetadata(metaDataToSend); - return suc; + } + @Override + public void postHandleFailover(RemotingConnection connection, boolean successful) { + sessionContext.releaseCommunications(); + + if (successful) { + synchronized (this) { + if (closed) { + return; + } + + if (resetCreditManager) { + synchronized (producerCreditManager) { + producerCreditManager.reset(); + } + + resetCreditManager = false; + + // Also need to send more credits for consumers, otherwise the system could hand with the server + // not having any credits to send + } + } + + HashMap metaDataToSend; + + synchronized (metadata) { + metaDataToSend = new HashMap<>(metadata); + } + + sessionContext.resetMetadata(metaDataToSend); + } } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index a3700b2e50..173087d9cc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -68,6 +68,8 @@ public interface ClientSessionInternal extends ClientSession { boolean handleFailover(RemotingConnection backupConnection, ActiveMQException cause); + void postHandleFailover(RemotingConnection connection, boolean successful); + RemotingConnection getConnection(); void cleanUp(boolean failingOver) throws ActiveMQException; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index 372cad4db3..12817294c6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -184,6 +184,13 @@ public interface Channel { */ int getLastConfirmedCommandID(); + /** + * queries if this channel is locked. This method is designed for use in monitoring of the system state, not for synchronization control. + * + * @return true it the channel is locked and false otherwise + */ + boolean isLocked(); + /** * locks the channel. *

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 4e122a9c87..811ebef669 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -139,7 +139,7 @@ public class ActiveMQSessionContext extends SessionContext { private String name; private boolean killed; - protected Channel getSessionChannel() { + public Channel getSessionChannel() { return sessionChannel; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 4ecd2c63de..18bb08cc55 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -689,6 +689,11 @@ public final class ChannelImpl implements Channel { } } + @Override + public boolean isLocked() { + return failingOver; + } + @Override public void lock() { if (logger.isTraceEnabled()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index 3dcf9a9d22..43756f1b58 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -50,8 +51,10 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext; import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; @@ -1971,6 +1974,44 @@ public class FailoverTest extends FailoverTestBase { Assert.assertNotNull(message); } + @Test(timeout = 120000) + public void testChannelStateDuringFailover() throws Exception { + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100); + + sf = createSessionFactoryAndWaitForTopology(locator, 2); + + final AtomicBoolean channelLockedDuringFailover = new AtomicBoolean(false); + + ClientSession session = createSession(sf, true, true, 0); + + backupServer.addInterceptor( + new Interceptor() { + private int index = 0; + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + if (index < 1 && packet.getType() == PacketImpl.CREATESESSION) { + sf.getConnection().addCloseListener(() -> { + index++; + ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)((ClientSessionInternal)session).getSessionContext(); + channelLockedDuringFailover.set(sessionContext.getSessionChannel().isLocked()); + }); + + Channel sessionChannel = ((RemotingConnectionImpl)connection).getChannel(ChannelImpl.CHANNEL_ID.SESSION.id, -1); + sessionChannel.send(new ActiveMQExceptionMessage(new ActiveMQInternalErrorException())); + return false; + } + return true; + } + }); + + session.start(); + + crash(session); + + Assert.assertTrue(channelLockedDuringFailover.get()); + } + @Test(timeout = 120000) public void testForceBlockingReturn() throws Exception { locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index b0af71b367..287d730efc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -300,6 +300,11 @@ public class BackupSyncDelay implements Interceptor { throw new UnsupportedOperationException(); } + @Override + public boolean isLocked() { + return false; + } + @Override public void lock() { throw new UnsupportedOperationException();