From 8122650a55c3b6d1830f2f363ca9258ed940662d Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 28 Feb 2019 11:08:27 +0200 Subject: [PATCH] [ML] Add integration test for interim results after advancing bucket (#39447) This is an integration test that captures the issue described in elastic/ml-cpp#324 --- ...imResultsIT.java => InterimResultsIT.java} | 57 ++++++++++++++++--- 1 file changed, 49 insertions(+), 8 deletions(-) rename x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/{UpdateInterimResultsIT.java => InterimResultsIT.java} (67%) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsIT.java similarity index 67% rename from x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java rename to x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsIT.java index 4cbeaf1dc48..5689e7bfe54 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsIT.java @@ -5,13 +5,16 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.util.PageParams; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.junit.After; @@ -24,28 +27,25 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; -/** - * Tests that interim results get updated correctly - */ -public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase { +public class InterimResultsIT extends MlNativeAutodetectIntegTestCase { - private static final String JOB_ID = "update-interim-test"; private static final long BUCKET_SPAN_SECONDS = 1000; private long time; @After - public void cleanUpTest() throws Exception { + public void cleanUpTest() { cleanUp(); } - public void test() throws Exception { + public void testInterimResultsUpdates() throws Exception { + String jobId = "test-interim-results-updates"; AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder( Collections.singletonList(new Detector.Builder("max", "value").build())); analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS)); DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeFormat("epoch"); - Job.Builder job = new Job.Builder(JOB_ID); + Job.Builder job = new Job.Builder(jobId); job.setAnalysisConfig(analysisConfig); job.setDataDescription(dataDescription); @@ -106,6 +106,47 @@ public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase { assertThat(bucket.get(0).getRecords().get(0).getActual().get(0), equalTo(16.0)); } + public void testNoInterimResultsAfterAdvancingBucket() throws Exception { + String jobId = "test-no-inerim-results-after-advancing-bucket"; + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder( + Collections.singletonList(new Detector.Builder("count", null).build())); + analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS)); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeFormat("epoch"); + Job.Builder job = new Job.Builder(jobId); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + time = 1400000000; + + // push some data, flush job, verify no interim results + assertThat(postData(job.getId(), createData(50)).getProcessedRecordCount(), equalTo(50L)); + FlushJobAction.Response flushResponse = flushJob(job.getId(), false); + assertThat(getInterimResults(job.getId()).isEmpty(), is(true)); + + // advance time and request interim results + long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().getTime(); + FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId); + advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000)); + advanceTimeRequest.setCalcInterim(true); + assertThat(client().execute(FlushJobAction.INSTANCE, advanceTimeRequest).actionGet().isFlushed(), is(true)); + + List interimResults = getInterimResults(job.getId()); + assertThat(interimResults.size(), equalTo(1)); + + // We expect there are no records. The bucket count is low but at the same time + // it is too early into the bucket to consider it an anomaly. Let's verify that. + List records = interimResults.get(0).getRecords(); + List recordsJson = records.stream().map(Strings::toString).collect(Collectors.toList()); + assertThat("Found interim records: " + recordsJson, records.isEmpty(), is(true)); + + closeJob(jobId); + } + private String createData(int halfBuckets) { StringBuilder data = new StringBuilder(); for (int i = 0; i < halfBuckets; i++) {