This commit is contained in:
Clebert Suconic 2019-03-12 15:54:56 -04:00
commit 1c637c1a2e
4 changed files with 61 additions and 4 deletions

View File

@ -771,7 +771,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
((CoreRemotingConnection) connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence()); ((CoreRemotingConnection) connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence());
for (ClientSessionInternal session : sessionsToFailover) { for (ClientSessionInternal session : sessionsToFailover) {
session.handleFailover(connection, cause); if (!session.handleFailover(connection, cause)) {
connection.destroy();
this.connection = null;
return;
}
} }
} }

View File

@ -1350,10 +1350,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
// Needs to be synchronized to prevent issues with occurring concurrently with close() // Needs to be synchronized to prevent issues with occurring concurrently with close()
@Override @Override
public void handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) { public boolean handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) {
boolean suc = true;
synchronized (this) { synchronized (this) {
if (closed) { if (closed) {
return; return true;
} }
boolean resetCreditManager = false; boolean resetCreditManager = false;
@ -1426,6 +1428,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} }
} catch (Throwable t) { } catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToHandleFailover(t); ActiveMQClientLogger.LOGGER.failedToHandleFailover(t);
suc = false;
} finally { } finally {
sessionContext.releaseCommunications(); sessionContext.releaseCommunications();
} }
@ -1448,6 +1451,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
sessionContext.resetMetadata(metaDataToSend); sessionContext.resetMetadata(metaDataToSend);
return suc;
} }
@Override @Override

View File

@ -66,7 +66,7 @@ public interface ClientSessionInternal extends ClientSession {
void preHandleFailover(RemotingConnection connection); void preHandleFailover(RemotingConnection connection);
void handleFailover(RemotingConnection backupConnection, ActiveMQException cause); boolean handleFailover(RemotingConnection backupConnection, ActiveMQException cause);
RemotingConnection getConnection(); RemotingConnection getConnection();

View File

@ -24,6 +24,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener; import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType; import org.apache.activemq.artemis.api.core.client.FailoverEventType;
@ -31,8 +34,11 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -318,6 +324,48 @@ public class ReconnectTest extends ActiveMQTestBase {
} }
@Test
public void testReattachTimeout() throws Exception {
ActiveMQServer server = createServer(true, true);
server.start();
// imitate session reattach timeout
Interceptor reattachInterceptor = new Interceptor() {
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getType() == PacketImpl.REATTACH_SESSION) {
return false;
} else {
return true;
}
}
};
server.getRemotingService().addIncomingInterceptor(reattachInterceptor);
final long retryInterval = 50;
final double retryMultiplier = 1d;
final int reconnectAttempts = 10;
ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
final CountDownLatch latch = new CountDownLatch(1);
sf.addFailoverListener(eventType -> {
if (eventType == FailoverEventType.FAILOVER_FAILED) {
latch.countDown();
}
});
ClientSession session = sf.createSession(false, true, true);
RemotingConnection conn = ((ClientSessionInternal) session).getConnection();
conn.fail(new ActiveMQNotConnectedException());
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(session.isClosed());
session.close();
sf.close();
server.stop();
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
// Protected ----------------------------------------------------- // Protected -----------------------------------------------------