diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index b135677d77..47237b5783 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -771,7 +771,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C ((CoreRemotingConnection) connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence()); for (ClientSessionInternal session : sessionsToFailover) { - session.handleFailover(connection, cause); + if (!session.handleFailover(connection, cause)) { + connection.destroy(); + this.connection = null; + return; + } } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 55054c4e8b..766ca91bdc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1350,10 +1350,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // Needs to be synchronized to prevent issues with occurring concurrently with close() @Override - public void handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) { + public boolean handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) { + boolean suc = true; + synchronized (this) { if (closed) { - return; + return true; } boolean resetCreditManager = false; @@ -1426,6 +1428,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } catch (Throwable t) { ActiveMQClientLogger.LOGGER.failedToHandleFailover(t); + suc = false; } finally { sessionContext.releaseCommunications(); } @@ -1448,6 +1451,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi sessionContext.resetMetadata(metaDataToSend); + return suc; + } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index 3c6829af7b..a3700b2e50 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -66,7 +66,7 @@ public interface ClientSessionInternal extends ClientSession { void preHandleFailover(RemotingConnection connection); - void handleFailover(RemotingConnection backupConnection, ActiveMQException cause); + boolean handleFailover(RemotingConnection backupConnection, ActiveMQException cause); RemotingConnection getConnection(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java index c2c9f61fab..6eadf4faf2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java @@ -24,6 +24,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.FailoverEventListener; import org.apache.activemq.artemis.api.core.client.FailoverEventType; @@ -31,8 +34,11 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; import org.junit.Test; @@ -318,6 +324,48 @@ public class ReconnectTest extends ActiveMQTestBase { } + @Test + public void testReattachTimeout() throws Exception { + ActiveMQServer server = createServer(true, true); + server.start(); + // imitate session reattach timeout + Interceptor reattachInterceptor = new Interceptor() { + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + if (packet.getType() == PacketImpl.REATTACH_SESSION) { + return false; + } else { + return true; + } + + } + }; + server.getRemotingService().addIncomingInterceptor(reattachInterceptor); + + final long retryInterval = 50; + final double retryMultiplier = 1d; + final int reconnectAttempts = 10; + ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); + ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); + final CountDownLatch latch = new CountDownLatch(1); + sf.addFailoverListener(eventType -> { + if (eventType == FailoverEventType.FAILOVER_FAILED) { + latch.countDown(); + } + }); + + ClientSession session = sf.createSession(false, true, true); + RemotingConnection conn = ((ClientSessionInternal) session).getConnection(); + conn.fail(new ActiveMQNotConnectedException()); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(session.isClosed()); + + session.close(); + sf.close(); + server.stop(); + } + // Package protected --------------------------------------------- // Protected -----------------------------------------------------