ARTEMIS-3337: Correctly handle multiple connection failures

Previously, when during reconnect one session couldn't be transferred
to the new connection, we instantly returned and didn't execute failover
for the other sessions. This produced the issue that for sessions
where no failover was executed, their channels were still present on the
old connection. When the old connection was then destroyed, these channels
were closed although the reconnect was still ongoing, which lead to
"dead" sessions.

Now, if a session failover fails, for the remaining sessions the "client-side" part
of failover is executed, which removes the sessions from the old connection so that
they are not closed when the old connection is closed afterwards.
This commit is contained in:
Markus Meierhofer 2021-06-01 11:53:26 +02:00 committed by clebertsuconic
parent a7089d0050
commit 3b1f6eee7d
3 changed files with 22 additions and 6 deletions

View File

@ -802,13 +802,20 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
// it needs to be done on the protocol
((CoreRemotingConnection) connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence());
boolean sessionFailoverError = false;
for (ClientSessionInternal session : sessionsToFailover) {
if (!session.handleFailover(connection, cause)) {
return false;
if (sessionFailoverError) {
// If 1 session had a failover error, just detach the remaining sessions from the old connection so that
// they are not closed when the old connection is destroyed
session.getSessionContext().transferConnection(connection);
} else {
if (!session.handleFailover(connection, cause)) {
sessionFailoverError = true;
}
}
}
return true;
return !sessionFailoverError;
}
private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {

View File

@ -822,12 +822,16 @@ public class ActiveMQSessionContext extends SessionContext {
}
}
@Override
public void transferConnection(RemotingConnection newConnection) {
this.remotingConnection = newConnection;
sessionChannel.transferConnection((CoreRemotingConnection) newConnection);
}
@Override
public boolean reattachOnNewConnection(RemotingConnection newConnection) throws ActiveMQException {
this.remotingConnection = newConnection;
sessionChannel.transferConnection((CoreRemotingConnection) newConnection);
transferConnection(newConnection);
Packet request = new ReattachSessionMessage(name, sessionChannel.getLastConfirmedCommandID());

View File

@ -69,6 +69,11 @@ public abstract class SessionContext {
public abstract int getReconnectID();
/**
* Transfers the session context to the given newConnection on the client-side
*/
public abstract void transferConnection(RemotingConnection newConnection);
/**
* it will either reattach or reconnect, preferably reattaching it.
*