diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index d3b19090bd..5993c7a666 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -1004,9 +1004,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { // before we have transferred the connection, leaving it in a started state session.setTransferring(true); - List closeListeners = remotingConnection.removeCloseListeners(); - List failureListeners = remotingConnection.removeFailureListeners(); - // Note. We do not destroy the replicating connection here. In the case the live server has really crashed // then the connection will get cleaned up anyway when the server ping timeout kicks in. // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing @@ -1024,9 +1021,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { remotingConnection = newConnection; - remotingConnection.setCloseListeners(closeListeners); - remotingConnection.setFailureListeners(failureListeners); - int serverLastReceivedCommandID = channel.getLastConfirmedCommandID(); channel.replayCommands(lastReceivedCommandID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 05d9eb800f..12a080e18e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1075,7 +1075,26 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public void transferConnection(RemotingConnection newConnection) { - remotingConnection = newConnection; + synchronized (this) { + // Remove failure listeners from old connection + remotingConnection.removeFailureListener(this); + tempQueueCleannerUppers.values() + .forEach(cleanerUpper -> { + remotingConnection.removeCloseListener(cleanerUpper); + remotingConnection.removeFailureListener(cleanerUpper); + }); + + // Set the new connection + remotingConnection = newConnection; + + // Add failure listeners to new connection + newConnection.addFailureListener(this); + tempQueueCleannerUppers.values() + .forEach(cleanerUpper -> { + newConnection.addCloseListener(cleanerUpper); + newConnection.addFailureListener(cleanerUpper); + }); + } } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java index 15bc09157c..b5a6236d62 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.reattach; import java.util.Objects; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CountDownLatch; @@ -148,6 +149,43 @@ public class ReattachTest extends ActiveMQTestBase { sf.close(); } + @Test + public void testReattachTransferConnectionOnSession2() throws Exception { + final long retryInterval = 50; + final double retryMultiplier = 1d; + final int reconnectAttempts = 10; + + locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024); + ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); + ClientSession session = sf.createSession(false, true, true); + ClientSession secondSession = sf.createSession(false, true, true); + + // there's only one connection on the broker + Object originalConnectionID = ((ServerSession) server.getSessions().toArray()[0]).getConnectionID(); + RemotingConnection oldConnection = ((ServerSession) server.getSessions().toArray()[0]).getRemotingConnection(); + + // ensure sessions are set as failure listeners on old connection + Set originalServerSessions = server.getSessions(); + assertTrue(oldConnection.getFailureListeners().containsAll(originalServerSessions)); + + // trigger re-attach + ((ClientSessionInternal) session).getConnection().fail(new ActiveMQNotConnectedException()); + + session.start(); + secondSession.start(); + + assertFalse(Objects.equals(((ServerSession) server.getSessions().toArray()[0]).getConnectionID(), originalConnectionID)); + + // ensure sessions were removed as failure listeners of old connection and are now failure listeners of new connection + assertTrue(originalServerSessions.stream().noneMatch(oldConnection.getFailureListeners()::contains)); + RemotingConnection newConnection = ((ServerSession) server.getSessions().toArray()[0]).getRemotingConnection(); + assertTrue(newConnection.getFailureListeners().containsAll(originalServerSessions)); + + session.close(); + secondSession.close(); + sf.close(); + } + /* * Test failure on connection, but server is still up so should immediately reconnect */