ARTEMIS-2870: Transfer connection close/failure listeners one by one on reattachment
Previously, when a session was reattached, all the close/failure listeners were removed from the old connection and set onto the new connection. This only worked when at most 1 session of the old connection was transferred: When the second session was transferred, the old connection already didn't contain any close/failure listeners anymore, and therefore the list of close/failure listeners was overwritten by an empty list for the new connection. Now, when a session is being transferred, it only transfers the close/failure listeners that belong to it, which are the session itself + the TempQueueCleanerUppers. Modified a test to check whether the sessions are failure listeners of the new connection after reattachment.
This commit is contained in:
parent
02ec0bfe42
commit
1d1a9433bc
|
@ -1004,9 +1004,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
// before we have transferred the connection, leaving it in a started state
|
||||
session.setTransferring(true);
|
||||
|
||||
List<CloseListener> closeListeners = remotingConnection.removeCloseListeners();
|
||||
List<FailureListener> failureListeners = remotingConnection.removeFailureListeners();
|
||||
|
||||
// Note. We do not destroy the replicating connection here. In the case the live server has really crashed
|
||||
// then the connection will get cleaned up anyway when the server ping timeout kicks in.
|
||||
// In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
|
||||
|
@ -1024,9 +1021,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
|
||||
remotingConnection = newConnection;
|
||||
|
||||
remotingConnection.setCloseListeners(closeListeners);
|
||||
remotingConnection.setFailureListeners(failureListeners);
|
||||
|
||||
int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
|
||||
|
||||
channel.replayCommands(lastReceivedCommandID);
|
||||
|
|
|
@ -1075,7 +1075,26 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
|
||||
@Override
|
||||
public void transferConnection(RemotingConnection newConnection) {
|
||||
remotingConnection = newConnection;
|
||||
synchronized (this) {
|
||||
// Remove failure listeners from old connection
|
||||
remotingConnection.removeFailureListener(this);
|
||||
tempQueueCleannerUppers.values()
|
||||
.forEach(cleanerUpper -> {
|
||||
remotingConnection.removeCloseListener(cleanerUpper);
|
||||
remotingConnection.removeFailureListener(cleanerUpper);
|
||||
});
|
||||
|
||||
// Set the new connection
|
||||
remotingConnection = newConnection;
|
||||
|
||||
// Add failure listeners to new connection
|
||||
newConnection.addFailureListener(this);
|
||||
tempQueueCleannerUppers.values()
|
||||
.forEach(cleanerUpper -> {
|
||||
newConnection.addCloseListener(cleanerUpper);
|
||||
newConnection.addFailureListener(cleanerUpper);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.artemis.tests.integration.cluster.reattach;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -148,6 +149,43 @@ public class ReattachTest extends ActiveMQTestBase {
|
|||
sf.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReattachTransferConnectionOnSession2() throws Exception {
|
||||
final long retryInterval = 50;
|
||||
final double retryMultiplier = 1d;
|
||||
final int reconnectAttempts = 10;
|
||||
|
||||
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024);
|
||||
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
ClientSession secondSession = sf.createSession(false, true, true);
|
||||
|
||||
// there's only one connection on the broker
|
||||
Object originalConnectionID = ((ServerSession) server.getSessions().toArray()[0]).getConnectionID();
|
||||
RemotingConnection oldConnection = ((ServerSession) server.getSessions().toArray()[0]).getRemotingConnection();
|
||||
|
||||
// ensure sessions are set as failure listeners on old connection
|
||||
Set<ServerSession> originalServerSessions = server.getSessions();
|
||||
assertTrue(oldConnection.getFailureListeners().containsAll(originalServerSessions));
|
||||
|
||||
// trigger re-attach
|
||||
((ClientSessionInternal) session).getConnection().fail(new ActiveMQNotConnectedException());
|
||||
|
||||
session.start();
|
||||
secondSession.start();
|
||||
|
||||
assertFalse(Objects.equals(((ServerSession) server.getSessions().toArray()[0]).getConnectionID(), originalConnectionID));
|
||||
|
||||
// ensure sessions were removed as failure listeners of old connection and are now failure listeners of new connection
|
||||
assertTrue(originalServerSessions.stream().noneMatch(oldConnection.getFailureListeners()::contains));
|
||||
RemotingConnection newConnection = ((ServerSession) server.getSessions().toArray()[0]).getRemotingConnection();
|
||||
assertTrue(newConnection.getFailureListeners().containsAll(originalServerSessions));
|
||||
|
||||
session.close();
|
||||
secondSession.close();
|
||||
sf.close();
|
||||
}
|
||||
|
||||
/*
|
||||
* Test failure on connection, but server is still up so should immediately reconnect
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue