[7.x][ML] Avoid marking data frame analytics task completed twice (#46721) (#46724)

When the stop API is called while the task is running there is
a chance the task gets marked completed twice. This may cause
undesired side effects, like indexing the progress document a second
time after the stop API has returned (the cause for #46705).

This commit adds a check that the task has not been completed before
proceeding to mark it so. In addition, when we update the task's state
we could get some warnings that the task was missing if the stop API
has been called in the meantime. We now check the errors are
`ResourceNotFoundException` and ignore them if so.

Closes #46705

Backports #46721
This commit is contained in:
Dimitris Athanasiou 2019-09-15 17:25:26 +03:00 committed by GitHub
parent e1842c0e5a
commit 63eb0d9081
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 3 deletions

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.dataframe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
@ -78,7 +79,13 @@ public class DataFrameAnalyticsManager {
case STARTED:
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
updatedTask -> reindexingStateListener.onResponse(config),
reindexingStateListener::onFailure));
error -> {
if (error instanceof ResourceNotFoundException) {
// The task has been stopped
} else {
reindexingStateListener.onFailure(error);
}
}));
break;
// The task has fully reindexed the documents and we should continue on with our analyses
case ANALYZING:
@ -221,7 +228,13 @@ public class DataFrameAnalyticsManager {
task.markAsCompleted();
}
}),
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
error -> {
if (error instanceof ResourceNotFoundException) {
// Task has stopped
} else {
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
}
}
));
},
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())

View File

@ -96,7 +96,12 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
@Override
public void markAsCompleted() {
persistProgress(() -> super.markAsCompleted());
// It is possible that the stop API has been called in the meantime and that
// may also cause this method to be called. We check whether we have already
// been marked completed to avoid doing it twice.
if (isCompleted() == false) {
persistProgress(() -> super.markAsCompleted());
}
}
@Override