[ML] Delete interim results after job re-opening (elastic/x-pack-elasticsearch#1194)

This is an issue where a bucket can have both interim results and
non-interim results; a bucket should never have both at the same time.
The steps to cause this situation are:

1. Flush a running job and create interim results
2. Close that job (this does not delete interim results)
3. Re-open the job and POST data
4. The job will eventually emit a bucket result which mingles with the
existing interim results

Normally interim results are deleted by AutoDetectResultProcessor when a
bucket is parsed following a flush command. Because of the close and
re-opening of the job AutoDetectResultProcessor no longer knows that a
previous flush command created interim results.

The fix is to always delete interim results the first time
AutoDetectResultProcessor sees a bucket.

relates elastic/x-pack-elasticsearch#1188

Original commit: elastic/x-pack-elasticsearch@5326455f54
This commit is contained in:
Dimitris Athanasiou 2017-04-25 16:32:58 +01:00 committed by GitHub
parent 019b1f7ece
commit 32128894a5
4 changed files with 137 additions and 4 deletions

View File

@ -14,6 +14,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
@ -134,6 +135,7 @@ public class JobDataDeleter {
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
SearchResponse searchResponse = client.prepareSearch(index)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setTypes(Result.TYPE.getPreferredName())
.setQuery(new ConstantScoreQueryBuilder(qb))
.setFetchSource(false)
@ -141,7 +143,6 @@ public class JobDataDeleter {
.setSize(SCROLL_SIZE)
.get();
String scrollId = searchResponse.getScrollId();
long totalHits = searchResponse.getHits().getTotalHits();
long totalDeletedCount = 0;
while (totalDeletedCount < totalHits) {
@ -152,7 +153,17 @@ public class JobDataDeleter {
++deletedResultCount;
}
searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get();
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION).get();
}
clearScroll(searchResponse.getScrollId());
}
/**
 * Releases the server-side scroll context identified by {@code scrollId}.
 * <p>
 * Best-effort: a failure to clear a scroll only leaks a search context on the
 * server until it times out, so any error is logged (with its cause) rather
 * than propagated to the caller.
 */
private void clearScroll(String scrollId) {
    try {
        client.prepareClearScroll().addScrollId(scrollId).get();
    } catch (Exception e) {
        // Include the exception so the failure cause and stack trace are not lost
        LOGGER.warn("[" + jobId + "] Error while clearing scroll with id [" + scrollId + "]", e);
    }
}
@ -226,6 +237,7 @@ public class JobDataDeleter {
client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION).execute(this);
}
else {
clearScroll(searchResponse.getScrollId());
scrollFinishedListener.onResponse(true);
}
}

View File

@ -271,7 +271,7 @@ public class AutoDetectResultProcessor {
/**
 * Creates the per-job processing context.
 *
 * <p>{@code deleteInterimRequired} starts as {@code true} so that any interim
 * results left behind by a flush before the job was closed are deleted the
 * first time a bucket is seen after (re-)opening — otherwise a bucket could
 * end up holding both interim and final results.
 */
Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) {
    this.jobId = jobId;
    this.isPerPartitionNormalization = isPerPartitionNormalization;
    this.deleteInterimRequired = true;
    this.bulkResultsPersister = bulkResultsPersister;
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.junit.After;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
 * Verifies that interim results which existed before a job was re-opened are
 * removed once new buckets are created after re-opening.
 */
public class InterimResultsDeletedAfterReopeningJobIT extends MlNativeAutodetectIntegTestCase {

    @After
    public void cleanUpTest() throws Exception {
        cleanUp();
    }

    public void test() throws Exception {
        // Single-detector job: mean(value) partitioned by "by_field", hourly buckets.
        Detector.Builder detector = new Detector.Builder("mean", "value");
        detector.setByFieldName("by_field");
        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
        analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));

        Job.Builder job = new Job.Builder("interim-results-deleted-after-reopening-job-test");
        job.setAnalysisConfig(analysisConfig);
        job.setDataDescription(new DataDescription.Builder());

        registerJob(job);
        putJob(job);
        openJob(job.getId());

        long time = 1491004800000L;
        final int bucketCount = 2 * 24;
        List<String> series = Arrays.asList("foo", "bar");
        int normalValue = 1000;

        // Two days of steady data to establish a baseline for each series.
        StringBuilder firstPayload = new StringBuilder();
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            for (String byFieldValue : series) {
                firstPayload.append(createJsonRecord(createRecord(time, byFieldValue, normalValue)));
            }
            time += TimeValue.timeValueHours(1).getMillis();
        }
        // Anomalously low values in the current (incomplete) bucket.
        firstPayload.append(createJsonRecord(createRecord(time, "foo", 1)));
        firstPayload.append(createJsonRecord(createRecord(time, "bar", 1)));

        postData(job.getId(), firstPayload.toString());
        flushJob(job.getId(), true);
        closeJob(job.getId());

        // Flushing with calcInterim=true should have left 2 interim records behind.
        List<AnomalyRecord> records = getRecords(job.getId());
        assertThat(records.size(), equalTo(2));
        assertThat(records.stream().allMatch(AnomalyRecord::isInterim), is(true));

        // Second batch: correct the mean for 'foo' in the open bucket, then advance
        // time with normal data so the previous bucket gets finalized.
        StringBuilder secondPayload = new StringBuilder();
        secondPayload.append(createJsonRecord(createRecord(time, "foo", 2000)));
        time += TimeValue.timeValueHours(1).getMillis();
        secondPayload.append(createJsonRecord(createRecord(time, "foo", normalValue)));
        secondPayload.append(createJsonRecord(createRecord(time, "bar", normalValue)));

        openJob(job.getId());
        postData(job.getId(), secondPayload.toString());
        closeJob(job.getId());

        // The stale interim records must be gone; only the final record remains.
        records = getRecords(job.getId());
        assertThat(records.size(), equalTo(1));
        assertThat(records.stream().allMatch(AnomalyRecord::isInterim), is(false));

        // And no interim results of any other type either.
        assertNoInterimResults(job.getId());
    }

    /** Builds one input document with the job's time, partition and metric fields. */
    private static Map<String, Object> createRecord(long timestamp, String byFieldValue, int value) {
        Map<String, Object> fields = new HashMap<>();
        fields.put("time", timestamp);
        fields.put("by_field", byFieldValue);
        fields.put("value", value);
        return fields;
    }

    /** Serializes the given fields as a single-line JSON document plus newline. */
    private static String createJsonRecord(Map<String, Object> keyValueMap) throws IOException {
        return JsonXContent.contentBuilder().map(keyValueMap).string() + "\n";
    }

    /** Asserts the job's results index contains no documents flagged as interim. */
    private void assertNoInterimResults(String jobId) {
        String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
        SearchResponse search = client().prepareSearch(indexName)
                .setTypes("result")
                .setSize(1000)
                .setQuery(QueryBuilders.termQuery("is_interim", true))
                .get();
        assertThat(search.getHits().getTotalHits(), equalTo(0L));
    }
}

View File

@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import static org.elasticsearch.mock.orig.Mockito.never;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.mockito.Matchers.any;