diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java index 2d423e48d30..ec53eb0d29f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHits; @@ -17,6 +19,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.ml.job.results.Bucket; import org.junit.After; import java.io.IOException; @@ -31,6 +34,7 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; /** @@ -49,14 +53,19 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { TimeValue bucketSpan = TimeValue.timeValueHours(1); long startTime = 1491004800000L; - Job.Builder job = buildAndRegisterJob("revert-model-snapshot-split-it-job", bucketSpan); + Job.Builder job = buildAndRegisterJob("revert-model-snapshot-it-job", bucketSpan); openJob(job.getId()); postData(job.getId(), generateData(startTime, bucketSpan, 10, Arrays.asList("foo"), (bucketIndex, series) -> bucketIndex == 5 ? 100.0 : 10.0).stream().collect(Collectors.joining())); + flushJob(job.getId(), true); closeJob(job.getId()); ModelSizeStats modelSizeStats1 = getJobStats(job.getId()).get(0).getModelSizeStats(); - String quantiles1 = getQuantiles(job.getId()); + Quantiles quantiles1 = getQuantiles(job.getId()); + + List midwayBuckets = getBuckets(job.getId()); + Bucket revertPointBucket = midwayBuckets.get(midwayBuckets.size() - 1); + assertThat(revertPointBucket.isInterim(), is(true)); // We need to wait a second to ensure the second time around model snapshot will have a different ID (it depends on epoch seconds) awaitBusy(() -> false, 1, TimeUnit.SECONDS); @@ -67,7 +76,7 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { closeJob(job.getId()); ModelSizeStats modelSizeStats2 = getJobStats(job.getId()).get(0).getModelSizeStats(); - String quantiles2 = getQuantiles(job.getId()); + Quantiles quantiles2 = getQuantiles(job.getId()); // Check model has grown since a new series was introduced assertThat(modelSizeStats2.getModelBytes(), greaterThan(modelSizeStats1.getModelBytes())); @@ -75,6 +84,10 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { // Check quantiles have changed assertThat(quantiles2, not(equalTo(quantiles1))); + List finalPreRevertBuckets = getBuckets(job.getId()); + Bucket finalPreRevertPointBucket = finalPreRevertBuckets.get(midwayBuckets.size() - 1); + assertThat(finalPreRevertPointBucket.isInterim(), is(false)); + List modelSnapshots = getModelSnapshots(job.getId()); assertThat(modelSnapshots.size(), equalTo(2)); @@ -89,7 +102,19 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { assertThat(getJobStats(job.getId()).get(0).getModelSizeStats().getModelBytes(), equalTo(modelSizeStats1.getModelBytes())); // Check quantiles have been reverted - assertThat(getQuantiles(job.getId()), equalTo(quantiles1)); + assertThat(getQuantiles(job.getId()).getTimestamp(), equalTo(revertSnapshot.getLatestResultTimeStamp())); + + // Re-run 2nd half of data + openJob(job.getId()); + postData(job.getId(), generateData(startTime + 10 * bucketSpan.getMillis(), bucketSpan, 10, Arrays.asList("foo", "bar"), + (bucketIndex, series) -> 10.0).stream().collect(Collectors.joining())); + closeJob(job.getId()); + + List finalPostRevertBuckets = getBuckets(job.getId()); + Bucket finalPostRevertPointBucket = finalPostRevertBuckets.get(midwayBuckets.size() - 1); + assertThat(finalPostRevertPointBucket.getTimestamp(), equalTo(finalPreRevertPointBucket.getTimestamp())); + assertThat(finalPostRevertPointBucket.getAnomalyScore(), equalTo(finalPreRevertPointBucket.getAnomalyScore())); + assertThat(finalPostRevertPointBucket.getEventCount(), equalTo(finalPreRevertPointBucket.getEventCount())); } private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception { @@ -117,19 +142,30 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { record.put("value", timeAndSeriesToValueFunction.apply(i, field)); record.put("series", field); data.add(createJsonRecord(record)); + + record = new HashMap<>(); + record.put("time", now + bucketSpan.getMillis() / 2); + record.put("value", timeAndSeriesToValueFunction.apply(i, field)); + record.put("series", field); + data.add(createJsonRecord(record)); } now += bucketSpan.getMillis(); } return data; } - private String getQuantiles(String jobId) { + private Quantiles getQuantiles(String jobId) { SearchResponse response = client().prepareSearch(".ml-state") .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(jobId))) .setSize(1) .get(); SearchHits hits = response.getHits(); assertThat(hits.getTotalHits(), equalTo(1L)); - return hits.getAt(0).getSourceAsString(); + try { + XContentParser parser = JsonXContent.jsonXContent.createParser(null, hits.getAt(0).getSourceAsString()); + return Quantiles.PARSER.apply(parser, null); + } catch (IOException e) { + throw new IllegalStateException(e); + } } }