From e181e24d482b81836af41e3677ed999929d3e9b4 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 9 Sep 2020 12:04:09 -0400 Subject: [PATCH] [ML] only persist progress if it has changed (#62123) (#62180) * [ML] only persist progress if it has changed We already search for the previously stored progress document. For optimization purposes, and to prevent restoring the same progress after a failed analytics job is stopped, this commit does an equality check between the previously stored progress and current progress If the progress has changed, persistence continues as normal. --- .../ml/integration/ClassificationIT.java | 1 - ...NativeDataFrameAnalyticsIntegTestCase.java | 1 - .../ml/dataframe/DataFrameAnalyticsTask.java | 19 +++++++++++++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index 4830b57f6df..841a09a549a 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -781,7 +781,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2"))); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/61913") public void testTooLowConfiguredMemoryStillStarts() throws Exception { initialize("low_memory_analysis"); indexData(sourceIndex, 10_000, 0, NESTED_FIELD); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 8fdaead6fdf..30f96c351da 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -93,7 +93,6 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest for (DataFrameAnalyticsConfig config : analytics) { try { assertThat(deleteAnalytics(config.getId()).isAcknowledged(), is(true)); - assertThat(searchStoredProgress(config.getId()).getHits().getTotalHits().value, equalTo(0L)); } catch (Exception e) { // just log and ignore logger.error(new ParameterizedMessage("[{}] Could not clean up analytics job config", config.getId()), e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index 334f7dd2f2e..e8688df5dee 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.watcher.watch.Payload; import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker; import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; +import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils; import java.util.List; import java.util.Map; @@ -309,17 +310,31 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S ActionListener searchFormerProgressDocListener = ActionListener.wrap( searchResponse -> { String indexOrAlias = AnomalyDetectorsIndex.jobStateIndexWriteAlias(); + StoredProgress previous = null; if (searchResponse.getHits().getHits().length > 0) { indexOrAlias = searchResponse.getHits().getHits()[0].getIndex(); + try { + previous = MlParserUtils.parse(searchResponse.getHits().getHits()[0], StoredProgress.PARSER); + } catch (Exception ex) { + LOGGER.warn(new ParameterizedMessage("[{}] failed to parse previously stored progress", jobId), ex); + } } + + List progress = statsHolder.getProgressTracker().report(); + final StoredProgress progressToStore = new StoredProgress(progress); + if (progressToStore.equals(previous)) { + LOGGER.debug("[{}] new progress is the same as previously persisted progress. Skipping storage.", jobId); + runnable.run(); + return; + } + IndexRequest indexRequest = new IndexRequest(indexOrAlias) .id(progressDocId) .setRequireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - List progress = statsHolder.getProgressTracker().report(); try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) { LOGGER.debug("[{}] Persisting progress is: {}", jobId, progress); - new StoredProgress(progress).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS); + progressToStore.toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS); indexRequest.source(jsonBuilder); } executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, indexProgressDocListener);