ARTEMIS-2875 - retry to reattach sessions on failed failover for the specified amount of times set in reconnectAttempts parameter

This commit is contained in:
Stefan Krutzler 2020-09-04 09:53:56 +02:00
parent ec1c5a96c7
commit 1783aa15cb
2 changed files with 21 additions and 7 deletions

View File

@ -632,7 +632,21 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
connector = null; connector = null;
reconnectSessions(oldConnection, reconnectAttempts, me); boolean allSessionReconnected;
int failedReconnectSessionsCounter = 0;
do {
allSessionReconnected = reconnectSessions(oldConnection, reconnectAttempts, me);
if (oldConnection != null) {
oldConnection.destroy();
}
if (!allSessionReconnected) {
failedReconnectSessionsCounter++;
oldConnection = connection;
connection = null;
}
}
while ((reconnectAttempts == -1 || failedReconnectSessionsCounter < reconnectAttempts) && !allSessionReconnected);
if (oldConnection != null) { if (oldConnection != null) {
oldConnection.destroy(); oldConnection.destroy();
@ -750,7 +764,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
/* /*
* Re-attach sessions all pre-existing sessions to the new remoting connection * Re-attach sessions all pre-existing sessions to the new remoting connection
*/ */
private void reconnectSessions(final RemotingConnection oldConnection, private boolean reconnectSessions(final RemotingConnection oldConnection,
final int reconnectAttempts, final int reconnectAttempts,
final ActiveMQException cause) { final ActiveMQException cause) {
HashSet<ClientSessionInternal> sessionsToFailover; HashSet<ClientSessionInternal> sessionsToFailover;
@ -768,7 +782,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
if (!clientProtocolManager.isAlive()) if (!clientProtocolManager.isAlive())
ActiveMQClientLogger.LOGGER.failedToConnectToServer(); ActiveMQClientLogger.LOGGER.failedToConnectToServer();
return; return true;
} }
List<FailureListener> oldListeners = oldConnection.getFailureListeners(); List<FailureListener> oldListeners = oldConnection.getFailureListeners();
@ -790,11 +804,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
for (ClientSessionInternal session : sessionsToFailover) { for (ClientSessionInternal session : sessionsToFailover) {
if (!session.handleFailover(connection, cause)) { if (!session.handleFailover(connection, cause)) {
connection.destroy(); return false;
this.connection = null;
return;
} }
} }
return true;
} }
private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) { private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {

View File

@ -361,7 +361,7 @@ public class ReconnectTest extends ActiveMQTestBase {
final long retryInterval = 50; final long retryInterval = 50;
final double retryMultiplier = 1d; final double retryMultiplier = 1d;
final int reconnectAttempts = 10; final int reconnectAttempts = 1;
ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);