From 037f09bef2224b0912b0f225c7653929dd98af88 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 6 Jul 2023 11:51:04 -0700 Subject: [PATCH] HttpRemoteTaskRunner: Fix markWorkersLazy for maxLazyWorkers == 0. (#14532) --- .../druid/indexing/overlord/RemoteTaskRunner.java | 6 +++--- .../druid/indexing/overlord/WorkerTaskRunner.java | 5 ++++- .../indexing/overlord/hrtr/HttpRemoteTaskRunner.java | 11 +++++++++-- .../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 11 +++++++++++ 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index c3c00e2f0e6..10e66395996 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -1397,11 +1397,11 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } @Override - public Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers) + public Collection markWorkersLazy(Predicate isLazyWorker, int maxLazyWorkers) { // skip the lock and bail early if we should not mark any workers lazy (e.g. 
number // of current workers is at or below the minNumWorkers of autoscaler config) - if (maxWorkers < 1) { + if (maxLazyWorkers < 1) { return Collections.emptyList(); } // status lock is used to prevent any tasks being assigned to the worker while we mark it lazy @@ -1412,7 +1412,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) { log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost()); lazyWorkers.put(worker.getKey(), zkWorker); - if (lazyWorkers.size() == maxWorkers) { + if (lazyWorkers.size() == maxLazyWorkers) { // only mark excess workers as lazy and allow their cleanup break; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java index 6520de2111f..9ef9065dc8f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java @@ -48,8 +48,11 @@ public interface WorkerTaskRunner extends TaskRunner /** * Check which workers can be marked as lazy + * + * @param isLazyWorker predicate that checks if a worker is lazy + * @param maxLazyWorkers maximum number of lazy workers to return */ - Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers); + Collection markWorkersLazy(Predicate isLazyWorker, int maxLazyWorkers); WorkerTaskRunnerConfig getConfig(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index a6354c90628..7d0d210736f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -96,6 +96,7 @@ import java.io.InputStream; import java.net.URL; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -922,8 +923,14 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } @Override - public Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers) + public Collection markWorkersLazy(Predicate isLazyWorker, int maxLazyWorkers) { + // skip the lock and bail early if we should not mark any workers lazy (e.g. number + // of current workers is at or below the minNumWorkers of autoscaler config) + if (maxLazyWorkers < 1) { + return Collections.emptyList(); + } + synchronized (statusLock) { for (Map.Entry worker : workers.entrySet()) { final WorkerHolder workerHolder = worker.getValue(); @@ -931,7 +938,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) { log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost()); lazyWorkers.put(worker.getKey(), workerHolder); - if (lazyWorkers.size() == maxWorkers) { + if (lazyWorkers.size() == maxLazyWorkers) { // only mark excess workers as lazy and allow their cleanup break; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 6cc6329c584..d7feece444c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -1105,6 +1105,17 @@ public class HttpRemoteTaskRunnerTest 
Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId()); + Assert.assertEquals( + Collections.emptyList(), + taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 0) + ); + + Assert.assertEquals( + "host3:8080", + Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 1)) + .getHost() + ); + Assert.assertEquals( "host3:8080", Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), Integer.MAX_VALUE))