When a data frame analytics job is stopped, if the reindexing task was still in progress we cancel it. Cancelling it should be done from the same context as when we executed the reindexing task. That means from a thread context with ML origin. Backport of #54874
This commit is contained in:
parent
bbc57828c4
commit
9b4ac60b53
|
@ -25,6 +25,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||
|
@ -161,7 +162,11 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
|
|||
cancelReindex.setTaskId(reindexTaskId);
|
||||
cancelReindex.setReason(reason);
|
||||
cancelReindex.setTimeout(timeout);
|
||||
CancelTasksResponse cancelReindexResponse = client.admin().cluster().cancelTasks(cancelReindex).actionGet();
|
||||
|
||||
// We need to cancel the reindexing task within context with ML origin as we started the task
|
||||
// from the same context
|
||||
CancelTasksResponse cancelReindexResponse = cancelTaskWithinMlOriginContext(cancelReindex);
|
||||
|
||||
Throwable firstError = null;
|
||||
if (cancelReindexResponse.getNodeFailures().isEmpty() == false) {
|
||||
firstError = cancelReindexResponse.getNodeFailures().get(0).getRootCause();
|
||||
|
@ -178,6 +183,13 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
|
|||
}
|
||||
}
|
||||
|
||||
private CancelTasksResponse cancelTaskWithinMlOriginContext(CancelTasksRequest cancelTasksRequest) {
|
||||
final ThreadContext threadContext = client.threadPool().getThreadContext();
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(ML_ORIGIN)) {
|
||||
return client.admin().cluster().cancelTasks(cancelTasksRequest).actionGet();
|
||||
}
|
||||
}
|
||||
|
||||
public void setFailed(String reason) {
|
||||
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED,
|
||||
getAllocationId(), reason);
|
||||
|
|
Loading…
Reference in New Issue