From 75ac9051cdff1870dfcba048dd412d7da4635804 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Sep 2024 15:59:41 -0700 Subject: [PATCH] MSQ: Fix two issues with phase transitions. (#17053) 1) ControllerQueryKernel: Update readyToReadResults to acknowledge that sorting stages can go directly from READING_INPUT to RESULTS_READY. 2) WorkerStageKernel: Ignore RESULTS_COMPLETE if work is already finished, which can happen if the transition to FINISHED comes early due to a downstream LIMIT. (cherry picked from commit 654e0b444bfc968ba0ec65a669af6dfef98604cd) --- .../druid/msq/kernel/controller/ControllerQueryKernel.java | 7 +++---- .../apache/druid/msq/kernel/worker/WorkerStageKernel.java | 6 ++++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index b0200234b40..90c9b496721 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -692,11 +692,10 @@ public class ControllerQueryKernel { if (stageOutputChannelModes.get(stageId) == OutputChannelMode.MEMORY) { if (getStageDefinition(stageId).doesSortDuringShuffle()) { - // Stages that sort during shuffle go through a READING_INPUT phase followed by a POST_READING phase - // (once all input is read). These stages start producing output once POST_READING starts. - return newPhase == ControllerStagePhase.POST_READING; + // Sorting stages start producing output when they finish reading their input. + return newPhase.isDoneReadingInput(); } else { - // Can read results immediately. + // Non-sorting stages start producing output immediately. return newPhase == ControllerStagePhase.NEW; } } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java index 5745e9b75af..726333a2d1d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java @@ -183,6 +183,12 @@ public class WorkerStageKernel throw new NullPointerException("resultObject must not be null"); } + if (phase.isTerminal()) { + // Ignore RESULTS_COMPLETE if work is already finished. This can happen if we transition to FINISHED early + // due to a downstream stage including a limit. + return; + } + transitionTo(WorkerStagePhase.RESULTS_COMPLETE); this.resultObject = resultObject; }