From 32128894a588b68676e88ec372a31a4f2c0b1205 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 25 Apr 2017 16:32:58 +0100 Subject: [PATCH] [ML] Delete interim results after job re-opening (elastic/x-pack-elasticsearch#1194) This is an issue where a bucket can have both interim results and non-interim results; a bucket should never have both at the same time. The steps to cause this situation are: 1. Flush a running job and create interim results 2. Close that job (this does not delete interim results) 3. Re-open the job and POST data 4. The job will eventually emit a bucket result which mingles with the existing interim results. Normally interim results are deleted by AutoDetectResultProcessor when a bucket is parsed following a flush command. Because of the close and re-opening of the job, AutoDetectResultProcessor no longer knows that a previous flush command created interim results. The fix is to always delete interim results the first time AutoDetectResultProcessor sees a bucket. 
relates elastic/x-pack-elasticsearch#1188 Original commit: elastic/x-pack-elasticsearch@5326455f54b6296d66350d89a8b87a8105527a45 --- .../ml/job/persistence/JobDataDeleter.java | 16 ++- .../output/AutoDetectResultProcessor.java | 2 +- ...erimResultsDeletedAfterReopeningJobIT.java | 122 ++++++++++++++++++ .../job/persistence/JobDataDeleterTests.java | 1 - 4 files changed, 137 insertions(+), 4 deletions(-) create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 558b2b594f8..010121f2725 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; @@ -134,6 +135,7 @@ public class JobDataDeleter { QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true); SearchResponse searchResponse = client.prepareSearch(index) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setTypes(Result.TYPE.getPreferredName()) .setQuery(new ConstantScoreQueryBuilder(qb)) .setFetchSource(false) @@ -141,7 +143,6 @@ public class JobDataDeleter { .setSize(SCROLL_SIZE) .get(); - String scrollId = searchResponse.getScrollId(); long totalHits = searchResponse.getHits().getTotalHits(); long totalDeletedCount = 0; while (totalDeletedCount < totalHits) { @@ 
-152,7 +153,17 @@ public class JobDataDeleter { ++deletedResultCount; } - searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get(); + searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION).get(); + } + + clearScroll(searchResponse.getScrollId()); + } + + private void clearScroll(String scrollId) { + try { + client.prepareClearScroll().addScrollId(scrollId).get(); + } catch (Exception e) { + LOGGER.warn("[{}] Error while clearing scroll with id [{}]", jobId, scrollId); } } @@ -226,6 +237,7 @@ public class JobDataDeleter { client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION).execute(this); } else { + clearScroll(searchResponse.getScrollId()); scrollFinishedListener.onResponse(true); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index f422744de92..80f9e84258a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -271,7 +271,7 @@ public class AutoDetectResultProcessor { Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) { this.jobId = jobId; this.isPerPartitionNormalization = isPerPartitionNormalization; - this.deleteInterimRequired = false; + this.deleteInterimRequired = true; this.bulkResultsPersister = bulkResultsPersister; } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java new file mode 100644 index 00000000000..33650010c75 --- 
/dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; +import org.junit.After; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * This tests that interim results created before a job was reopened get + * deleted after new buckets are created. 
+ */ +public class InterimResultsDeletedAfterReopeningJobIT extends MlNativeAutodetectIntegTestCase { + + @After + public void cleanUpTest() throws Exception { + cleanUp(); + } + + public void test() throws Exception { + Detector.Builder detector = new Detector.Builder("mean", "value"); + detector.setByFieldName("by_field"); + + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder( + Arrays.asList(detector.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueHours(1)); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + Job.Builder job = new Job.Builder("interim-results-deleted-after-reopening-job-test"); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + long timestamp = 1491004800000L; + int totalBuckets = 2 * 24; + List byFieldValues = Arrays.asList("foo", "bar"); + int normalValue = 1000; + List data = new ArrayList<>(); + for (int bucket = 0; bucket < totalBuckets; bucket++) { + for (String byFieldValue : byFieldValues) { + data.add(createJsonRecord(createRecord(timestamp, byFieldValue, normalValue))); + } + timestamp += TimeValue.timeValueHours(1).getMillis(); + } + + data.add(createJsonRecord(createRecord(timestamp, "foo", 1))); + data.add(createJsonRecord(createRecord(timestamp, "bar", 1))); + postData(job.getId(), data.stream().collect(Collectors.joining())); + flushJob(job.getId(), true); + closeJob(job.getId()); + + // We should have 2 interim records + List records = getRecords(job.getId()); + assertThat(records.size(), equalTo(2)); + assertThat(records.stream().allMatch(AnomalyRecord::isInterim), is(true)); + + // Second batch + data = new ArrayList<>(); + + // This should fix the mean for 'foo' + data.add(createJsonRecord(createRecord(timestamp, "foo", 2000))); + + // Then advance time and send normal data to force creating final results for previous bucket + timestamp += 
TimeValue.timeValueHours(1).getMillis(); + data.add(createJsonRecord(createRecord(timestamp, "foo", normalValue))); + data.add(createJsonRecord(createRecord(timestamp, "bar", normalValue))); + + openJob(job.getId()); + postData(job.getId(), data.stream().collect(Collectors.joining())); + closeJob(job.getId()); + + records = getRecords(job.getId()); + assertThat(records.size(), equalTo(1)); + assertThat(records.stream().allMatch(AnomalyRecord::isInterim), is(false)); + + // No other interim results either + assertNoInterimResults(job.getId()); + } + + private static Map createRecord(long timestamp, String byFieldValue, int value) { + Map record = new HashMap<>(); + record.put("time", timestamp); + record.put("by_field", byFieldValue); + record.put("value", value); + return record; + } + + private static String createJsonRecord(Map keyValueMap) throws IOException { + return JsonXContent.contentBuilder().map(keyValueMap).string() + "\n"; + } + + private void assertNoInterimResults(String jobId) { + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + SearchResponse search = client().prepareSearch(indexName).setTypes("result").setSize(1000) + .setQuery(QueryBuilders.termQuery("is_interim", true)).get(); + assertThat(search.getHits().getTotalHits(), equalTo(0L)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java index 3dc40aa4ab3..ac1f97af798 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.Date; import java.util.List; -import static org.elasticsearch.mock.orig.Mockito.never; import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.mock.orig.Mockito.verify; 
import static org.mockito.Matchers.any;