mirror of https://github.com/apache/druid.git
Fix cancellation bug in MSQ. (#15368)
Fixes a bug where the MSQ controller task would continue to hold its task slot even after a cancel was issued. This was caused by a deadlock created on work launch: the main thread was waiting for tasks to spawn while the cancel thread was waiting for tasks to finish. The fix is to instruct the MSQWorkerTaskLauncher thread to stop creating new tasks, which enables the main thread to unblock and release the slot. This change also short-circuits the taskRetriable condition: the check now runs in the MSQWorkerTaskLauncher thread rather than in the main event thread loop, which results in faster task failure when the task is deemed non-retriable.
This commit is contained in:
parent
2e79fd56a7
commit
a70a3d5d48
|
@ -303,7 +303,7 @@ public class ControllerImpl implements Controller
|
|||
private Map<Integer, ClusterStatisticsMergeMode> stageToStatsMergingMode;
|
||||
private WorkerMemoryParameters workerMemoryParameters;
|
||||
private boolean isDurableStorageEnabled;
|
||||
private boolean isFaultToleranceEnabled;
|
||||
private final boolean isFaultToleranceEnabled;
|
||||
private volatile SegmentLoadStatusFetcher segmentLoadWaiter;
|
||||
|
||||
public ControllerImpl(
|
||||
|
@ -316,6 +316,9 @@ public class ControllerImpl implements Controller
|
|||
this.isDurableStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(
|
||||
task.getQuerySpec().getQuery().context()
|
||||
);
|
||||
this.isFaultToleranceEnabled = MultiStageQueryContext.isFaultToleranceEnabled(task.getQuerySpec()
|
||||
.getQuery()
|
||||
.context());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -372,7 +375,7 @@ public class ControllerImpl implements Controller
|
|||
);
|
||||
|
||||
if (workerTaskLauncher != null) {
|
||||
workerTaskLauncher.waitForWorkerShutdown();
|
||||
workerTaskLauncher.stop(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -605,8 +608,6 @@ public class ControllerImpl implements Controller
|
|||
private QueryDefinition initializeQueryDefAndState(final Closer closer)
|
||||
{
|
||||
final QueryContext queryContext = task.getQuerySpec().getQuery().context();
|
||||
isFaultToleranceEnabled = MultiStageQueryContext.isFaultToleranceEnabled(queryContext);
|
||||
|
||||
if (isFaultToleranceEnabled) {
|
||||
if (!queryContext.containsKey(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE)) {
|
||||
// if context key not set, enable durableStorage automatically.
|
||||
|
@ -686,13 +687,13 @@ public class ControllerImpl implements Controller
|
|||
task.getDataSource(),
|
||||
context,
|
||||
(failedTask, fault) -> {
|
||||
addToKernelManipulationQueue((kernel) -> {
|
||||
if (isFaultToleranceEnabled) {
|
||||
if (isFaultToleranceEnabled && ControllerQueryKernel.isRetriableFault(fault)) {
|
||||
addToKernelManipulationQueue((kernel) -> {
|
||||
addToRetryQueue(kernel, failedTask.getWorkerNumber(), fault);
|
||||
} else {
|
||||
throw new MSQException(fault);
|
||||
}
|
||||
});
|
||||
});
|
||||
} else {
|
||||
throw new MSQException(fault);
|
||||
}
|
||||
},
|
||||
taskContextOverridesBuilder.build(),
|
||||
// 10 minutes +- 2 minutes jitter
|
||||
|
|
|
@ -664,21 +664,23 @@ public class ControllerQueryKernel
|
|||
*/
|
||||
public List<WorkOrder> getWorkInCaseWorkerEligibleForRetryElseThrow(int workerNumber, MSQFault msqFault)
|
||||
{
|
||||
if (isRetriableFault(msqFault)) {
|
||||
return getWorkInCaseWorkerEligibleForRetry(workerNumber);
|
||||
} else {
|
||||
throw new MSQException(msqFault);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isRetriableFault(MSQFault msqFault)
|
||||
{
|
||||
final String errorCode;
|
||||
if (msqFault instanceof WorkerFailedFault) {
|
||||
errorCode = MSQFaultUtils.getErrorCodeFromMessage((((WorkerFailedFault) msqFault).getErrorMsg()));
|
||||
} else {
|
||||
errorCode = msqFault.getErrorCode();
|
||||
}
|
||||
|
||||
log.info("Parsed out errorCode[%s] to check eligibility for retry", errorCode);
|
||||
|
||||
if (RETRIABLE_ERROR_CODES.contains(errorCode)) {
|
||||
return getWorkInCaseWorkerEligibleForRetry(workerNumber);
|
||||
} else {
|
||||
throw new MSQException(msqFault);
|
||||
}
|
||||
log.debug("Parsed out errorCode[%s] to check eligibility for retry", errorCode);
|
||||
return RETRIABLE_ERROR_CODES.contains(errorCode);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue