This commit is contained in:
Justin Bertram 2021-03-17 13:36:09 -05:00
commit 321802f525
4 changed files with 36 additions and 1 deletions

View File

@ -1018,6 +1018,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
session.transferConnection(newConnection);
Connection oldTransportConnection = remotingConnection.getTransportConnection();
remotingConnection = newConnection;

View File

@ -58,6 +58,8 @@ public interface ServerSession extends SecurityAuth {
@Override
RemotingConnection getRemotingConnection();
void transferConnection(RemotingConnection newConnection);
Transaction newTransaction();
boolean removeConsumer(long consumerID) throws Exception;

View File

@ -136,7 +136,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected final boolean strictUpdateDeliveryCount;
protected final RemotingConnection remotingConnection;
protected RemotingConnection remotingConnection;
protected final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<>();
@ -1073,6 +1073,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return remotingConnection;
}
@Override
public void transferConnection(RemotingConnection newConnection) {
remotingConnection = newConnection;
}
@Override
public String getSecurityDomain() {
return securityDomain;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.reattach;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
@ -41,6 +42,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -122,6 +124,30 @@ public class ReattachTest extends ActiveMQTestBase {
sf.close();
}
@Test
public void testReattachTransferConnectionOnSession() throws Exception {
final long retryInterval = 50;
final double retryMultiplier = 1d;
final int reconnectAttempts = 10;
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
// there's only one session on the broker
Object originalConnectionID = ((ServerSession)server.getSessions().toArray()[0]).getConnectionID();
// trigger re-attach
((ClientSessionInternal) session).getConnection().fail(new ActiveMQNotConnectedException());
session.start();
assertFalse(Objects.equals(((ServerSession)server.getSessions().toArray()[0]).getConnectionID(), originalConnectionID));
session.close();
sf.close();
}
/*
* Test failure on connection, but server is still up so should immediately reconnect
*/