From 1d1a9433bc3bde0d65159117312dae3ce053d25c Mon Sep 17 00:00:00 2001 From: Markus Meierhofer Date: Tue, 16 Mar 2021 11:50:31 +0100 Subject: [PATCH] ARTEMIS-2870: Transfer connection close/failure listeners one by one on reattachment Previously, when a session was reattached, all the close/failure listeners were removed from the old connection and set onto the new connection. This only worked when at most 1 session of the old connection was transferred: When the second session was transferred, the old connection already didn't contain any close/failure listeners anymore, and therefore the list of close/failure listeners was overwritten by an empty list for the new connection. Now, when a session is being transferred, it only transfers the close/failure listeners that belong to it, which are the session itself + the TempQueueCleanerUppers. Modified a test to check whether the sessions are failure listeners of the new connection after reattachment. --- .../core/ServerSessionPacketHandler.java | 6 --- .../core/server/impl/ServerSessionImpl.java | 21 +++++++++- .../cluster/reattach/ReattachTest.java | 38 +++++++++++++++++++ 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index d3b19090bd..5993c7a666 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -1004,9 +1004,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { // before we have transferred the connection, leaving it in a started state session.setTransferring(true); - List closeListeners = remotingConnection.removeCloseListeners(); - List failureListeners = remotingConnection.removeFailureListeners(); - // Note. We do not destroy the replicating connection here. In the case the live server has really crashed // then the connection will get cleaned up anyway when the server ping timeout kicks in. // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing @@ -1024,9 +1021,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { remotingConnection = newConnection; - remotingConnection.setCloseListeners(closeListeners); - remotingConnection.setFailureListeners(failureListeners); - int serverLastReceivedCommandID = channel.getLastConfirmedCommandID(); channel.replayCommands(lastReceivedCommandID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 05d9eb800f..12a080e18e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1075,7 +1075,26 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public void transferConnection(RemotingConnection newConnection) { - remotingConnection = newConnection; + synchronized (this) { + // Remove failure listeners from old connection + remotingConnection.removeFailureListener(this); + tempQueueCleannerUppers.values() + .forEach(cleanerUpper -> { + remotingConnection.removeCloseListener(cleanerUpper); + remotingConnection.removeFailureListener(cleanerUpper); + }); + + // Set the new connection + remotingConnection = newConnection; + + // Add failure listeners to new connection + newConnection.addFailureListener(this); + tempQueueCleannerUppers.values() + .forEach(cleanerUpper -> { + newConnection.addCloseListener(cleanerUpper); + newConnection.addFailureListener(cleanerUpper); + }); + } } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java index 15bc09157c..b5a6236d62 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.reattach; import java.util.Objects; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CountDownLatch; @@ -148,6 +149,43 @@ public class ReattachTest extends ActiveMQTestBase { sf.close(); } + @Test + public void testReattachTransferConnectionOnSession2() throws Exception { + final long retryInterval = 50; + final double retryMultiplier = 1d; + final int reconnectAttempts = 10; + + locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024); + ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); + ClientSession session = sf.createSession(false, true, true); + ClientSession secondSession = sf.createSession(false, true, true); + + // there's only one connection on the broker + Object originalConnectionID = ((ServerSession) server.getSessions().toArray()[0]).getConnectionID(); + RemotingConnection oldConnection = ((ServerSession) server.getSessions().toArray()[0]).getRemotingConnection(); + + // ensure sessions are set as failure listeners on old connection + Set originalServerSessions = server.getSessions(); + assertTrue(oldConnection.getFailureListeners().containsAll(originalServerSessions)); + + // trigger re-attach + ((ClientSessionInternal) session).getConnection().fail(new ActiveMQNotConnectedException()); + + session.start(); + secondSession.start(); + + assertFalse(Objects.equals(((ServerSession) server.getSessions().toArray()[0]).getConnectionID(), originalConnectionID)); + + // ensure sessions were removed as failure listeners of old connection and are now failure listeners of new connection + assertTrue(originalServerSessions.stream().noneMatch(oldConnection.getFailureListeners()::contains)); + RemotingConnection newConnection = ((ServerSession) server.getSessions().toArray()[0]).getRemotingConnection(); + assertTrue(newConnection.getFailureListeners().containsAll(originalServerSessions)); + + session.close(); + secondSession.close(); + sf.close(); + } + /* * Test failure on connection, but server is still up so should immediately reconnect */