diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index aa84fda1f70..a096eae3345 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -90,10 +90,7 @@ public class DataFrameAnalyticsManager { // With config in hand, determine action to take ActionListener configListener = ActionListener.wrap( config -> { - // At this point we have the config at hand and we can reset the progress tracker - // to use the analyses phases. We preserve reindexing progress as if reindexing was - // finished it will not be reset. - task.getStatsHolder().resetProgressTrackerPreservingReindexingProgress(config.getAnalysis().getProgressPhases(), + task.getStatsHolder().adjustProgressTracker(config.getAnalysis().getProgressPhases(), config.getAnalysis().supportsInference()); switch(currentState) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java index 2adb206a5d0..0ae94d9b624 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java @@ -30,10 +30,14 @@ public class StatsHolder { dataCountsTracker = new DataCountsTracker(); } - public void resetProgressTrackerPreservingReindexingProgress(List analysisPhases, boolean hasInferencePhase) { + public void adjustProgressTracker(List analysisPhases, boolean hasInferencePhase) { int reindexingProgressPercent = progressTracker.getReindexingProgressPercent(); progressTracker = ProgressTracker.fromZeroes(analysisPhases, hasInferencePhase); - progressTracker.updateReindexingProgress(reindexingProgressPercent); + + // If reindexing progress was less than 100 (ie not complete) we reset it to 1 + // as we will have to do reindexing from scratch and at the same time we want + // to differentiate from a job that has never started before. + progressTracker.updateReindexingProgress(reindexingProgressPercent < 100 ? 1 : reindexingProgressPercent); } public ProgressTracker getProgressTracker() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolderTests.java index f71afeabced..d321da67deb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolderTests.java @@ -19,10 +19,10 @@ import static org.hamcrest.Matchers.equalTo; public class StatsHolderTests extends ESTestCase { - public void testResetProgressTrackerPreservingReindexingProgress_GivenSameAnalysisPhases() { + public void testAdjustProgressTracker_GivenSameAnalysisPhases() { List phases = Collections.unmodifiableList( Arrays.asList( - new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 10), + new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 100), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("loading_data", 20), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("a", 30), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("b", 40), @@ -31,24 +31,24 @@ public class StatsHolderTests extends ESTestCase { ); StatsHolder statsHolder = new StatsHolder(phases); - statsHolder.resetProgressTrackerPreservingReindexingProgress(Arrays.asList("a", "b"), false); + statsHolder.adjustProgressTracker(Arrays.asList("a", "b"), false); List phaseProgresses = statsHolder.getProgressTracker().report(); assertThat(phaseProgresses.size(), equalTo(5)); assertThat(phaseProgresses.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()), contains("reindexing", "loading_data", "a", "b", "writing_results")); - assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(10)); + assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(100)); assertThat(phaseProgresses.get(1).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(2).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(4).getProgressPercent(), equalTo(0)); } - public void testResetProgressTrackerPreservingReindexingProgress_GivenDifferentAnalysisPhases() { + public void testAdjustProgressTracker_GivenDifferentAnalysisPhases() { List phases = Collections.unmodifiableList( Arrays.asList( - new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 10), + new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 100), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("loading_data", 20), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("a", 30), new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("b", 40), @@ -57,14 +57,40 @@ public class StatsHolderTests extends ESTestCase { ); StatsHolder statsHolder = new StatsHolder(phases); - statsHolder.resetProgressTrackerPreservingReindexingProgress(Arrays.asList("c", "d"), false); + statsHolder.adjustProgressTracker(Arrays.asList("c", "d"), false); List phaseProgresses = statsHolder.getProgressTracker().report(); assertThat(phaseProgresses.size(), equalTo(5)); assertThat(phaseProgresses.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()), contains("reindexing", "loading_data", "c", "d", "writing_results")); - assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(10)); + assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(100)); + assertThat(phaseProgresses.get(1).getProgressPercent(), equalTo(0)); + assertThat(phaseProgresses.get(2).getProgressPercent(), equalTo(0)); + assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0)); + assertThat(phaseProgresses.get(4).getProgressPercent(), equalTo(0)); + } + + public void testAdjustProgressTracker_GivenReindexingProgressIncomplete() { + List phases = Collections.unmodifiableList( + Arrays.asList( + new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 42), + new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("loading_data", 20), + new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("a", 30), + new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("b", 40), + new PhaseProgress("writing_results", 50) + ) + ); + StatsHolder statsHolder = new StatsHolder(phases); + + statsHolder.adjustProgressTracker(Arrays.asList("a", "b"), false); + + List phaseProgresses = statsHolder.getProgressTracker().report(); + + assertThat(phaseProgresses.size(), equalTo(5)); + assertThat(phaseProgresses.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()), + contains("reindexing", "loading_data", "a", "b", "writing_results")); + assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(1)); assertThat(phaseProgresses.get(1).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(2).getProgressPercent(), equalTo(0)); assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0));