From 245a063492500039b6c5dec4602b1538e9150011 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 14 Feb 2018 12:09:05 +0000 Subject: [PATCH] [ML][TEST] Rewrite RestoreModelSnapshotIT to make more robust (elastic/x-pack-elasticsearch#3925) Now that the forecast API is available, we can use it to significantly simplify this test. The test is rewritten to leverage the contract that when a forecast is requested to a job without state it fails. relates elastic/x-pack-elasticsearch#3909 Original commit: elastic/x-pack-elasticsearch@978ae352c554d750397c7095a078924c7f0af31f --- .../integration/RestoreModelSnapshotIT.java | 114 ++++++------------ 1 file changed, 34 insertions(+), 80 deletions(-) diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java index 1c25139f05c..bd67f7165f1 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -12,115 +13,76 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; -import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.junit.After; import java.util.ArrayList; -import java.util.Arrays; import 
java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.is; /** - * This test generates data and it runs them through 2 jobs. - * The data covers 3 days. During the first 2 days, bucket values alternate - * between 2 modes. During the 3rd day, there is only one of the 2 modes - * except for a single bucket when the other mode reappears. - * * The first job receives the data in one go. The second job receives the - * data split into 2 parts: the alternating part and the stable part. - * After the first half, the job is closed and reopened, forcing the model - * snapshot to be restored. - * - * The test is designed so that no anomalies should be detected. However, - * for the split job, if the model fails to be restored the reappearance of - * the lost mode should cause an anomaly. - * - * The test asserts the 2 jobs have equal data counts and no records. + * This test aims to catch regressions where, + * when a job is reopened, it does not get restored + * with its model snapshot. To achieve this we + * leverage the forecast API. Requesting a forecast + * when there's no model state results in an error. + * Thus, we create a job, send some data, and we close it. + * Then we open it again and we request a forecast asserting + * the forecast was successful. 
*/ public class RestoreModelSnapshotIT extends MlNativeAutodetectIntegTestCase { @After - public void tearDownData() throws Exception { + public void tearDownData() { cleanUp(); } public void test() throws Exception { TimeValue bucketSpan = TimeValue.timeValueHours(1); int bucketCount = 72; - List byFieldValues = Arrays.asList("foo", "bar"); List data = new ArrayList<>(); long now = System.currentTimeMillis(); long timestamp = now - bucketCount * bucketSpan.getMillis(); - boolean isMode1 = true; - int alternatingModesCutoff = 48; - int differentModeBucketAfterAlternatingCutoff = 60; for (int i = 0; i < bucketCount; i++) { - for (String byFieldValue : byFieldValues) { - double value = isMode1 ? 10.0 : 100.0; - if (i == differentModeBucketAfterAlternatingCutoff) { - value = isMode1 ? 100.0 : 10.0; - } - - Map record = new HashMap<>(); - record.put("time", timestamp); - record.put("value", value); - record.put("by_field", byFieldValue); - data.add(createJsonRecord(record)); - } + Map record = new HashMap<>(); + record.put("time", timestamp); + data.add(createJsonRecord(record)); timestamp += bucketSpan.getMillis(); - if (i < alternatingModesCutoff) { - isMode1 = !isMode1; - } } - Job.Builder oneGoJob = buildAndRegisterJob("restore-model-snapshot-one-go-job", bucketSpan); - openJob(oneGoJob.getId()); - postData(oneGoJob.getId(), joinBetween(0, data.size(), data)); - closeJob(oneGoJob.getId()); + // Create the job, post the data and close the job + Job.Builder job = buildAndRegisterJob("restore-model-snapshot-job", bucketSpan); + openJob(job.getId()); + // Forecast should fail when the model has seen no data, ie model state not initialized + expectThrows(ElasticsearchStatusException.class, () -> forecast(job.getId(), TimeValue.timeValueHours(3), null)); + postData(job.getId(), data.stream().collect(Collectors.joining())); + closeJob(job.getId()); - Job.Builder splitJob = buildAndRegisterJob("restore-model-snapshot-split-job", bucketSpan); - openJob(splitJob.getId()); - 
int splitPoint = alternatingModesCutoff * byFieldValues.size(); - postData(splitJob.getId(), joinBetween(0, splitPoint, data)); - closeJob(splitJob.getId()); + // Reopen the job and check forecast works + openJob(job.getId()); + String forecastId = forecast(job.getId(), TimeValue.timeValueHours(3), null); + waitForecastToFinish(job.getId(), forecastId); + ForecastRequestStats forecastStats = getForecastStats(job.getId(), forecastId); + assertThat(forecastStats.getStatus(), equalTo(ForecastRequestStats.ForecastRequestStatus.FINISHED)); - openJob(splitJob.getId()); - postData(splitJob.getId(), joinBetween(splitPoint, data.size(), data)); - closeJob(splitJob.getId()); - - // Compare data counts - GetJobsStatsAction.Response.JobStats oneGoJobStats = getJobStats(oneGoJob.getId()).get(0); - GetJobsStatsAction.Response.JobStats splitJobStats = getJobStats(splitJob.getId()).get(0); - assertThat(oneGoJobStats.getDataCounts().getProcessedRecordCount(), - equalTo(splitJobStats.getDataCounts().getProcessedRecordCount())); - assertThat(oneGoJobStats.getDataCounts().getLatestRecordTimeStamp(), - equalTo(splitJobStats.getDataCounts().getLatestRecordTimeStamp())); - - List oneGoBuckets = getBuckets(oneGoJob.getId()); - assertThat(oneGoBuckets.size(), greaterThanOrEqualTo(70)); - assertThat(getBuckets(splitJob.getId()).size(), equalTo(oneGoBuckets.size())); - assertThat(getRecords(oneGoJob.getId()).isEmpty(), is(true)); - assertThat(getRecords(splitJob.getId()).isEmpty(), is(true)); + closeJob(job.getId()); // Since these jobs ran for 72 buckets, it's a good place to assert // that established model memory matches model memory in the job stats - for (Job.Builder job : Arrays.asList(oneGoJob, splitJob)) { - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), 
equalTo(modelSizeStats.getModelBytes())); - } + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(job.getId()).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); } private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception { - Detector.Builder detector = new Detector.Builder("mean", "value"); - detector.setByFieldName("by_field"); + Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); analysisConfig.setBucketSpan(bucketSpan); Job.Builder job = new Job.Builder(jobId); @@ -131,12 +93,4 @@ public class RestoreModelSnapshotIT extends MlNativeAutodetectIntegTestCase { putJob(job); return job; } - - private String joinBetween(int start, int end, List input) { - StringBuilder result = new StringBuilder(); - for (int i = start; i < end; i++) { - result.append(input.get(i)); - } - return result.toString(); - } }