ARTEMIS-4283 Fail fast CORE client connect on closing

ServerLocatorImpl waits for topology after connecting a new session factory.
It should interrupt waiting for topology when it is closed to fail fast.
This commit is contained in:
Domenico Francesco Bruscino 2023-05-18 08:40:04 +02:00 committed by clebertsuconic
parent 1e8b5cbd72
commit c47d15c12a
4 changed files with 67 additions and 3 deletions

View File

@ -237,4 +237,7 @@ public interface ActiveMQClientMessageBundle {
@Message(id = 219067, value = "Keystore alias {} not found in {}") @Message(id = 219067, value = "Keystore alias {} not found in {}")
IllegalArgumentException keystoreAliasNotFound(String keystoreAlias, String keystorePath); IllegalArgumentException keystoreAliasNotFound(String keystoreAlias, String keystorePath);
@Message(id = 219068, value = "Connection closed while receiving cluster topology. Group:{}")
ActiveMQObjectClosedException connectionClosedOnReceiveTopology(DiscoveryGroup discoveryGroup);
} }

View File

@ -124,6 +124,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private final double retryIntervalMultiplier; // For exponential backoff private final double retryIntervalMultiplier; // For exponential backoff
private volatile boolean topologyReady = false;
private final CountDownLatch latchFinalTopology = new CountDownLatch(1); private final CountDownLatch latchFinalTopology = new CountDownLatch(1);
private final long maxRetryInterval; private final long maxRetryInterval;
@ -474,6 +476,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
closeCleanSessions(close); closeCleanSessions(close);
closed = true; closed = true;
} }
//release all threads waiting for topology
latchFinalTopology.countDown();
} }
/** /**
@ -521,7 +526,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
@Override @Override
public boolean waitForTopology(long timeout, TimeUnit unit) { public boolean waitForTopology(long timeout, TimeUnit unit) {
try { try {
return latchFinalTopology.await(timeout, unit); //latchFinalTopology is decremented on last topology message or on close
//topologyReady is set to true only on last topology message
return latchFinalTopology.await(timeout, unit) && topologyReady;
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
ActiveMQClientLogger.LOGGER.unableToReceiveClusterTopology(e); ActiveMQClientLogger.LOGGER.unableToReceiveClusterTopology(e);
@ -1502,6 +1509,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast); serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast);
} finally { } finally {
if (isLast) { if (isLast) {
topologyReady = true;
latchFinalTopology.countDown(); latchFinalTopology.countDown();
} }
} }

View File

@ -697,6 +697,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// We always try to connect here with only one attempt, // We always try to connect here with only one attempt,
// as we will perform the initial retry here, looking for all possible connectors // as we will perform the initial retry here, looking for all possible connectors
factory.connect(1, false); factory.connect(1, false);
addFactory(factory);
} finally { } finally {
removeFromConnecting(factory); removeFromConnecting(factory);
} }
@ -750,11 +752,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// how the sendSubscription happens. // how the sendSubscription happens.
// in case this ever changes. // in case this ever changes.
if (topology != null && !factory.waitForTopology(config.callTimeout, TimeUnit.MILLISECONDS)) { if (topology != null && !factory.waitForTopology(config.callTimeout, TimeUnit.MILLISECONDS)) {
factoryClosed(factory);
factory.cleanup(); factory.cleanup();
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
if (isClosed()) {
throw ActiveMQClientMessageBundle.BUNDLE.connectionClosedOnReceiveTopology(discoveryGroup);
} }
addFactory(factory); throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
}
return factory; return factory;
} }

View File

@ -19,18 +19,23 @@ package org.apache.activemq.artemis.tests.integration.client;
import java.net.URI; import java.net.URI;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.uri.ServerLocatorParser; import org.apache.activemq.artemis.uri.ServerLocatorParser;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -47,6 +52,47 @@ public class ServerLocatorConnectTest extends ActiveMQTestBase {
server.start(); server.start();
} }
@Test
public void testFailFastConnectOnClosing() throws Exception {
CountDownLatch connectLatch = new CountDownLatch(1);
CountDownLatch subscribeLatch = new CountDownLatch(1);
AtomicBoolean connectTimedOut = new AtomicBoolean(false);
ServerLocator locator = createNonHALocator(isNetty()).setCallTimeout(30000);
try (ClientSessionFactory csf = locator.createSessionFactory()) {
Assert.assertFalse(csf.isClosed());
}
server.getRemotingService().addIncomingInterceptor((Interceptor) (packet, connection) -> {
if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) {
subscribeLatch.countDown();
return false;
}
return true;
});
new Thread(() -> {
try {
locator.createSessionFactory();
} catch (Exception e) {
connectTimedOut.set(e.getClass() == ActiveMQObjectClosedException.class);
}
connectLatch.countDown();
}).start();
//wait for locator subscribing
subscribeLatch.await();
//close locator while it is waiting for topology
locator.close();
//check connect fails fast
Assert.assertTrue(connectLatch.await(3000, TimeUnit.MILLISECONDS));
//check connect timed out
Assert.assertTrue(connectTimedOut.get());
}
@Test @Test
public void testURL() throws Exception { public void testURL() throws Exception {
ServerLocatorParser parser = new ServerLocatorParser(); ServerLocatorParser parser = new ServerLocatorParser();