Prefer the first worker error when a job fails with `TooManyAttemptsForWorker` (#14170)

This commit is contained in:
Karan Kumar 2023-05-01 14:47:11 +05:30 committed by GitHub
parent 90ea192d9c
commit 078d5ac590
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 55 additions and 1 deletions

View File

@ -34,6 +34,8 @@ import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.indexing.error.QueryRuntimeFault;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForJob;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
@ -201,7 +203,10 @@ public class MSQTasks
// function, and it's best if helper functions run quietly.)
if (workerErrorReport != null && (controllerErrorReport.getFault() instanceof WorkerFailedFault
|| controllerErrorReport.getFault() instanceof WorkerRpcFailedFault
|| controllerErrorReport.getFault() instanceof CanceledFault)) {
|| controllerErrorReport.getFault() instanceof CanceledFault
|| controllerErrorReport.getFault() instanceof TooManyAttemptsForWorker
|| controllerErrorReport.getFault() instanceof TooManyAttemptsForJob)) {
return workerErrorReport;
} else {
return controllerErrorReport;

View File

@ -31,6 +31,8 @@ import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForJob;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
@ -141,6 +143,53 @@ public class MSQTasksTest
);
}
/**
 * Checks that when the controller-side report carries a {@link TooManyAttemptsForJob} fault and a
 * worker-side report is also supplied, {@code MSQTasks.makeErrorReport} yields a report equal to the
 * worker-side one.
 */
@Test
public void test_makeErrorReport_controllerWithTooManyAttemptsForJob_workerPreferred()
{
  // Controller side: a TooManyAttemptsForJob report passed through makeErrorReport with no worker report.
  final MSQErrorReport controllerSeed =
      MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new TooManyAttemptsForJob(1, 1, "xxx", "xxx"));
  final MSQErrorReport fromController = MSQTasks.makeErrorReport(WORKER_ID, WORKER_HOST, controllerSeed, null);

  // Worker side: a TooManyColumnsFault report built the same way.
  final MSQErrorReport workerSeed =
      MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new TooManyColumnsFault(1, 10));
  final MSQErrorReport fromWorker = MSQTasks.makeErrorReport(WORKER_ID, WORKER_HOST, workerSeed, null);

  // Combine both reports; the result must equal the worker-side report.
  final MSQErrorReport chosen = MSQTasks.makeErrorReport(WORKER_ID, WORKER_HOST, fromController, fromWorker);
  Assert.assertEquals(fromWorker, chosen);
}
/**
 * Checks that when the controller-side report carries a {@link TooManyAttemptsForWorker} fault and a
 * worker-side report is also supplied, {@code MSQTasks.makeErrorReport} yields a report equal to the
 * worker-side one.
 */
@Test
public void test_makeErrorReport_controllerWithTooManyAttemptsForWorker_workerPreferred()
{
  // Controller side: a TooManyAttemptsForWorker report passed through makeErrorReport with no worker report.
  final MSQErrorReport controllerSeed =
      MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new TooManyAttemptsForWorker(1, "xxx", 1, "xxx"));
  final MSQErrorReport fromController = MSQTasks.makeErrorReport(WORKER_ID, WORKER_HOST, controllerSeed, null);

  // Worker side: a TooManyColumnsFault report built the same way.
  final MSQErrorReport workerSeed =
      MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new TooManyColumnsFault(1, 10));
  final MSQErrorReport fromWorker = MSQTasks.makeErrorReport(WORKER_ID, WORKER_HOST, workerSeed, null);

  // Combine both reports; the result must equal the worker-side report.
  final MSQErrorReport chosen = MSQTasks.makeErrorReport(WORKER_ID, WORKER_HOST, fromController, fromWorker);
  Assert.assertEquals(fromWorker, chosen);
}
@Test
public void test_getWorkerFromTaskId()
{