From 708190f35657b600307c268c70360d7f7a742647 Mon Sep 17 00:00:00 2001 From: Dimitrios Athanasiou Date: Mon, 24 Apr 2017 22:56:57 +0100 Subject: [PATCH] [TEST] Add model snapshot restore integration test Relates elastic/x-pack-elasticsearch#882 Original commit: elastic/x-pack-elasticsearch@fbb983e63b2d684bc97534108e6fb5d0601fe8ca --- .../ml/integration/DetectionRulesIT.java | 6 - ...erimResultsDeletedAfterReopeningJobIT.java | 6 - .../MlNativeAutodetectIntegTestCase.java | 20 ++- .../integration/RestoreModelSnapshotIT.java | 131 ++++++++++++++++++ 4 files changed, 148 insertions(+), 15 deletions(-) create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java index 25cb9209393..364ce6d2cf8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.xpack.ml.action.GetRecordsAction; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.Condition; @@ -21,7 +20,6 @@ import org.elasticsearch.xpack.ml.job.config.RuleConditionType; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.junit.After; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -151,10 +149,6 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase { assertThat(secondHaldRecordByFieldValues, contains("by_field_value_1", "by_field_value_2")); } - private static String createJsonRecord(Map keyValueMap) throws IOException { - return JsonXContent.contentBuilder().map(keyValueMap).string() + "\n"; - } - private String joinBetween(int start, int end, List input) { StringBuilder result = new StringBuilder(); for (int i = start; i < end; i++) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java index 33650010c75..b05c6d5f839 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; @@ -17,7 +16,6 @@ import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.junit.After; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -109,10 +107,6 @@ public class InterimResultsDeletedAfterReopeningJobIT extends MlNativeAutodetect return record; } - private static String createJsonRecord(Map keyValueMap) throws IOException { - return JsonXContent.contentBuilder().map(keyValueMap).string() + "\n"; - } - private void assertNoInterimResults(String jobId) { String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); SearchResponse search = client().prepareSearch(indexName).setTypes("result").setSize(1000) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index 9250a29ed29..b4a94673486 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.SecurityIntegTestCase; import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.ml.action.CloseJobAction; @@ -44,6 +45,7 @@ import org.elasticsearch.xpack.security.Security; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; @@ -183,17 +185,25 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase { protected void waitUntilJobIsClosed(String jobId) throws Exception { assertBusy(() -> { try { - GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId); - GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).get(); - assertThat(response.getResponse().results().get(0).getState(), equalTo(JobState.CLOSED)); + assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)); } catch (Exception e) { throw new RuntimeException(e); } }); } + protected List getJobStats(String jobId) throws Exception { + GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).get(); + return response.getResponse().results(); + } + protected List getBuckets(String jobId) throws Exception { GetBucketsAction.Request request = new GetBucketsAction.Request(jobId); + return getBuckets(request); + } + + protected List getBuckets(GetBucketsAction.Request request) throws Exception { GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).get(); return response.getBuckets().results(); } @@ -234,4 +244,8 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase { protected void ensureClusterStateConsistency() throws IOException { // this method in ESIntegTestCase is not plugin-friendly - it does not account for plugin NamedWritableRegistries } + + protected static String createJsonRecord(Map keyValueMap) throws IOException { + return JsonXContent.contentBuilder().map(keyValueMap).string() + "\n"; + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java new file mode 100644 index 00000000000..daa035405c2 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java @@ -0,0 +1,131 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.results.Bucket; +import org.junit.After; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; + +/** + * This test generates data and it runs them through 2 jobs. + * The data covers 3 days. During the first 2 days, bucket values alternate + * between 2 modes. During the 3rd day, there is only one of the 2 modes + * except for a single bucket when the other mode reappears. + * * The first job receives the data in one go. The second job receives the + * data split into 2 parts: the alternating part and the stable part. + * After the first half, the job is closed and reopened, forcing the model + * snapshot to be restored. + * + * The test is designed so that no anomalies should be detected. However, + * for the split job, if the model fails to be restored the reappearance of + * the lost mode should cause an anomaly. + * + * The test asserts the 2 jobs have equal data counts and no records. + */ +public class RestoreModelSnapshotIT extends MlNativeAutodetectIntegTestCase { + + @After + public void tearDownData() throws Exception { + cleanUp(); + } + + public void test() throws Exception { + TimeValue bucketSpan = TimeValue.timeValueHours(1); + int bucketCount = 72; + List byFieldValues = Arrays.asList("foo", "bar"); + + List data = new ArrayList<>(); + long now = System.currentTimeMillis(); + long timestamp = now - bucketCount * bucketSpan.getMillis(); + boolean isMode1 = true; + int alternatingModesCutoff = 48; + int differentModeBucketAfterAlternatingCutoff = 60; + for (int i = 0; i < bucketCount; i++) { + for (String byFieldValue : byFieldValues) { + double value = isMode1 ? 10.0 : 100.0; + if (i == differentModeBucketAfterAlternatingCutoff) { + value = isMode1 ? 100.0 : 10.0; + } + + Map record = new HashMap<>(); + record.put("time", timestamp); + record.put("value", value); + record.put("by_field", byFieldValue); + data.add(createJsonRecord(record)); + } + timestamp += bucketSpan.getMillis(); + if (i < alternatingModesCutoff) { + isMode1 = !isMode1; + } + } + + Job.Builder oneGoJob = buildAndRegisterJob("restore-model-snapshot-one-go-job", bucketSpan); + openJob(oneGoJob.getId()); + postData(oneGoJob.getId(), joinBetween(0, data.size(), data)); + closeJob(oneGoJob.getId()); + + Job.Builder splitJob = buildAndRegisterJob("restore-model-snapshot-split-job", bucketSpan); + openJob(splitJob.getId()); + int splitPoint = alternatingModesCutoff * byFieldValues.size(); + postData(splitJob.getId(), joinBetween(0, splitPoint, data)); + closeJob(splitJob.getId()); + + openJob(splitJob.getId()); + postData(splitJob.getId(), joinBetween(splitPoint, data.size(), data)); + closeJob(splitJob.getId()); + + // Compare data counts + GetJobsStatsAction.Response.JobStats oneGoJobStats = getJobStats(oneGoJob.getId()).get(0); + GetJobsStatsAction.Response.JobStats splitJobStats = getJobStats(splitJob.getId()).get(0); + assertThat(oneGoJobStats.getDataCounts().getProcessedRecordCount(), + equalTo(splitJobStats.getDataCounts().getProcessedRecordCount())); + assertThat(oneGoJobStats.getDataCounts().getLatestRecordTimeStamp(), + equalTo(splitJobStats.getDataCounts().getLatestRecordTimeStamp())); + + List oneGoBuckets = getBuckets(oneGoJob.getId()); + assertThat(oneGoBuckets.size(), greaterThanOrEqualTo(70)); + assertThat(getBuckets(splitJob.getId()).size(), equalTo(oneGoBuckets.size())); + assertThat(getRecords(oneGoJob.getId()).isEmpty(), is(true)); + assertThat(getRecords(splitJob.getId()).isEmpty(), is(true)); + } + + private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception { + Detector.Builder detector = new Detector.Builder("mean", "value"); + detector.setByFieldName("by_field"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + analysisConfig.setBucketSpan(bucketSpan); + Job.Builder job = new Job.Builder(jobId); + job.setAnalysisConfig(analysisConfig); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + job.setDataDescription(dataDescription); + registerJob(job); + putJob(job); + return job; + } + + private String joinBetween(int start, int end, List input) { + StringBuilder result = new StringBuilder(); + for (int i = start; i < end; i++) { + result.append(input.get(i)); + } + return result.toString(); + } +}