[7.x][ML] Fix race condition updating reindexing progress (#56135) (#56146)

In #55763 I thought I could remove the flag that marks
reindexing was finished on a data frame analytics task.
However, that exposed a race condition. It is possible that
between updating reindexing progress to 100 because we
have called `DataFrameAnalyticsManager.startAnalytics()` and
a call to the _stats API which updates reindexing progress via the
method `DataFrameAnalyticsTask.updateReindexTaskProgress()` we
end up overwriting the 100 with a lower progress value.

This commit fixes this issue by bringing back the help of
a `isReindexingFinished` flag as it was prior to #55763.

Closes #56128

Backport of #56135
This commit is contained in:
Dimitris Athanasiou 2020-05-05 10:48:42 +03:00 committed by GitHub
parent e8763bad41
commit 6061aa3db4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 8 additions and 3 deletions

View File

@ -338,7 +338,7 @@ public class DataFrameAnalyticsManager {
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
refreshResponse -> {
// Now we can ensure reindexing progress is complete
task.getStatsHolder().getProgressTracker().updateReindexingProgress(100);
task.setReindexingFinished();
// TODO This could fail with errors. In that case we get stuck with the copied index.
// We could delete the index in case of failure or we could try building the factory before reindexing

View File

@ -67,6 +67,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
private final StartDataFrameAnalyticsAction.TaskParams taskParams;
@Nullable
private volatile Long reindexingTaskId;
private volatile boolean isReindexingFinished;
private volatile boolean isStopping;
private volatile boolean isMarkAsCompletedCalled;
private final StatsHolder statsHolder;
@ -92,6 +93,10 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
this.reindexingTaskId = reindexingTaskId;
}
public void setReindexingFinished() {
isReindexingFinished = true;
}
public boolean isStopping() {
return isStopping;
}
@ -228,7 +233,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
private void getReindexTaskProgress(ActionListener<Integer> listener) {
TaskId reindexTaskId = getReindexTaskId();
if (reindexTaskId == null) {
listener.onResponse(statsHolder.getProgressTracker().getReindexingProgressPercent());
listener.onResponse(isReindexingFinished ? 100 : 0);
return;
}
@ -244,7 +249,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
error -> {
if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
// The task is not present which means either it has not started yet or it finished.
listener.onResponse(statsHolder.getProgressTracker().getReindexingProgressPercent());
listener.onResponse(isReindexingFinished ? 100 : 0);
} else {
listener.onFailure(error);
}