diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java index 0173979efee..27779f53251 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java @@ -110,6 +110,7 @@ import java.util.stream.Collectors; */ public class RunWorkOrder { + private final String controllerTaskId; private final WorkOrder workOrder; private final InputChannelFactory inputChannelFactory; private final CounterTracker counterTracker; @@ -138,6 +139,7 @@ public class RunWorkOrder private ListenableFuture stageOutputChannelsFuture; public RunWorkOrder( + final String controllerTaskId, final WorkOrder workOrder, final InputChannelFactory inputChannelFactory, final CounterTracker counterTracker, @@ -150,6 +152,7 @@ public class RunWorkOrder final boolean removeNullBytes ) { + this.controllerTaskId = controllerTaskId; this.workOrder = workOrder; this.inputChannelFactory = inputChannelFactory; this.counterTracker = counterTracker; @@ -565,7 +568,7 @@ public class RunWorkOrder ) { return DurableStorageOutputChannelFactory.createStandardImplementation( - workOrder.getQueryDefinition().getQueryId(), + controllerTaskId, workOrder.getWorkerNumber(), workOrder.getStageNumber(), workerContext.workerId(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 912826c5c5a..90f018d07ae 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -408,6 +408,7 @@ public class WorkerImpl implements Worker final QueryContext queryContext = task != null ? QueryContext.of(task.getContext()) : QueryContext.empty(); final RunWorkOrder runWorkOrder = new RunWorkOrder( + task.getControllerTaskId(), workOrder, inputChannelFactory, stageCounters.computeIfAbsent(