[ML] only persist progress if it has changed (#62123) (#62180)

* [ML] only persist progress if it has changed

We already search for the previously stored progress document.

For optimization purposes, and to prevent restoring the same
progress after a failed analytics job is stopped,
this commit does an equality check between the previously stored progress and current progress
If the progress has changed, persistence continues as normal.
This commit is contained in:
Benjamin Trent 2020-09-09 12:04:09 -04:00 committed by GitHub
parent f1522fcafc
commit e181e24d48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 4 deletions

View File

@ -781,7 +781,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2"))); assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2")));
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/61913")
public void testTooLowConfiguredMemoryStillStarts() throws Exception { public void testTooLowConfiguredMemoryStillStarts() throws Exception {
initialize("low_memory_analysis"); initialize("low_memory_analysis");
indexData(sourceIndex, 10_000, 0, NESTED_FIELD); indexData(sourceIndex, 10_000, 0, NESTED_FIELD);

View File

@ -93,7 +93,6 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
for (DataFrameAnalyticsConfig config : analytics) { for (DataFrameAnalyticsConfig config : analytics) {
try { try {
assertThat(deleteAnalytics(config.getId()).isAcknowledged(), is(true)); assertThat(deleteAnalytics(config.getId()).isAcknowledged(), is(true));
assertThat(searchStoredProgress(config.getId()).getHits().getTotalHits().value, equalTo(0L));
} catch (Exception e) { } catch (Exception e) {
// just log and ignore // just log and ignore
logger.error(new ParameterizedMessage("[{}] Could not clean up analytics job config", config.getId()), e); logger.error(new ParameterizedMessage("[{}] Could not clean up analytics job config", config.getId()), e);

View File

@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.watcher.watch.Payload;
import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker; import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder; import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -309,17 +310,31 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
ActionListener<SearchResponse> searchFormerProgressDocListener = ActionListener.wrap( ActionListener<SearchResponse> searchFormerProgressDocListener = ActionListener.wrap(
searchResponse -> { searchResponse -> {
String indexOrAlias = AnomalyDetectorsIndex.jobStateIndexWriteAlias(); String indexOrAlias = AnomalyDetectorsIndex.jobStateIndexWriteAlias();
StoredProgress previous = null;
if (searchResponse.getHits().getHits().length > 0) { if (searchResponse.getHits().getHits().length > 0) {
indexOrAlias = searchResponse.getHits().getHits()[0].getIndex(); indexOrAlias = searchResponse.getHits().getHits()[0].getIndex();
try {
previous = MlParserUtils.parse(searchResponse.getHits().getHits()[0], StoredProgress.PARSER);
} catch (Exception ex) {
LOGGER.warn(new ParameterizedMessage("[{}] failed to parse previously stored progress", jobId), ex);
}
} }
List<PhaseProgress> progress = statsHolder.getProgressTracker().report();
final StoredProgress progressToStore = new StoredProgress(progress);
if (progressToStore.equals(previous)) {
LOGGER.debug("[{}] new progress is the same as previously persisted progress. Skipping storage.", jobId);
runnable.run();
return;
}
IndexRequest indexRequest = new IndexRequest(indexOrAlias) IndexRequest indexRequest = new IndexRequest(indexOrAlias)
.id(progressDocId) .id(progressDocId)
.setRequireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)) .setRequireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
List<PhaseProgress> progress = statsHolder.getProgressTracker().report();
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) { try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
LOGGER.debug("[{}] Persisting progress is: {}", jobId, progress); LOGGER.debug("[{}] Persisting progress is: {}", jobId, progress);
new StoredProgress(progress).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS); progressToStore.toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
indexRequest.source(jsonBuilder); indexRequest.source(jsonBuilder);
} }
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, indexProgressDocListener); executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, indexProgressDocListener);