Handle uninitialized cache in Node role watchers (#15726)

BaseNodeRoleWatcher counts down cacheInitialized after a timeout, but also sets some flag that it was a timed-out initialization. and call nodeViewInitializationTimedOut (new method on listeners) instead of nodeViewInitialized. Then listeners can do what is most appropriate with this information.
This commit is contained in:
AmatyaAvadhanula 2024-03-06 16:00:24 +05:30 committed by GitHub
parent cf9bc507f6
commit c2841425f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 346 additions and 50 deletions

View File

@ -43,6 +43,7 @@ import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
@ -57,7 +58,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
private final K8sApiClient k8sApiClient;
private ExecutorService listenerExecutor;
private ScheduledExecutorService listenerExecutor;
private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
@ -145,7 +146,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
// This is single-threaded to ensure that all listener calls are executed precisely in the oder of add/remove
// event occurences.
listenerExecutor = Execs.singleThreaded("K8sDruidNodeDiscoveryProvider-ListenerExecutor");
listenerExecutor = Execs.scheduledSingleThreaded("K8sDruidNodeDiscoveryProvider-ListenerExecutor");
LOGGER.info("started");
@ -196,7 +197,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
private final long watcherErrorRetryWaitMS;
NodeRoleWatcher(
ExecutorService listenerExecutor,
ScheduledExecutorService listenerExecutor,
NodeRole nodeRole,
PodInfo podInfo,
K8sDiscoveryConfig discoveryConfig,

View File

@ -101,6 +101,12 @@ public class K8sAnnouncerAndDiscoveryIntTest
{
nodeViewInitialized.countDown();
}
@Override
public void nodeViewInitializedTimedOut()
{
nodeViewInitialized();
}
}
);

View File

@ -314,6 +314,12 @@ public class K8sDruidNodeDiscoveryProviderTest
assertNextEvent(Event.inited());
}
@Override
public void nodeViewInitializedTimedOut()
{
nodeViewInitialized();
}
@Override
public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{

View File

@ -542,6 +542,12 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
//CountDownLatch.countDown() does nothing when count has already reached 0.
workerViewInitialized.countDown();
}
@Override
public void nodeViewInitializedTimedOut()
{
nodeViewInitialized();
}
};
druidNodeDiscovery.registerListener(nodeDiscoveryListener);

View File

@ -150,6 +150,55 @@ public class HttpRemoteTaskRunnerTest
Assert.assertEquals(0, taskRunner.getUsedCapacity());
}
@Test(timeout = 60_000L)
public void testFreshStart_nodeDiscoveryTimedOut() throws Exception
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(true);
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
HttpRemoteTaskRunner taskRunner = newHttpTaskRunnerInstance(
druidNodeDiscoveryProvider,
new NoopProvisioningStrategy<>());
taskRunner.start();
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", false, 8080, null, true, false),
NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", false, 8080, null, true, false),
NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1, druidNode2));
int numTasks = 8;
List<Future<TaskStatus>> futures = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
futures.add(taskRunner.run(NoopTask.create()));
}
for (Future<TaskStatus> future : futures) {
Assert.assertTrue(future.get().isSuccess());
}
Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
Assert.assertEquals(4, taskRunner.getTotalCapacity());
Assert.assertEquals(0, taskRunner.getUsedCapacity());
}
/*
Simulates startup of Overlord. Overlord is then stopped and is expected to close down certain things.
*/
@ -1986,11 +2035,19 @@ public class HttpRemoteTaskRunnerTest
public static class TestDruidNodeDiscovery implements DruidNodeDiscovery
{
private final boolean timedOut;
private List<Listener> listeners;
public TestDruidNodeDiscovery()
{
this(false);
}
public TestDruidNodeDiscovery(boolean timedOut)
{
listeners = new ArrayList<>();
this.timedOut = timedOut;
}
@Override
@ -2003,7 +2060,11 @@ public class HttpRemoteTaskRunnerTest
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of());
listener.nodeViewInitialized();
if (timedOut) {
listener.nodeViewInitializedTimedOut();
} else {
listener.nodeViewInitialized();
}
listeners.add(listener);
}

View File

@ -187,6 +187,12 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
}
@Override
public void nodeViewInitializedTimedOut()
{
nodeViewInitialized();
}
private DruidServer toDruidServer(DiscoveryDruidNode node)
{
final DruidNode druidNode = node.getDruidNode();

View File

@ -56,6 +56,7 @@ import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BooleanSupplier;
/**
@ -70,7 +71,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private final ZkPathsConfig config;
private final ObjectMapper jsonMapper;
private ExecutorService listenerExecutor;
private ScheduledExecutorService listenerExecutor;
private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeRoleWatchers = new ConcurrentHashMap<>();
private final ConcurrentLinkedQueue<NodeDiscoverer> nodeDiscoverers = new ConcurrentLinkedQueue<>();
@ -131,7 +132,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
try {
// This is single-threaded to ensure that all listener calls are executed precisely in the order of add/remove
// event occurrences.
listenerExecutor = Execs.singleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
listenerExecutor = Execs.scheduledSingleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
log.debug("Started.");
@ -174,7 +175,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private final Object lock = new Object();
NodeRoleWatcher(
ExecutorService listenerExecutor,
ScheduledExecutorService listenerExecutor,
CuratorFramework curatorFramework,
String basePath,
ObjectMapper jsonMapper,

View File

@ -32,7 +32,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
public class BaseNodeRoleWatcher
{
private static final Logger LOGGER = new Logger(BaseNodeRoleWatcher.class);
private static final long DEFAULT_TIMEOUT_SECONDS = 30L;
private final NodeRole nodeRole;
@ -59,38 +60,50 @@ public class BaseNodeRoleWatcher
private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
private final ExecutorService listenerExecutor;
private final ScheduledExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
private final Object lock = new Object();
// Always countdown under lock
private final CountDownLatch cacheInitialized = new CountDownLatch(1);
private volatile boolean cacheInitializationTimedOut = false;
public BaseNodeRoleWatcher(
ExecutorService listenerExecutor,
ScheduledExecutorService listenerExecutor,
NodeRole nodeRole
)
{
this.listenerExecutor = listenerExecutor;
this(listenerExecutor, nodeRole, DEFAULT_TIMEOUT_SECONDS);
}
BaseNodeRoleWatcher(
ScheduledExecutorService listenerExecutor,
NodeRole nodeRole,
long timeout
)
{
this.nodeRole = nodeRole;
this.listenerExecutor = listenerExecutor;
this.listenerExecutor.schedule(
this::cacheInitializedTimedOut,
timeout,
TimeUnit.SECONDS
);
}
public Collection<DiscoveryDruidNode> getAllNodes()
{
boolean nodeViewInitialized;
try {
nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
cacheInitialized.await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
nodeViewInitialized = false;
}
if (!nodeViewInitialized) {
LOGGER.info(
"Cache for node role [%s] not initialized yet; getAllNodes() might not return full information.",
nodeRole.getJsonName()
);
if (unmodifiableNodes.isEmpty()) {
LOGGER.warn("Watcher for node role [%s] returned an empty collection.", nodeRole.getJsonName());
}
return unmodifiableNodes;
}
@ -106,7 +119,11 @@ public class BaseNodeRoleWatcher
safeSchedule(
() -> {
listener.nodesAdded(currNodes);
listener.nodeViewInitialized();
if (cacheInitializationTimedOut) {
listener.nodeViewInitializedTimedOut();
} else {
listener.nodeViewInitialized();
}
},
"Exception occurred in nodesAdded([%s]) in listener [%s].", currNodes, listener
);
@ -215,34 +232,79 @@ public class BaseNodeRoleWatcher
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be
// counted down.
if (cacheInitialized.getCount() == 0) {
LOGGER.error("cache is already initialized. ignoring cache initialization event.");
if (cacheInitializationTimedOut) {
LOGGER.warn(
"Cache initialization for node role[%s] has already timed out. Ignoring cache initialization event.",
nodeRole.getJsonName()
);
} else {
LOGGER.error(
"Cache for node role[%s] is already initialized. ignoring cache initialization event.",
nodeRole.getJsonName()
);
}
return;
}
// It is important to take a snapshot here as list of nodes might change by the time listeners process
// the changes.
List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
LOGGER.info(
"Node watcher of role [%s] is now initialized with %d nodes.",
nodeRole.getJsonName(),
currNodes.size());
cacheInitialized(false);
}
}
for (DruidNodeDiscovery.Listener listener : nodeListeners) {
safeSchedule(
() -> {
listener.nodesAdded(currNodes);
listener.nodeViewInitialized();
},
"Exception occurred in nodesAdded([%s]) in listener [%s].",
currNodes,
listener
);
private void cacheInitializedTimedOut()
{
synchronized (lock) {
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be
// counted down.
if (cacheInitialized.getCount() == 0) {
LOGGER.warn("Cache for node watcher of role[%s] is already initialized. ignoring timeout.", nodeRole.getJsonName());
return;
}
cacheInitialized.countDown();
cacheInitialized(true);
}
}
// This method is called only once with either timedOut = true or false, but not both.
@GuardedBy("lock")
private void cacheInitialized(boolean timedOut)
{
if (timedOut) {
LOGGER.warn(
"Cache for node role [%s] could not be initialized before timeout. "
+ "This service may not have full information about other nodes of type [%s].",
nodeRole.getJsonName(),
nodeRole.getJsonName()
);
cacheInitializationTimedOut = true;
}
// It is important to take a snapshot here as list of nodes might change by the time listeners process
// the changes.
List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
LOGGER.info(
"Node watcher of role [%s] is now initialized with %d nodes.",
nodeRole.getJsonName(),
currNodes.size());
for (DruidNodeDiscovery.Listener listener : nodeListeners) {
safeSchedule(
() -> {
listener.nodesAdded(currNodes);
if (timedOut) {
listener.nodeViewInitializedTimedOut();
} else {
listener.nodeViewInitialized();
}
},
"Exception occurred in nodesAdded([%s]) in listener [%s].",
currNodes,
listener
);
}
cacheInitialized.countDown();
}
public void resetNodes(Map<String, DiscoveryDruidNode> fullNodes)
{
synchronized (lock) {

View File

@ -54,5 +54,13 @@ public interface DruidNodeDiscovery
{
// do nothing
}
/**
* Called once when the underlying cache in the DruidNodeDiscovery implementation has timed out trying to initialize.
*/
default void nodeViewInitializedTimedOut()
{
// do nothing
}
}
}

View File

@ -216,20 +216,35 @@ public abstract class DruidNodeDiscoveryProvider
@Override
public void nodeViewInitialized()
{
nodeViewInitialized(false);
}
@Override
public void nodeViewInitializedTimedOut()
{
nodeViewInitialized(true);
}
private void nodeViewInitialized(final boolean timedOut)
{
synchronized (lock) {
if (uninitializedNodeRoles == 0) {
log.error("Unexpected call of nodeViewInitialized()");
log.error("Unexpected call of nodeViewInitialized(timedOut = %s)", timedOut);
return;
}
uninitializedNodeRoles--;
if (uninitializedNodeRoles == 0) {
for (Listener listener : listeners) {
try {
listener.nodeViewInitialized();
if (timedOut) {
listener.nodeViewInitializedTimedOut();
} else {
listener.nodeViewInitialized();
}
}
catch (Exception ex) {
log.error(ex, "Listener[%s].nodeViewInitialized() threw exception. Ignored.", listener);
log.error(ex, "Listener[%s].nodeViewInitialized(%s) threw exception. Ignored.", listener, timedOut);
}
}
}

View File

@ -157,5 +157,11 @@ public class DiscoveryServiceLocator implements ServiceLocator
}
}
}
@Override
public void nodeViewInitializedTimedOut()
{
nodeViewInitialized();
}
}
}

View File

@ -19,11 +19,15 @@
package org.apache.druid.discovery;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.DruidNode;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
@ -32,17 +36,33 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class BaseNodeRoleWatcherTest
{
private static ScheduledExecutorService exec;
@BeforeClass
public static void setup()
{
exec = createScheduledSingleThreadedExecutor();
}
@AfterClass
public static void teardown()
{
exec.shutdown();
}
@Test(timeout = 60_000L)
public void testGeneralUseSimulation()
{
BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(
Execs.directExecutor(),
NodeRole.BROKER
);
BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER);
DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2");
@ -116,6 +136,69 @@ public class BaseNodeRoleWatcherTest
assertListener(listener3, true, nodesAdded, nodesRemoved);
}
@Test(timeout = 60_000L)
public void testRegisterListenerBeforeTimeout() throws InterruptedException
{
BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER, 1);
TestListener listener1 = new TestListener();
nodeRoleWatcher.registerListener(listener1);
DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2");
DiscoveryDruidNode broker3 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker3");
DiscoveryDruidNode notBroker = new DiscoveryDruidNode(
new DruidNode("s3", "h3", false, 8080, null, true, false),
NodeRole.COORDINATOR,
ImmutableMap.of()
);
nodeRoleWatcher.childAdded(broker1);
nodeRoleWatcher.childAdded(notBroker);
nodeRoleWatcher.childAdded(broker3);
nodeRoleWatcher.childRemoved(broker2);
assertListener(listener1, false, Collections.emptyList(), Collections.emptyList());
Assert.assertTrue(listener1.ready.await(1500, TimeUnit.MILLISECONDS));
Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());
assertListener(listener1, true, ImmutableList.of(broker1, broker3), ImmutableList.of());
}
@Test(timeout = 60_000L)
public void testGetAllNodesBeforeTimeout()
{
BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER, 1);
TestListener listener1 = new TestListener();
nodeRoleWatcher.registerListener(listener1);
DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2");
DiscoveryDruidNode broker3 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker3");
DiscoveryDruidNode notBroker = new DiscoveryDruidNode(
new DruidNode("s3", "h3", false, 8080, null, true, false),
NodeRole.COORDINATOR,
ImmutableMap.of()
);
nodeRoleWatcher.childAdded(broker1);
nodeRoleWatcher.childAdded(broker2);
nodeRoleWatcher.childAdded(notBroker);
nodeRoleWatcher.childAdded(broker3);
nodeRoleWatcher.childRemoved(broker2);
assertListener(listener1, false, Collections.emptyList(), Collections.emptyList());
Assert.assertEquals(2, nodeRoleWatcher.getAllNodes().size());
Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());
assertListener(listener1, true, ImmutableList.of(broker1, broker3), ImmutableList.of());
}
private DiscoveryDruidNode buildDiscoveryDruidNode(NodeRole role, String host)
{
return new DiscoveryDruidNode(
@ -125,16 +208,39 @@ public class BaseNodeRoleWatcherTest
);
}
private void assertListener(TestListener listener, boolean nodeViewInitialized, List<DiscoveryDruidNode> nodesAdded, List<DiscoveryDruidNode> nodesRemoved)
private void assertListener(
TestListener listener,
boolean ready,
List<DiscoveryDruidNode> nodesAdded,
List<DiscoveryDruidNode> nodesRemoved
)
{
Assert.assertEquals(nodeViewInitialized, listener.nodeViewInitialized.get());
final int count = ready ? 0 : 1;
Assert.assertEquals(count, listener.ready.getCount());
Assert.assertEquals(nodesAdded, listener.nodesAddedList);
Assert.assertEquals(nodesRemoved, listener.nodesRemovedList);
}
private static ScheduledExecutorService createScheduledSingleThreadedExecutor()
{
return new ScheduledThreadPoolExecutor(
1,
Execs.makeThreadFactory("BaseNodeRoleWatcher")
)
{
@Override
public Future<?> submit(Runnable task)
{
task.run();
return Futures.immediateFuture(null);
}
};
}
public static class TestListener implements DruidNodeDiscovery.Listener
{
private final AtomicBoolean nodeViewInitialized = new AtomicBoolean(false);
private final CountDownLatch ready = new CountDownLatch(1);
private final AtomicBoolean nodeViewInitializationTimedOut = new AtomicBoolean(false);
private final List<DiscoveryDruidNode> nodesAddedList = new ArrayList<>();
private final List<DiscoveryDruidNode> nodesRemovedList = new ArrayList<>();
@ -153,9 +259,21 @@ public class BaseNodeRoleWatcherTest
@Override
public void nodeViewInitialized()
{
if (!nodeViewInitialized.compareAndSet(false, true)) {
if (ready.getCount() == 0) {
throw new RuntimeException("NodeViewInitialized called again!");
}
ready.countDown();
}
@Override
public void nodeViewInitializedTimedOut()
{
if (!nodeViewInitializationTimedOut.compareAndSet(false, true)) {
throw new RuntimeException("NodeViewInitializedTimedOut called again!");
} else if (ready.getCount() == 0) {
throw new RuntimeException("NodeViewInitialized was already called!");
}
ready.countDown();
}
}
}