diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 5ec11c577c..d3b19090bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -1018,6 +1018,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence()); + session.transferConnection(newConnection); + Connection oldTransportConnection = remotingConnection.getTransportConnection(); remotingConnection = newConnection; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index b319cda5d3..9be941200f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -58,6 +58,8 @@ public interface ServerSession extends SecurityAuth { @Override RemotingConnection getRemotingConnection(); + void transferConnection(RemotingConnection newConnection); + Transaction newTransaction(); boolean removeConsumer(long consumerID) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index dbc68f23ca..05d9eb800f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -136,7 +136,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { protected final boolean strictUpdateDeliveryCount; - protected final RemotingConnection remotingConnection; + protected RemotingConnection remotingConnection; protected final Map consumers = new ConcurrentHashMap<>(); @@ -1073,6 +1073,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return remotingConnection; } + @Override + public void transferConnection(RemotingConnection newConnection) { + remotingConnection = newConnection; + } + @Override public String getSecurityDomain() { return securityDomain; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java index 3c91e7d970..15bc09157c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.reattach; +import java.util.Objects; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CountDownLatch; @@ -41,6 +42,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -122,6 +124,30 @@ public class ReattachTest extends ActiveMQTestBase { sf.close(); } + @Test + public void testReattachTransferConnectionOnSession() throws Exception { + final long retryInterval = 50; + final double retryMultiplier = 1d; + final int reconnectAttempts = 10; + + locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024); + ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); + ClientSession session = sf.createSession(false, true, true); + + // there's only one session on the broker + Object originalConnectionID = ((ServerSession)server.getSessions().toArray()[0]).getConnectionID(); + + // trigger re-attach + ((ClientSessionInternal) session).getConnection().fail(new ActiveMQNotConnectedException()); + + session.start(); + + assertFalse(Objects.equals(((ServerSession)server.getSessions().toArray()[0]).getConnectionID(), originalConnectionID)); + + session.close(); + sf.close(); + } + /* * Test failure on connection, but server is still up so should immediately reconnect */