From 1435b9f4bdcfee19e7b4268ae33ec53462da8e17 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 5 Oct 2024 19:32:21 +0530 Subject: [PATCH] Dart: Skip final getCounters, postFinish to idle historicals. (#17255) (#17259) In a Dart query, all Historicals are given worker IDs, but not all of them are going to actually be started or receive work orders. Attempting to send a getCounters or postFinish command to a worker that never received a work order is not only wasteful, but it causes errors due to the workers not knowing about that query ID. Co-authored-by: Gian Merlino --- .../apache/druid/msq/exec/ControllerImpl.java | 20 ++++++++++--------- .../controller/ControllerQueryKernel.java | 15 ++++++++++++++ .../controller/ControllerStageTracker.java | 14 ++++++------- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 60e0910e15b..936ba4af44d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -432,8 +432,10 @@ public class ControllerImpl implements Controller } } if (queryKernel != null && queryKernel.isSuccess()) { - // If successful, encourage the tasks to exit successfully. - postFinishToAllTasks(); + // If successful, encourage workers to exit successfully. + // Only send this command to participating workers. For task-based queries, this is all tasks, since tasks + // are launched only when needed. For Dart, this is any servers that were actually assigned work items. + postFinishToWorkers(queryKernel.getAllParticipatingWorkers()); workerManager.stop(false); } else { // If not successful, cancel running tasks. 
@@ -1456,15 +1458,15 @@ public class ControllerImpl implements Controller return IntervalUtils.difference(replaceIntervals, publishIntervals); } - private CounterSnapshotsTree getCountersFromAllTasks() + private CounterSnapshotsTree fetchCountersFromWorkers(final IntSet workers) { final CounterSnapshotsTree retVal = new CounterSnapshotsTree(); final List<String> taskList = getWorkerIds(); final List<ListenableFuture<CounterSnapshotsTree>> futures = new ArrayList<>(); - for (String taskId : taskList) { - futures.add(netClient.getCounters(taskId)); + for (int workerNumber : workers) { + futures.add(netClient.getCounters(taskList.get(workerNumber))); } final List<CounterSnapshotsTree> snapshotsTrees = @@ -1477,14 +1479,14 @@ public class ControllerImpl implements Controller return retVal; } - private void postFinishToAllTasks() + private void postFinishToWorkers(final IntSet workers) { final List<String> taskList = getWorkerIds(); final List<ListenableFuture<Void>> futures = new ArrayList<>(); - for (String taskId : taskList) { - futures.add(netClient.postFinish(taskId)); + for (int workerNumber : workers) { + futures.add(netClient.postFinish(taskList.get(workerNumber))); } FutureUtils.getUnchecked(MSQFutureUtils.allAsList(futures, true), true); @@ -1499,7 +1501,7 @@ public class ControllerImpl implements Controller private CounterSnapshotsTree getFinalCountersSnapshot(@Nullable final ControllerQueryKernel queryKernel) { if (queryKernel != null && queryKernel.isSuccess()) { - return getCountersFromAllTasks(); + return fetchCountersFromWorkers(queryKernel.getAllParticipatingWorkers()); } else { return makeCountersSnapshotForLiveReports(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index 16ed68211d5..b0200234b40 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap; import it.unimi.dsi.fastutil.ints.Int2IntMap; import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.IntAVLTreeSet; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.frame.key.ClusterByPartitions; import org.apache.druid.java.util.common.IAE; @@ -643,6 +644,20 @@ public class ControllerQueryKernel doWithStageTracker(stageId, ControllerStageTracker::fail); } + /** + * Returns the set of all worker numbers that have participated in work done so far by this query. + */ + public IntSet getAllParticipatingWorkers() + { + final IntSet retVal = new IntAVLTreeSet(); + + for (final ControllerStageTracker tracker : stageTrackers.values()) { + retVal.addAll(tracker.getWorkerInputs().workers()); + } + + return retVal; + } + /** * Fetches and returns the stage kernel corresponding to the provided stage id, else throws {@link IAE} */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java index 0a62ba24b63..338a35e0d24 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java @@ -63,7 +63,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.stream.IntStream; /** * Controller-side state machine for each stage. 
Used by {@link ControllerQueryKernel} to form the overall state @@ -137,7 +136,7 @@ class ControllerStageTracker this.workerInputs = workerInputs; this.maxRetainedPartitionSketchBytes = maxRetainedPartitionSketchBytes; - initializeWorkerState(workerCount); + initializeWorkerState(workerInputs.workers()); if (stageDef.mustGatherResultKeyStatistics()) { this.completeKeyStatisticsInformation = @@ -149,14 +148,13 @@ class ControllerStageTracker } /** - * Initialize stage for each worker to {@link ControllerWorkerStagePhase#NEW} - * - * @param workerCount + * Initialize stage for each worker to {@link ControllerWorkerStagePhase#NEW}. */ - private void initializeWorkerState(int workerCount) + private void initializeWorkerState(IntSet workers) { - IntStream.range(0, workerCount) - .forEach(wokerNumber -> workerToPhase.put(wokerNumber, ControllerWorkerStagePhase.NEW)); + for (int workerNumber : workers) { + workerToPhase.put(workerNumber, ControllerWorkerStagePhase.NEW); + } } /**