From 3b1f6eee7d942706f40b4e35f1dc9dd0537b42a7 Mon Sep 17 00:00:00 2001 From: Markus Meierhofer Date: Tue, 1 Jun 2021 11:53:26 +0200 Subject: [PATCH] ARTEMIS-3337: Correctly handle multiple connection failures Previously, when during reconnect one session couldn't be transferred to the new connection, we instantly returned and didn't execute failover for the other sessions. This produced the issue that for sessions where no failover was executed, their channels were still present on the old connection. When the old connection was then destroyed, these channels were closed although the reconnect was still ongoing, which lead to "dead" sessions. Now, if a session failover fails, for the remaining sessions the "client-side" part of failover is executed, which removes the sessions from the old connection so that they are not closed when the old connection is closed afterwards. --- .../core/client/impl/ClientSessionFactoryImpl.java | 13 ++++++++++--- .../protocol/core/impl/ActiveMQSessionContext.java | 10 +++++++--- .../artemis/spi/core/remoting/SessionContext.java | 5 +++++ 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 00ffb7ceb5..40fb156777 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -802,13 +802,20 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // it needs to be done on the protocol ((CoreRemotingConnection) connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence()); + boolean sessionFailoverError = false; for (ClientSessionInternal session : sessionsToFailover) { - if (!session.handleFailover(connection, cause)) { - return false; + if (sessionFailoverError) { + // If 1 session had a failover error, just detach the remaining sessions from the old connection so that + // they are not closed when the old connection is destroyed + session.getSessionContext().transferConnection(connection); + } else { + if (!session.handleFailover(connection, cause)) { + sessionFailoverError = true; + } } } - return true; + return !sessionFailoverError; } private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 899fc2b59a..4e122a9c87 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -822,12 +822,16 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override + public void transferConnection(RemotingConnection newConnection) { + this.remotingConnection = newConnection; + sessionChannel.transferConnection((CoreRemotingConnection) newConnection); + } + @Override public boolean reattachOnNewConnection(RemotingConnection newConnection) throws ActiveMQException { - this.remotingConnection = newConnection; - - sessionChannel.transferConnection((CoreRemotingConnection) newConnection); + transferConnection(newConnection); Packet request = new ReattachSessionMessage(name, sessionChannel.getLastConfirmedCommandID()); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 1f6f4fbbce..cbe9dd8b54 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -69,6 +69,11 @@ public abstract class SessionContext { public abstract int getReconnectID(); + /** + * Transfers the session context to the given newConnection on the client-side + */ + public abstract void transferConnection(RemotingConnection newConnection); + /** * it will either reattach or reconnect, preferably reattaching it. *