diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index 4bdc0c9b1ac..8b4e32e59a3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.IdsQueryBuilder; @@ -161,7 +162,11 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S cancelReindex.setTaskId(reindexTaskId); cancelReindex.setReason(reason); cancelReindex.setTimeout(timeout); - CancelTasksResponse cancelReindexResponse = client.admin().cluster().cancelTasks(cancelReindex).actionGet(); + + // We need to cancel the reindexing task within context with ML origin as we started the task + // from the same context + CancelTasksResponse cancelReindexResponse = cancelTaskWithinMlOriginContext(cancelReindex); + Throwable firstError = null; if (cancelReindexResponse.getNodeFailures().isEmpty() == false) { firstError = cancelReindexResponse.getNodeFailures().get(0).getRootCause(); @@ -178,6 +183,13 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S } } + private CancelTasksResponse cancelTaskWithinMlOriginContext(CancelTasksRequest cancelTasksRequest) { + final ThreadContext threadContext = client.threadPool().getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(ML_ORIGIN)) { + return client.admin().cluster().cancelTasks(cancelTasksRequest).actionGet(); + } + } + public void setFailed(String reason) { DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, getAllocationId(), reason);