This commit is contained in:
Clebert Suconic 2021-03-18 23:10:47 -04:00
commit 387553347b
3 changed files with 58 additions and 7 deletions

View File

@ -1004,9 +1004,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
// before we have transferred the connection, leaving it in a started state // before we have transferred the connection, leaving it in a started state
session.setTransferring(true); session.setTransferring(true);
List<CloseListener> closeListeners = remotingConnection.removeCloseListeners();
List<FailureListener> failureListeners = remotingConnection.removeFailureListeners();
// Note. We do not destroy the replicating connection here. In the case the live server has really crashed // Note. We do not destroy the replicating connection here. In the case the live server has really crashed
// then the connection will get cleaned up anyway when the server ping timeout kicks in. // then the connection will get cleaned up anyway when the server ping timeout kicks in.
// In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
@ -1024,9 +1021,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
remotingConnection = newConnection; remotingConnection = newConnection;
remotingConnection.setCloseListeners(closeListeners);
remotingConnection.setFailureListeners(failureListeners);
int serverLastReceivedCommandID = channel.getLastConfirmedCommandID(); int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
channel.replayCommands(lastReceivedCommandID); channel.replayCommands(lastReceivedCommandID);

View File

@ -1075,7 +1075,26 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override @Override
public void transferConnection(RemotingConnection newConnection) { public void transferConnection(RemotingConnection newConnection) {
remotingConnection = newConnection; synchronized (this) {
// Remove failure listeners from old connection
remotingConnection.removeFailureListener(this);
tempQueueCleannerUppers.values()
.forEach(cleanerUpper -> {
remotingConnection.removeCloseListener(cleanerUpper);
remotingConnection.removeFailureListener(cleanerUpper);
});
// Set the new connection
remotingConnection = newConnection;
// Add failure listeners to new connection
newConnection.addFailureListener(this);
tempQueueCleannerUppers.values()
.forEach(cleanerUpper -> {
newConnection.addCloseListener(cleanerUpper);
newConnection.addFailureListener(cleanerUpper);
});
}
} }
@Override @Override

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.cluster.reattach; package org.apache.activemq.artemis.tests.integration.cluster.reattach;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -148,6 +149,43 @@ public class ReattachTest extends ActiveMQTestBase {
sf.close(); sf.close();
} }
@Test
public void testReattachTransferConnectionOnSession2() throws Exception {
final long retryInterval = 50;
final double retryMultiplier = 1d;
final int reconnectAttempts = 10;
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
ClientSession secondSession = sf.createSession(false, true, true);
// there's only one connection on the broker
Object originalConnectionID = ((ServerSession) server.getSessions().toArray()[0]).getConnectionID();
RemotingConnection oldConnection = ((ServerSession) server.getSessions().toArray()[0]).getRemotingConnection();
// ensure sessions are set as failure listeners on old connection
Set<ServerSession> originalServerSessions = server.getSessions();
assertTrue(oldConnection.getFailureListeners().containsAll(originalServerSessions));
// trigger re-attach
((ClientSessionInternal) session).getConnection().fail(new ActiveMQNotConnectedException());
session.start();
secondSession.start();
assertFalse(Objects.equals(((ServerSession) server.getSessions().toArray()[0]).getConnectionID(), originalConnectionID));
// ensure sessions were removed as failure listeners of old connection and are now failure listeners of new connection
assertTrue(originalServerSessions.stream().noneMatch(oldConnection.getFailureListeners()::contains));
RemotingConnection newConnection = ((ServerSession) server.getSessions().toArray()[0]).getRemotingConnection();
assertTrue(newConnection.getFailureListeners().containsAll(originalServerSessions));
session.close();
secondSession.close();
sf.close();
}
/* /*
* Test failure on connection, but server is still up so should immediately reconnect * Test failure on connection, but server is still up so should immediately reconnect
*/ */