mirror of https://github.com/apache/druid.git
MSQ: Launch initial tasks faster. (#13393)
Notify the mainLoop thread to skip a sleep when the desired task count changes.
This commit is contained in:
parent
b8ca03d283
commit
f037776fd8
|
@ -93,9 +93,14 @@ public class MSQWorkerTaskLauncher
|
|||
private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
|
||||
private final AtomicBoolean cancelTasksOnStop = new AtomicBoolean();
|
||||
|
||||
// Set by launchTasksIfNeeded.
|
||||
@GuardedBy("taskIds")
|
||||
private int desiredTaskCount = 0;
|
||||
|
||||
// Set by the main loop when it acknowledges a new desiredTaskCount.
|
||||
@GuardedBy("taskIds")
|
||||
private int acknowledgedDesiredTaskCount = 0;
|
||||
|
||||
// Worker number -> task ID.
|
||||
@GuardedBy("taskIds")
|
||||
private final List<String> taskIds = new ArrayList<>();
|
||||
|
@ -208,6 +213,7 @@ public class MSQWorkerTaskLauncher
|
|||
synchronized (taskIds) {
|
||||
if (taskCount > desiredTaskCount) {
|
||||
desiredTaskCount = taskCount;
|
||||
taskIds.notifyAll();
|
||||
}
|
||||
|
||||
while (taskIds.size() < taskCount || !IntStream.range(0, taskCount).allMatch(fullyStartedTasks::contains)) {
|
||||
|
@ -325,6 +331,7 @@ public class MSQWorkerTaskLauncher
|
|||
synchronized (taskIds) {
|
||||
firstTask = taskIds.size();
|
||||
taskCount = desiredTaskCount;
|
||||
acknowledgedDesiredTaskCount = desiredTaskCount;
|
||||
}
|
||||
|
||||
for (int i = firstTask; i < taskCount; i++) {
|
||||
|
@ -460,9 +467,13 @@ public class MSQWorkerTaskLauncher
|
|||
} else {
|
||||
// wait on taskIds so we can wake up early if needed.
|
||||
synchronized (taskIds) {
|
||||
// desiredTaskCount is set by launchTasksIfNeeded, and acknowledgedDesiredTaskCount is set by mainLoop when
|
||||
// it acknowledges a new target. If these are not equal, do another run immediately and launch more tasks.
|
||||
if (acknowledgedDesiredTaskCount == desiredTaskCount) {
|
||||
taskIds.wait(sleepMillis);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No wait, but check interrupted status anyway.
|
||||
if (Thread.interrupted()) {
|
||||
|
|
Loading…
Reference in New Issue