diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java index 2273822402c..3ca45c7419f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java @@ -93,9 +93,14 @@ public class MSQWorkerTaskLauncher private final AtomicReference state = new AtomicReference<>(State.NEW); private final AtomicBoolean cancelTasksOnStop = new AtomicBoolean(); + // Set by launchTasksIfNeeded. @GuardedBy("taskIds") private int desiredTaskCount = 0; + // Set by the main loop when it acknowledges a new desiredTaskCount. + @GuardedBy("taskIds") + private int acknowledgedDesiredTaskCount = 0; + // Worker number -> task ID. @GuardedBy("taskIds") private final List taskIds = new ArrayList<>(); @@ -208,6 +213,7 @@ public class MSQWorkerTaskLauncher synchronized (taskIds) { if (taskCount > desiredTaskCount) { desiredTaskCount = taskCount; + taskIds.notifyAll(); } while (taskIds.size() < taskCount || !IntStream.range(0, taskCount).allMatch(fullyStartedTasks::contains)) { @@ -325,6 +331,7 @@ public class MSQWorkerTaskLauncher synchronized (taskIds) { firstTask = taskIds.size(); taskCount = desiredTaskCount; + acknowledgedDesiredTaskCount = desiredTaskCount; } for (int i = firstTask; i < taskCount; i++) { @@ -460,7 +467,11 @@ public class MSQWorkerTaskLauncher } else { // wait on taskIds so we can wake up early if needed. synchronized (taskIds) { - taskIds.wait(sleepMillis); + // desiredTaskCount is set by launchTasksIfNeeded, and acknowledgedDesiredTaskCount is set by mainLoop when + // it acknowledges a new target. If these are not equal, do another run immediately and launch more tasks. + if (acknowledgedDesiredTaskCount == desiredTaskCount) { + taskIds.wait(sleepMillis); + } } } } else {