diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 10e66395996..0ea3039019e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -1401,29 +1401,31 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer { // skip the lock and bail early if we should not mark any workers lazy (e.g. number // of current workers is at or below the minNumWorkers of autoscaler config) - if (maxLazyWorkers < 1) { - return Collections.emptyList(); + if (lazyWorkers.size() >= maxLazyWorkers) { + return getLazyWorkers(); } - // status lock is used to prevent any tasks being assigned to the worker while we mark it lazy + + // Search for new workers to mark lazy. + // Status lock is used to prevent any tasks being assigned to workers while we mark them lazy synchronized (statusLock) { for (Map.Entry worker : zkWorkers.entrySet()) { + if (lazyWorkers.size() >= maxLazyWorkers) { + break; + } final ZkWorker zkWorker = worker.getValue(); try { if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) { log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost()); lazyWorkers.put(worker.getKey(), zkWorker); - if (lazyWorkers.size() == maxLazyWorkers) { - // only mark excess workers as lazy and allow their cleanup - break; - } } } catch (Exception e) { throw new RuntimeException(e); } } - return getWorkerFromZK(lazyWorkers.values()); } + + return getLazyWorkers(); } protected List getAssignedTasks(Worker worker) throws Exception diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java index 9ef9065dc8f..04188f01c96 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java @@ -26,6 +26,7 @@ import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; import java.util.Collection; +import java.util.List; @PublicApi public interface WorkerTaskRunner extends TaskRunner @@ -47,10 +48,16 @@ public interface WorkerTaskRunner extends TaskRunner Collection getLazyWorkers(); /** - * Check which workers can be marked as lazy + * Mark workers matching a predicate as lazy, up to a maximum. If the number of workers previously marked lazy is + * equal to or higher than the provided maximum, this method will return those previously marked workers and will + * not mark any additional workers. Workers are never un-marked lazy once they are marked lazy. + * + * This method is called by {@link org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy} + * implementations. It is expected that the lazy workers returned by this method will be terminated using + * {@link org.apache.druid.indexing.overlord.autoscaling.AutoScaler#terminate(List)}. * * @param isLazyWorker predicate that checks if a worker is lazy - * @param maxLazyWorkers maximum number of lazy workers to return + * @param maxLazyWorkers desired maximum number of lazy workers (actual number may be higher) */ Collection markWorkersLazy(Predicate isLazyWorker, int maxLazyWorkers); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 935f2bc61c4..57d610ab736 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -96,7 +96,6 @@ import java.io.InputStream; import java.net.URL; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -927,29 +926,31 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer { // skip the lock and bail early if we should not mark any workers lazy (e.g. number // of current workers is at or below the minNumWorkers of autoscaler config) - if (maxLazyWorkers < 1) { - return Collections.emptyList(); + if (lazyWorkers.size() >= maxLazyWorkers) { + return getLazyWorkers(); } + // Search for new workers to mark lazy. + // Status lock is used to prevent any tasks being assigned to workers while we mark them lazy synchronized (statusLock) { for (Map.Entry worker : workers.entrySet()) { + if (lazyWorkers.size() >= maxLazyWorkers) { + break; + } final WorkerHolder workerHolder = worker.getValue(); try { if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) { log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost()); lazyWorkers.put(worker.getKey(), workerHolder); - if (lazyWorkers.size() == maxLazyWorkers) { - // only mark excess workers as lazy and allow their cleanup - break; - } } } catch (Exception e) { throw new RuntimeException(e); } } - return getLazyWorkers(); } + + return getLazyWorkers(); } private boolean isWorkerOkForMarkingLazy(Worker worker)