diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java index 3cdafe0952c..559b53b6080 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java @@ -210,7 +210,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider this.k8sApiClient = k8sApiClient; this.nodeRole = nodeRole; - this.baseNodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole); + this.baseNodeRoleWatcher = BaseNodeRoleWatcher.create(listenerExecutor, nodeRole); this.watcherErrorRetryWaitMS = watcherErrorRetryWaitMS; } diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index 7000246dfb4..1a3f8be9cb0 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -185,7 +185,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide this.curatorFramework = curatorFramework; this.nodeRole = nodeRole; this.jsonMapper = jsonMapper; - this.baseNodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole); + this.baseNodeRoleWatcher = BaseNodeRoleWatcher.create(listenerExecutor, nodeRole); // This is required to be single threaded from docs in PathChildrenCache. this.cacheExecutor = Execs.singleThreaded( diff --git a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java index 5de0a14b6d5..0484e229a26 100644 --- a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java +++ b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java @@ -75,29 +75,25 @@ public class BaseNodeRoleWatcher ScheduledExecutorService listenerExecutor, NodeRole nodeRole ) - { - this(listenerExecutor, nodeRole, DEFAULT_TIMEOUT_SECONDS); - } - - BaseNodeRoleWatcher( - ScheduledExecutorService listenerExecutor, - NodeRole nodeRole, - long timeout - ) { this.nodeRole = nodeRole; this.listenerExecutor = listenerExecutor; - this.listenerExecutor.schedule( - this::cacheInitializedTimedOut, - timeout, - TimeUnit.SECONDS - ); + } + + public static BaseNodeRoleWatcher create( + ScheduledExecutorService listenerExecutor, + NodeRole nodeRole + ) + { + BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole); + nodeRoleWatcher.scheduleTimeout(DEFAULT_TIMEOUT_SECONDS); + return nodeRoleWatcher; } public Collection getAllNodes() { try { - cacheInitialized.await(); + awaitInitialization(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); @@ -255,12 +251,9 @@ public class BaseNodeRoleWatcher synchronized (lock) { // No need to wait on CountDownLatch, because we are holding the lock under which it could only be // counted down. - if (cacheInitialized.getCount() == 0) { - LOGGER.warn("Cache for node watcher of role[%s] is already initialized. ignoring timeout.", nodeRole.getJsonName()); - return; + if (cacheInitialized.getCount() != 0) { + cacheInitialized(true); } - - cacheInitialized(true); } } @@ -352,6 +345,20 @@ public class BaseNodeRoleWatcher } } + void scheduleTimeout(long timeout) + { + listenerExecutor.schedule( + this::cacheInitializedTimedOut, + timeout, + TimeUnit.SECONDS + ); + } + + void awaitInitialization() throws InterruptedException + { + cacheInitialized.await(); + } + private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args) { listenerExecutor.submit(() -> { diff --git a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java index f4fc454968d..fa3cd0f4483 100644 --- a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java +++ b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java @@ -40,7 +40,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class BaseNodeRoleWatcherTest @@ -62,7 +61,7 @@ public class BaseNodeRoleWatcherTest @Test(timeout = 60_000L) public void testGeneralUseSimulation() { - BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER); + BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER); DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1"); DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2"); @@ -139,7 +138,7 @@ public class BaseNodeRoleWatcherTest @Test(timeout = 60_000L) public void testRegisterListenerBeforeTimeout() throws InterruptedException { - BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER, 1); + BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER); TestListener listener1 = new TestListener(); nodeRoleWatcher.registerListener(listener1); @@ -161,16 +160,18 @@ public class BaseNodeRoleWatcherTest assertListener(listener1, false, Collections.emptyList(), Collections.emptyList()); - Assert.assertTrue(listener1.ready.await(1500, TimeUnit.MILLISECONDS)); + nodeRoleWatcher.scheduleTimeout(0); + nodeRoleWatcher.awaitInitialization(); + Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get()); assertListener(listener1, true, ImmutableList.of(broker1, broker3), ImmutableList.of()); } @Test(timeout = 60_000L) - public void testGetAllNodesBeforeTimeout() + public void testGetAllNodesBeforeTimeout() throws InterruptedException { - BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER, 1); + BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER); TestListener listener1 = new TestListener(); nodeRoleWatcher.registerListener(listener1); @@ -193,6 +194,9 @@ public class BaseNodeRoleWatcherTest assertListener(listener1, false, Collections.emptyList(), Collections.emptyList()); + nodeRoleWatcher.scheduleTimeout(0); + nodeRoleWatcher.awaitInitialization(); + Assert.assertEquals(2, nodeRoleWatcher.getAllNodes().size()); Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());