ARTEMIS-2259 Client session not exist if reattach on new connection timeout

This commit is contained in:
yang wei 2019-02-22 20:25:19 +08:00 committed by Clebert Suconic
parent 201d76bbd7
commit c1dcd4bec9
4 changed files with 61 additions and 4 deletions

View File

@ -771,7 +771,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
((CoreRemotingConnection) connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence());
for (ClientSessionInternal session : sessionsToFailover) {
session.handleFailover(connection, cause);
if (!session.handleFailover(connection, cause)) {
connection.destroy();
this.connection = null;
return;
}
}
}

View File

@ -1350,10 +1350,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
// Needs to be synchronized to prevent issues with occurring concurrently with close()
@Override
public void handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) {
public boolean handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) {
boolean suc = true;
synchronized (this) {
if (closed) {
return;
return true;
}
boolean resetCreditManager = false;
@ -1426,6 +1428,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
}
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToHandleFailover(t);
suc = false;
} finally {
sessionContext.releaseCommunications();
}
@ -1448,6 +1451,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
sessionContext.resetMetadata(metaDataToSend);
return suc;
}
@Override

View File

@ -66,7 +66,7 @@ public interface ClientSessionInternal extends ClientSession {
void preHandleFailover(RemotingConnection connection);
void handleFailover(RemotingConnection backupConnection, ActiveMQException cause);
boolean handleFailover(RemotingConnection backupConnection, ActiveMQException cause);
RemotingConnection getConnection();

View File

@ -24,6 +24,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
@ -31,8 +34,11 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
@ -318,6 +324,48 @@ public class ReconnectTest extends ActiveMQTestBase {
}
@Test
public void testReattachTimeout() throws Exception {
ActiveMQServer server = createServer(true, true);
server.start();
// imitate session reattach timeout
Interceptor reattachInterceptor = new Interceptor() {
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getType() == PacketImpl.REATTACH_SESSION) {
return false;
} else {
return true;
}
}
};
server.getRemotingService().addIncomingInterceptor(reattachInterceptor);
final long retryInterval = 50;
final double retryMultiplier = 1d;
final int reconnectAttempts = 10;
ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
final CountDownLatch latch = new CountDownLatch(1);
sf.addFailoverListener(eventType -> {
if (eventType == FailoverEventType.FAILOVER_FAILED) {
latch.countDown();
}
});
ClientSession session = sf.createSession(false, true, true);
RemotingConnection conn = ((ClientSessionInternal) session).getConnection();
conn.fail(new ActiveMQNotConnectedException());
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(session.isClosed());
session.close();
sf.close();
server.stop();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------