From 709c11990743c59ed42c6327b6994bf5e03d3945 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Wed, 9 Oct 2024 19:09:06 +0530 Subject: [PATCH] MSQ: Wake up the main controller thread on workerError. (#17075) (#17304) [Backport] MSQ: Wake up the main controller thread on workerError. (#17075) #17304 This isn't necessary when using MSQWorkerTaskLauncher as the WorkerManager implementation, because in that case, task failure also wakes up the main thread. However, when using workers that are not task-based, we don't want to rely on the WorkerManager for this. Co-authored-by: Gian Merlino --- .../main/java/org/apache/druid/msq/exec/ControllerImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 936ba4af44d..7c0a18f3db9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -117,6 +117,7 @@ import org.apache.druid.msq.indexing.error.TooManyBucketsFault; import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault; import org.apache.druid.msq.indexing.error.TooManyWarningsFault; import org.apache.druid.msq.indexing.error.UnknownFault; +import org.apache.druid.msq.indexing.error.WorkerFailedFault; import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault; import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory; import org.apache.druid.msq.indexing.report.MSQSegmentReport; @@ -752,6 +753,11 @@ public class ControllerImpl implements Controller } workerErrorRef.compareAndSet(null, mapQueryColumnNameToOutputColumnName(errorReport)); + + // Wake up the main controller thread. + addToKernelManipulationQueue(kernel -> { + throw new MSQException(new WorkerFailedFault(errorReport.getTaskId(), null)); + }); } /**