diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index a9c90bd6e9a..078e6c6ebd6 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -82,7 +82,7 @@ The following configs only apply if the overlord is running in remote mode: |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288| |`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M| |`druid.indexer.runner.taskShutdownLinkTimeout`|How long to wait on a shutdown request to a middle manager before timing out|PT1M| -|`druid.indexer.runner.pendingTasksRunnerNumThreads`|Number of threads to allocate pending-tasks to workers, must be at least 1.|3| +|`druid.indexer.runner.pendingTasksRunnerNumThreads`|Number of threads to allocate pending-tasks to workers, must be at least 1.|1| There are additional configs for autoscaling (if it is enabled): @@ -186,6 +186,8 @@ Workers are assigned tasks until capacity. |--------|-----------|-------| |`type`|`fillCapacity`.|required; must be `fillCapacity`| +Note that, if `druid.indexer.runner.pendingTasksRunnerNumThreads` is set to n (> 1) then it means to fill n workers upto capacity simultaneously and then moving on. + ##### Fill Capacity With Affinity An affinity config can be provided. @@ -197,6 +199,8 @@ An affinity config can be provided. Tasks will try to be assigned to preferred workers. Fill capacity strategy is used if no preference for a datasource specified. +Note that, if `druid.indexer.runner.pendingTasksRunnerNumThreads` is set to n (> 1) then it means to fill n preferred workers upto capacity simultaneously and then moving on. + ##### Equal Distribution The workers with the least amount of tasks is assigned the task. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 1a05d8a59b3..8b8f6cceded 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -705,49 +705,46 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer ZkWorker assignedWorker = null; Optional immutableZkWorker = null; try { - immutableZkWorker = strategy.findWorkerForTask( - config, - ImmutableMap.copyOf( - Maps.transformEntries( - Maps.filterEntries( - zkWorkers, new Predicate>() - { - @Override - public boolean apply(Map.Entry input) + synchronized (workersWithUnacknowledgedTask) { + immutableZkWorker = strategy.findWorkerForTask( + config, + ImmutableMap.copyOf( + Maps.transformEntries( + Maps.filterEntries( + zkWorkers, new Predicate>() { - return !lazyWorkers.containsKey(input.getKey()) && - !workersWithUnacknowledgedTask.containsKey(input.getKey()); + @Override + public boolean apply(Map.Entry input) + { + return !lazyWorkers.containsKey(input.getKey()) && + !workersWithUnacknowledgedTask.containsKey(input.getKey()); + } } - } - ), - new Maps.EntryTransformer() - { - @Override - public ImmutableWorkerInfo transformEntry( - String key, ZkWorker value - ) + ), + new Maps.EntryTransformer() { - return value.toImmutable(); + @Override + public ImmutableWorkerInfo transformEntry( + String key, ZkWorker value + ) + { + return value.toImmutable(); + } } - } - ) - ), - task - ); + ) + ), + task + ); - if (immutableZkWorker.isPresent()) { - if (workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId()) - == null) { + if (immutableZkWorker.isPresent() && + workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId()) + == null) { assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); - return announceTask(task, assignedWorker, taskRunnerWorkItem); - } else { - log.debug( - "Lost race to run task [%s] on worker [%s]. Workers to ack tasks are [%s].", - task.getId(), - immutableZkWorker.get().getWorker().getHost(), - workersWithUnacknowledgedTask - ); } + } + + if (assignedWorker != null) { + return announceTask(task, assignedWorker, taskRunnerWorkItem); } else { log.debug( "Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].", @@ -762,10 +759,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer finally { if (assignedWorker != null) { workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost()); - } - - if(immutableZkWorker.isPresent()) { - //if this attempt lost the race to run the task then there might be another worker available to try on. //if this attempt won the race to run the task then other task might be able to use this worker now after task ack. runPendingTasks(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java index e2321964bf8..c9e6df09776 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java @@ -47,7 +47,7 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig @JsonProperty @Min(1) - private int pendingTasksRunnerNumThreads = 3; + private int pendingTasksRunnerNumThreads = 1; public Period getTaskAssignmentTimeout() {