[7.x][ML] Reset reindexing progress when DFA job resumes with incomplete reindexing (#62772) (#62816)

This fixes reindexing progress in the scenario when a DFA job that had not finished
reindexing is resumed (either because the user called stop and start or because the
job was reassigned in the middle of reindexing). Before the fix reindexing progress
stays to the value it had reached before until it surpasses that value.

When we resume a data frame analytics job we want to preserve reindexing progress
and reset all other phases. Except for when reindexing was not completed.
In that case we are deleting the destination index and starting reindexing
from scratch. Thus we need to reset reindexing progress too.

Backport of #62772
This commit is contained in:
Dimitris Athanasiou 2020-09-23 14:09:04 +03:00 committed by GitHub
parent 054a950ceb
commit 69e72656fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 14 deletions

View File

@ -90,10 +90,7 @@ public class DataFrameAnalyticsManager {
// With config in hand, determine action to take // With config in hand, determine action to take
ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap( ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
config -> { config -> {
// At this point we have the config at hand and we can reset the progress tracker task.getStatsHolder().adjustProgressTracker(config.getAnalysis().getProgressPhases(),
// to use the analyses phases. We preserve reindexing progress as if reindexing was
// finished it will not be reset.
task.getStatsHolder().resetProgressTrackerPreservingReindexingProgress(config.getAnalysis().getProgressPhases(),
config.getAnalysis().supportsInference()); config.getAnalysis().supportsInference());
switch(currentState) { switch(currentState) {

View File

@ -30,10 +30,14 @@ public class StatsHolder {
dataCountsTracker = new DataCountsTracker(); dataCountsTracker = new DataCountsTracker();
} }
public void resetProgressTrackerPreservingReindexingProgress(List<String> analysisPhases, boolean hasInferencePhase) { public void adjustProgressTracker(List<String> analysisPhases, boolean hasInferencePhase) {
int reindexingProgressPercent = progressTracker.getReindexingProgressPercent(); int reindexingProgressPercent = progressTracker.getReindexingProgressPercent();
progressTracker = ProgressTracker.fromZeroes(analysisPhases, hasInferencePhase); progressTracker = ProgressTracker.fromZeroes(analysisPhases, hasInferencePhase);
progressTracker.updateReindexingProgress(reindexingProgressPercent);
// If reindexing progress was less than 100 (ie not complete) we reset it to 1
// as we will have to do reindexing from scratch and at the same time we want
// to differentiate from a job that has never started before.
progressTracker.updateReindexingProgress(reindexingProgressPercent < 100 ? 1 : reindexingProgressPercent);
} }
public ProgressTracker getProgressTracker() { public ProgressTracker getProgressTracker() {

View File

@ -19,10 +19,10 @@ import static org.hamcrest.Matchers.equalTo;
public class StatsHolderTests extends ESTestCase { public class StatsHolderTests extends ESTestCase {
public void testResetProgressTrackerPreservingReindexingProgress_GivenSameAnalysisPhases() { public void testAdjustProgressTracker_GivenSameAnalysisPhases() {
List<PhaseProgress> phases = Collections.unmodifiableList( List<PhaseProgress> phases = Collections.unmodifiableList(
Arrays.asList( Arrays.asList(
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 10), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 100),
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("loading_data", 20), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("loading_data", 20),
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("a", 30), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("a", 30),
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("b", 40), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("b", 40),
@ -31,24 +31,24 @@ public class StatsHolderTests extends ESTestCase {
); );
StatsHolder statsHolder = new StatsHolder(phases); StatsHolder statsHolder = new StatsHolder(phases);
statsHolder.resetProgressTrackerPreservingReindexingProgress(Arrays.asList("a", "b"), false); statsHolder.adjustProgressTracker(Arrays.asList("a", "b"), false);
List<PhaseProgress> phaseProgresses = statsHolder.getProgressTracker().report(); List<PhaseProgress> phaseProgresses = statsHolder.getProgressTracker().report();
assertThat(phaseProgresses.size(), equalTo(5)); assertThat(phaseProgresses.size(), equalTo(5));
assertThat(phaseProgresses.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()), assertThat(phaseProgresses.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()),
contains("reindexing", "loading_data", "a", "b", "writing_results")); contains("reindexing", "loading_data", "a", "b", "writing_results"));
assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(10)); assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(100));
assertThat(phaseProgresses.get(1).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(1).getProgressPercent(), equalTo(0));
assertThat(phaseProgresses.get(2).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(2).getProgressPercent(), equalTo(0));
assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0));
assertThat(phaseProgresses.get(4).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(4).getProgressPercent(), equalTo(0));
} }
public void testResetProgressTrackerPreservingReindexingProgress_GivenDifferentAnalysisPhases() { public void testAdjustProgressTracker_GivenDifferentAnalysisPhases() {
List<PhaseProgress> phases = Collections.unmodifiableList( List<PhaseProgress> phases = Collections.unmodifiableList(
Arrays.asList( Arrays.asList(
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 10), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 100),
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("loading_data", 20), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("loading_data", 20),
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("a", 30), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("a", 30),
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("b", 40), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("b", 40),
@ -57,14 +57,40 @@ public class StatsHolderTests extends ESTestCase {
); );
StatsHolder statsHolder = new StatsHolder(phases); StatsHolder statsHolder = new StatsHolder(phases);
statsHolder.resetProgressTrackerPreservingReindexingProgress(Arrays.asList("c", "d"), false); statsHolder.adjustProgressTracker(Arrays.asList("c", "d"), false);
List<PhaseProgress> phaseProgresses = statsHolder.getProgressTracker().report(); List<PhaseProgress> phaseProgresses = statsHolder.getProgressTracker().report();
assertThat(phaseProgresses.size(), equalTo(5)); assertThat(phaseProgresses.size(), equalTo(5));
assertThat(phaseProgresses.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()), assertThat(phaseProgresses.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()),
contains("reindexing", "loading_data", "c", "d", "writing_results")); contains("reindexing", "loading_data", "c", "d", "writing_results"));
assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(10)); assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(100));
assertThat(phaseProgresses.get(1).getProgressPercent(), equalTo(0));
assertThat(phaseProgresses.get(2).getProgressPercent(), equalTo(0));
assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0));
assertThat(phaseProgresses.get(4).getProgressPercent(), equalTo(0));
}
public void testAdjustProgressTracker_GivenReindexingProgressIncomplete() {
List<PhaseProgress> phases = Collections.unmodifiableList(
Arrays.asList(
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 42),
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("loading_data", 20),
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("a", 30),
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("b", 40),
new PhaseProgress("writing_results", 50)
)
);
StatsHolder statsHolder = new StatsHolder(phases);
statsHolder.adjustProgressTracker(Arrays.asList("a", "b"), false);
List<PhaseProgress> phaseProgresses = statsHolder.getProgressTracker().report();
assertThat(phaseProgresses.size(), equalTo(5));
assertThat(phaseProgresses.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()),
contains("reindexing", "loading_data", "a", "b", "writing_results"));
assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(1));
assertThat(phaseProgresses.get(1).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(1).getProgressPercent(), equalTo(0));
assertThat(phaseProgresses.get(2).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(2).getProgressPercent(), equalTo(0));
assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0));