From 8630974157c071a2aac03d6493f17e18374b8681 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 16 Sep 2024 05:30:09 -0700 Subject: [PATCH] MSQ: Wake up the main controller thread on workerError. (#17075) This isn't necessary when using MSQWorkerTaskLauncher as the WorkerManager implementation, because in that case, task failure also wakes up the main thread. However, when using workers that are not task-based, we don't want to rely on the WorkerManager for this. --- .../main/java/org/apache/druid/msq/exec/ControllerImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 6d1ef21abbf..2a29d40b9fe 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -117,6 +117,7 @@ import org.apache.druid.msq.indexing.error.TooManyBucketsFault; import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault; import org.apache.druid.msq.indexing.error.TooManyWarningsFault; import org.apache.druid.msq.indexing.error.UnknownFault; +import org.apache.druid.msq.indexing.error.WorkerFailedFault; import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault; import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory; import org.apache.druid.msq.indexing.report.MSQSegmentReport; @@ -754,6 +755,11 @@ public class ControllerImpl implements Controller } workerErrorRef.compareAndSet(null, mapQueryColumnNameToOutputColumnName(errorReport)); + + // Wake up the main controller thread. + addToKernelManipulationQueue(kernel -> { + throw new MSQException(new WorkerFailedFault(errorReport.getTaskId(), null)); + }); } /**