Fix race in BaseNodeRoleWatcher tests (#16064)

* Fix race in BaseNodeRoleWatcher tests

* Make non static
This commit is contained in:
AmatyaAvadhanula 2024-03-08 03:11:16 +05:30 committed by GitHub
parent aaa64832fd
commit 5871b81a78
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 39 additions and 28 deletions

View File

@ -210,7 +210,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
this.k8sApiClient = k8sApiClient; this.k8sApiClient = k8sApiClient;
this.nodeRole = nodeRole; this.nodeRole = nodeRole;
this.baseNodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole); this.baseNodeRoleWatcher = BaseNodeRoleWatcher.create(listenerExecutor, nodeRole);
this.watcherErrorRetryWaitMS = watcherErrorRetryWaitMS; this.watcherErrorRetryWaitMS = watcherErrorRetryWaitMS;
} }

View File

@ -185,7 +185,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
this.curatorFramework = curatorFramework; this.curatorFramework = curatorFramework;
this.nodeRole = nodeRole; this.nodeRole = nodeRole;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.baseNodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole); this.baseNodeRoleWatcher = BaseNodeRoleWatcher.create(listenerExecutor, nodeRole);
// This is required to be single threaded from docs in PathChildrenCache. // This is required to be single threaded from docs in PathChildrenCache.
this.cacheExecutor = Execs.singleThreaded( this.cacheExecutor = Execs.singleThreaded(

View File

@ -75,29 +75,25 @@ public class BaseNodeRoleWatcher
ScheduledExecutorService listenerExecutor, ScheduledExecutorService listenerExecutor,
NodeRole nodeRole NodeRole nodeRole
) )
{
this(listenerExecutor, nodeRole, DEFAULT_TIMEOUT_SECONDS);
}
BaseNodeRoleWatcher(
ScheduledExecutorService listenerExecutor,
NodeRole nodeRole,
long timeout
)
{ {
this.nodeRole = nodeRole; this.nodeRole = nodeRole;
this.listenerExecutor = listenerExecutor; this.listenerExecutor = listenerExecutor;
this.listenerExecutor.schedule( }
this::cacheInitializedTimedOut,
timeout, public static BaseNodeRoleWatcher create(
TimeUnit.SECONDS ScheduledExecutorService listenerExecutor,
); NodeRole nodeRole
)
{
BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole);
nodeRoleWatcher.scheduleTimeout(DEFAULT_TIMEOUT_SECONDS);
return nodeRoleWatcher;
} }
public Collection<DiscoveryDruidNode> getAllNodes() public Collection<DiscoveryDruidNode> getAllNodes()
{ {
try { try {
cacheInitialized.await(); awaitInitialization();
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -255,12 +251,9 @@ public class BaseNodeRoleWatcher
synchronized (lock) { synchronized (lock) {
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
// counted down. // counted down.
if (cacheInitialized.getCount() == 0) { if (cacheInitialized.getCount() != 0) {
LOGGER.warn("Cache for node watcher of role[%s] is already initialized. ignoring timeout.", nodeRole.getJsonName()); cacheInitialized(true);
return;
} }
cacheInitialized(true);
} }
} }
@ -352,6 +345,20 @@ public class BaseNodeRoleWatcher
} }
} }
void scheduleTimeout(long timeout)
{
listenerExecutor.schedule(
this::cacheInitializedTimedOut,
timeout,
TimeUnit.SECONDS
);
}
void awaitInitialization() throws InterruptedException
{
cacheInitialized.await();
}
private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args) private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
{ {
listenerExecutor.submit(() -> { listenerExecutor.submit(() -> {

View File

@ -40,7 +40,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class BaseNodeRoleWatcherTest public class BaseNodeRoleWatcherTest
@ -62,7 +61,7 @@ public class BaseNodeRoleWatcherTest
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testGeneralUseSimulation() public void testGeneralUseSimulation()
{ {
BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER); BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER);
DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1"); DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2"); DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2");
@ -139,7 +138,7 @@ public class BaseNodeRoleWatcherTest
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testRegisterListenerBeforeTimeout() throws InterruptedException public void testRegisterListenerBeforeTimeout() throws InterruptedException
{ {
BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER, 1); BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER);
TestListener listener1 = new TestListener(); TestListener listener1 = new TestListener();
nodeRoleWatcher.registerListener(listener1); nodeRoleWatcher.registerListener(listener1);
@ -161,16 +160,18 @@ public class BaseNodeRoleWatcherTest
assertListener(listener1, false, Collections.emptyList(), Collections.emptyList()); assertListener(listener1, false, Collections.emptyList(), Collections.emptyList());
Assert.assertTrue(listener1.ready.await(1500, TimeUnit.MILLISECONDS)); nodeRoleWatcher.scheduleTimeout(0);
nodeRoleWatcher.awaitInitialization();
Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get()); Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());
assertListener(listener1, true, ImmutableList.of(broker1, broker3), ImmutableList.of()); assertListener(listener1, true, ImmutableList.of(broker1, broker3), ImmutableList.of());
} }
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testGetAllNodesBeforeTimeout() public void testGetAllNodesBeforeTimeout() throws InterruptedException
{ {
BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER, 1); BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER);
TestListener listener1 = new TestListener(); TestListener listener1 = new TestListener();
nodeRoleWatcher.registerListener(listener1); nodeRoleWatcher.registerListener(listener1);
@ -193,6 +194,9 @@ public class BaseNodeRoleWatcherTest
assertListener(listener1, false, Collections.emptyList(), Collections.emptyList()); assertListener(listener1, false, Collections.emptyList(), Collections.emptyList());
nodeRoleWatcher.scheduleTimeout(0);
nodeRoleWatcher.awaitInitialization();
Assert.assertEquals(2, nodeRoleWatcher.getAllNodes().size()); Assert.assertEquals(2, nodeRoleWatcher.getAllNodes().size());
Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get()); Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());