Dart: Skip final getCounters, postFinish to idle historicals. (#17255) (#17259)

In a Dart query, all Historicals are given worker IDs, but not all of them
are going to actually be started or receive work orders.

Attempting to send a getCounters or postFinish command to a worker that
never received a work order is not only wasteful, but it causes errors due
to the workers not knowing about that query ID.

Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
This commit is contained in:
Kashif Faraz 2024-10-05 19:32:21 +05:30 committed by GitHub
parent d8e3ac89c3
commit 1435b9f4bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 32 additions and 17 deletions

View File

@ -432,8 +432,10 @@ public class ControllerImpl implements Controller
}
}
if (queryKernel != null && queryKernel.isSuccess()) {
// If successful, encourage the tasks to exit successfully.
postFinishToAllTasks();
// If successful, encourage workers to exit successfully.
// Only send this command to participating workers. For task-based queries, this is all tasks, since tasks
// are launched only when needed. For Dart, this is any servers that were actually assigned work items.
postFinishToWorkers(queryKernel.getAllParticipatingWorkers());
workerManager.stop(false);
} else {
// If not successful, cancel running tasks.
@ -1456,15 +1458,15 @@ public class ControllerImpl implements Controller
return IntervalUtils.difference(replaceIntervals, publishIntervals);
}
private CounterSnapshotsTree getCountersFromAllTasks()
private CounterSnapshotsTree fetchCountersFromWorkers(final IntSet workers)
{
final CounterSnapshotsTree retVal = new CounterSnapshotsTree();
final List<String> taskList = getWorkerIds();
final List<ListenableFuture<CounterSnapshotsTree>> futures = new ArrayList<>();
for (String taskId : taskList) {
futures.add(netClient.getCounters(taskId));
for (int workerNumber : workers) {
futures.add(netClient.getCounters(taskList.get(workerNumber)));
}
final List<CounterSnapshotsTree> snapshotsTrees =
@ -1477,14 +1479,14 @@ public class ControllerImpl implements Controller
return retVal;
}
private void postFinishToAllTasks()
private void postFinishToWorkers(final IntSet workers)
{
final List<String> taskList = getWorkerIds();
final List<ListenableFuture<Void>> futures = new ArrayList<>();
for (String taskId : taskList) {
futures.add(netClient.postFinish(taskId));
for (int workerNumber : workers) {
futures.add(netClient.postFinish(taskList.get(workerNumber)));
}
FutureUtils.getUnchecked(MSQFutureUtils.allAsList(futures, true), true);
@ -1499,7 +1501,7 @@ public class ControllerImpl implements Controller
private CounterSnapshotsTree getFinalCountersSnapshot(@Nullable final ControllerQueryKernel queryKernel)
{
if (queryKernel != null && queryKernel.isSuccess()) {
return getCountersFromAllTasks();
return fetchCountersFromWorkers(queryKernel.getAllParticipatingWorkers());
} else {
return makeCountersSnapshotForLiveReports();
}

View File

@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.java.util.common.IAE;
@ -643,6 +644,20 @@ public class ControllerQueryKernel
doWithStageTracker(stageId, ControllerStageTracker::fail);
}
/**
* Returns the set of all worker numbers that have participated in work done so far by this query.
*/
public IntSet getAllParticipatingWorkers()
{
final IntSet retVal = new IntAVLTreeSet();
for (final ControllerStageTracker tracker : stageTrackers.values()) {
retVal.addAll(tracker.getWorkerInputs().workers());
}
return retVal;
}
/**
* Fetches and returns the stage kernel corresponding to the provided stage id, else throws {@link IAE}
*/

View File

@ -63,7 +63,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.IntStream;
/**
* Controller-side state machine for each stage. Used by {@link ControllerQueryKernel} to form the overall state
@ -137,7 +136,7 @@ class ControllerStageTracker
this.workerInputs = workerInputs;
this.maxRetainedPartitionSketchBytes = maxRetainedPartitionSketchBytes;
initializeWorkerState(workerCount);
initializeWorkerState(workerInputs.workers());
if (stageDef.mustGatherResultKeyStatistics()) {
this.completeKeyStatisticsInformation =
@ -149,14 +148,13 @@ class ControllerStageTracker
}
/**
* Initialize stage for each worker to {@link ControllerWorkerStagePhase#NEW}
*
* @param workerCount
* Initialize stage for each worker to {@link ControllerWorkerStagePhase#NEW}.
*/
private void initializeWorkerState(int workerCount)
private void initializeWorkerState(IntSet workers)
{
IntStream.range(0, workerCount)
.forEach(wokerNumber -> workerToPhase.put(wokerNumber, ControllerWorkerStagePhase.NEW));
for (int workerNumber : workers) {
workerToPhase.put(workerNumber, ControllerWorkerStagePhase.NEW);
}
}
/**