From 23d5f7babf42397172778e69c9d02602e99487c2 Mon Sep 17 00:00:00 2001 From: Ed Savage <32410745+edsavage@users.noreply.github.com> Date: Thu, 21 Mar 2019 17:01:10 +0000 Subject: [PATCH] [ML] Add integration tests to check persistence (#40272) (#40315) Additional checks to exercise the behaviour of persistence on graceful close of an anomaly job. Related to elastic/ml-cpp#393 Backports #40272 --- .../xpack/ml/integration/PersistJobIT.java | 155 +++++++++++++++++- 1 file changed, 150 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java index 6f885744b21..0d84822f8a8 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java @@ -5,12 +5,17 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.junit.After; @@ -18,8 +23,14 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; 
+import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + public class PersistJobIT extends MlNativeAutodetectIntegTestCase { + private static final long BUCKET_SPAN_SECONDS = 300; + private static final TimeValue BUCKET_SPAN = TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS); + @After public void cleanUpJobs() { cleanUp(); @@ -39,11 +50,141 @@ public class PersistJobIT extends MlNativeAutodetectIntegTestCase { }); } - private void runJob(String jobId) throws Exception { - TimeValue bucketSpan = TimeValue.timeValueMinutes(5); + // check that state is persisted after time has been advanced even if no new data is seen in the interim + public void testPersistJobOnGracefulShutdown_givenTimeAdvancedAfterNoNewData() throws Exception { + String jobId = "time-advanced-after-no-new-data-test"; + + // open and run a job with a small data set + runJob(jobId); + FlushJobAction.Response flushResponse = flushJob(jobId, true); + + closeJob(jobId); + + // Check that state has been persisted + SearchResponse stateDocsResponse1 = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) + .setFetchSource(false) + .setTrackTotalHits(true) + .setSize(10000) + .get(); + + int numQuantileRecords = 0; + int numStateRecords = 0; + for (SearchHit hit : stateDocsResponse1.getHits().getHits()) { + logger.info(hit.getId()); + if (hit.getId().contains("quantiles")) { + ++numQuantileRecords; + } else if (hit.getId().contains("model_state")) { + ++numStateRecords; + } + } + assertThat(stateDocsResponse1.getHits().getTotalHits().value, equalTo(2L)); + assertThat(numQuantileRecords, equalTo(1)); + assertThat(numStateRecords, equalTo(1)); + + // re-open the job + openJob(jobId); + + // advance time + long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().getTime(); + FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId); + advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 
1000)); + advanceTimeRequest.setCalcInterim(false); + assertThat(client().execute(FlushJobAction.INSTANCE, advanceTimeRequest).actionGet().isFlushed(), is(true)); + + closeJob(jobId); + + // Check that a new state record exists. + SearchResponse stateDocsResponse2 = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) + .setFetchSource(false) + .setTrackTotalHits(true) + .setSize(10000) + .get(); + + numQuantileRecords = 0; + numStateRecords = 0; + for (SearchHit hit : stateDocsResponse2.getHits().getHits()) { + logger.info(hit.getId()); + if (hit.getId().contains("quantiles")) { + ++numQuantileRecords; + } else if (hit.getId().contains("model_state")) { + ++numStateRecords; + } + } + + assertThat(stateDocsResponse2.getHits().getTotalHits().value, equalTo(3L)); + assertThat(numQuantileRecords, equalTo(1)); + assertThat(numStateRecords, equalTo(2)); + + deleteJob(jobId); + } + + // Check an edge case where time is manually advanced before any valid data is seen + public void testPersistJobOnGracefulShutdown_givenNoDataAndTimeAdvanced() throws Exception { + String jobId = "no-data-and-time-advanced-test"; + + createAndOpenJob(jobId); + + // Manually advance time. 
+ FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId); + advanceTimeRequest.setAdvanceTime(String.valueOf(BUCKET_SPAN_SECONDS * 1000)); + advanceTimeRequest.setCalcInterim(false); + assertThat(client().execute(FlushJobAction.INSTANCE, advanceTimeRequest).actionGet().isFlushed(), is(true)); + + closeJob(jobId); + + // Check that state has been persisted + SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) + .setFetchSource(false) + .setTrackTotalHits(true) + .setSize(10000) + .get(); + + int numQuantileRecords = 0; + int numStateRecords = 0; + for (SearchHit hit : stateDocsResponse.getHits().getHits()) { + logger.info(hit.getId()); + if (hit.getId().contains("quantiles")) { + ++numQuantileRecords; + } else if (hit.getId().contains("model_state")) { + ++numStateRecords; + } + } + assertThat(stateDocsResponse.getHits().getTotalHits().value, equalTo(2L)); + assertThat(numQuantileRecords, equalTo(1)); + assertThat(numStateRecords, equalTo(1)); + + // now check that the job can be happily restored - even though no data has been seen + AcknowledgedResponse ack = openJob(jobId); + assertTrue(ack.isAcknowledged()); + + closeJob(jobId); + deleteJob(jobId); + } + + // Check an edge case where a job is opened and then immediately closed + public void testPersistJobOnGracefulShutdown_givenNoDataAndNoTimeAdvance() throws Exception { + String jobId = "no-data-and-no-time-advance-test"; + + createAndOpenJob(jobId); + + closeJob(jobId); + + // Check that state has not been persisted + SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) + .setFetchSource(false) + .setTrackTotalHits(true) + .setSize(10000) + .get(); + + assertThat(stateDocsResponse.getHits().getTotalHits().value, equalTo(0L)); + + deleteJob(jobId); + } + + private void createAndOpenJob(String jobId) throws Exception { Detector.Builder detector = new Detector.Builder("count", null); 
+ AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); - analysisConfig.setBucketSpan(bucketSpan); + analysisConfig.setBucketSpan(BUCKET_SPAN); Job.Builder job = new Job.Builder(jobId); job.setAnalysisConfig(analysisConfig); job.setDataDescription(new DataDescription.Builder()); @@ -51,7 +192,11 @@ public class PersistJobIT extends MlNativeAutodetectIntegTestCase { putJob(job); openJob(job.getId()); - List<String> data = generateData(System.currentTimeMillis(), bucketSpan, 10, bucketIndex -> randomIntBetween(10, 20)); - postData(job.getId(), data.stream().collect(Collectors.joining())); + } + + private void runJob(String jobId) throws Exception { + createAndOpenJob(jobId); + List<String> data = generateData(System.currentTimeMillis(), BUCKET_SPAN, 10, bucketIndex -> randomIntBetween(10, 20)); + postData(jobId, data.stream().collect(Collectors.joining())); } }