diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java index bc006898675..44fe0dbce8d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java @@ -34,6 +34,8 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.MSQFault; import org.apache.druid.msq.indexing.error.MSQFaultUtils; import org.apache.druid.msq.indexing.error.QueryRuntimeFault; +import org.apache.druid.msq.indexing.error.TooManyAttemptsForJob; +import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker; import org.apache.druid.msq.indexing.error.UnknownFault; import org.apache.druid.msq.indexing.error.WorkerFailedFault; import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault; @@ -201,7 +203,10 @@ public class MSQTasks // function, and it's best if helper functions run quietly.) if (workerErrorReport != null && (controllerErrorReport.getFault() instanceof WorkerFailedFault || controllerErrorReport.getFault() instanceof WorkerRpcFailedFault - || controllerErrorReport.getFault() instanceof CanceledFault)) { + || controllerErrorReport.getFault() instanceof CanceledFault + || controllerErrorReport.getFault() instanceof TooManyAttemptsForWorker + || controllerErrorReport.getFault() instanceof TooManyAttemptsForJob)) { + return workerErrorReport; } else { return controllerErrorReport; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java index 8371705dfde..bd6911c04d8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java @@ -31,6 +31,8 @@ import org.apache.druid.msq.indexing.error.MSQErrorReport; import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.MSQFaultUtils; import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault; +import org.apache.druid.msq.indexing.error.TooManyAttemptsForJob; +import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker; import org.apache.druid.msq.indexing.error.TooManyColumnsFault; import org.apache.druid.msq.indexing.error.TooManyWorkersFault; import org.apache.druid.msq.indexing.error.UnknownFault; @@ -141,6 +143,53 @@ public class MSQTasksTest ); } + @Test + public void test_makeErrorReport_controllerWithTooManyAttemptsForJob_workerPreferred() + { + final MSQErrorReport controllerReport = MSQTasks.makeErrorReport( + WORKER_ID, + WORKER_HOST, + MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new TooManyAttemptsForJob(1, 1, "xxx", "xxx")), + null + ); + + final MSQErrorReport workerReport = MSQTasks.makeErrorReport( + WORKER_ID, + WORKER_HOST, + MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new TooManyColumnsFault(1, 10)), + null + ); + + Assert.assertEquals( + workerReport, + MSQTasks.makeErrorReport(WORKER_ID, WORKER_HOST, controllerReport, workerReport) + ); + } + + @Test + public void test_makeErrorReport_controllerWithTooManyAttemptsForWorker_workerPreferred() + { + final MSQErrorReport controllerReport = MSQTasks.makeErrorReport( + WORKER_ID, + WORKER_HOST, + MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new TooManyAttemptsForWorker(1, "xxx", 1, "xxx")), + null + ); + + final MSQErrorReport workerReport = MSQTasks.makeErrorReport( + WORKER_ID, + WORKER_HOST, + MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new TooManyColumnsFault(1, 10)), + null + ); + + Assert.assertEquals( + workerReport, + MSQTasks.makeErrorReport(WORKER_ID, WORKER_HOST, controllerReport, workerReport) + ); + } + + @Test public void test_getWorkerFromTaskId() {