diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
index 6dc7fb7375a..dab19ccbbab 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
@@ -5,19 +5,23 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.test.SecurityIntegTestCase;
 import org.elasticsearch.xpack.XPackSettings;
 import org.elasticsearch.xpack.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
 import org.elasticsearch.xpack.ml.action.DeleteJobAction;
+import org.elasticsearch.xpack.ml.action.FlushJobAction;
 import org.elasticsearch.xpack.ml.action.GetBucketsAction;
 import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
 import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
 import org.elasticsearch.xpack.ml.action.GetRecordsAction;
 import org.elasticsearch.xpack.ml.action.OpenJobAction;
+import org.elasticsearch.xpack.ml.action.PostDataAction;
 import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
 import org.elasticsearch.xpack.ml.action.PutJobAction;
 import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
@@ -26,6 +30,7 @@ import org.elasticsearch.xpack.ml.action.util.PageParams;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.job.config.JobState;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
 import org.elasticsearch.xpack.ml.job.results.Bucket;
@@ -114,6 +119,12 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
         client().execute(CloseJobAction.INSTANCE, request).get();
     }
 
+    protected void flushJob(String jobId, boolean calcInterim) throws Exception {
+        FlushJobAction.Request request = new FlushJobAction.Request(jobId);
+        request.setCalcInterim(calcInterim);
+        client().execute(FlushJobAction.INSTANCE, request).get();
+    }
+
     protected void deleteJob(String jobId) throws Exception {
         DeleteJobAction.Request request = new DeleteJobAction.Request(jobId);
         client().execute(DeleteJobAction.INSTANCE, request).get();
@@ -179,6 +190,13 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
         return categoriesResponse.getResult().results();
     }
 
+    protected DataCounts postData(String jobId, String data) {
+        logger.debug("Posting data to job [{}]:\n{}", jobId, data);
+        PostDataAction.Request request = new PostDataAction.Request(jobId);
+        request.setContent(new BytesArray(data), XContentType.JSON);
+        return client().execute(PostDataAction.INSTANCE, request).actionGet().getDataCounts();
+    }
+
     @Override
     protected void ensureClusterStateConsistency() throws IOException {
         // this method in ESIntegTestCase is not plugin-friendly - it does not account for plugin NamedWritableRegistries
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java
new file mode 100644
index 00000000000..c2146a62698
--- /dev/null
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.integration;
+
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.xpack.ml.action.GetBucketsAction;
+import org.elasticsearch.xpack.ml.action.util.PageParams;
+import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.ml.job.config.Detector;
+import org.elasticsearch.xpack.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.job.results.Bucket;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+/**
+ * A fast integration test for updating interim results
+ */
+public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase {
+
+    private static final String JOB_ID = "update-interim-test";
+    private static final long BUCKET_SPAN_SECONDS = 1000;
+
+    private long time;
+
+    public void test() throws Exception {
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
+                Arrays.asList(new Detector.Builder("max", "value").build()));
+        analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS));
+        analysisConfig.setOverlappingBuckets(true);
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeFormat("epoch");
+        Job.Builder job = new Job.Builder(JOB_ID);
+        job.setAnalysisConfig(analysisConfig);
+        job.setDataDescription(dataDescription);
+
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+
+        time = 1400000000;
+        Map<Long, Integer> anomalies = new HashMap<>();
+        anomalies.put(1400021500L, 14);
+
+        // push some data, flush job, verify no interim results
+        assertThat(postData(job.getId(), createData(50, anomalies)).getProcessedRecordCount(), equalTo(50L));
+        flushJob(job.getId(), false);
+        assertThat(getInterimResults(job.getId()).isEmpty(), is(true));
+
+        // push some more data, flush job, verify no interim results
+        assertThat(postData(job.getId(), createData(30, anomalies)).getProcessedRecordCount(), equalTo(30L));
+        flushJob(job.getId(), false);
+        assertThat(getInterimResults(job.getId()).isEmpty(), is(true));
+        assertThat(time, equalTo(1400040000L));
+
+        // push some data up to a 1/4 bucket boundary, flush (with interim), check interim results
+        String data = "{\"time\":1400040000,\"value\":14}\n"
+                + "{\"time\":1400040500,\"value\":12}\n"
+                + "{\"time\":1400040510,\"value\":16}\n";
+        assertThat(postData(job.getId(), data).getProcessedRecordCount(), equalTo(3L));
+        flushJob(job.getId(), true);
+        List<Bucket> firstInterimBuckets = getInterimResults(job.getId());
+        assertThat(firstInterimBuckets.size(), equalTo(2));
+        assertThat(firstInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L));
+        assertThat(firstInterimBuckets.get(0).getRecordCount(), equalTo(0));
+        assertThat(firstInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L));
+        assertThat(firstInterimBuckets.get(1).getRecordCount(), equalTo(1));
+        assertThat(firstInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
+
+        // push 1 more record, flush (with interim), check same interim result
+        data = "{\"time\":1400040520,\"value\":15}\n";
+        assertThat(postData(job.getId(), data).getProcessedRecordCount(), equalTo(1L));
+        flushJob(job.getId(), true);
+        List<Bucket> secondInterimBuckets = getInterimResults(job.getId());
+        assertThat(secondInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L));
+        assertThat(secondInterimBuckets.get(0).getRecordCount(), equalTo(0));
+        assertThat(secondInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L));
+        assertThat(secondInterimBuckets.get(1).getRecordCount(), equalTo(1));
+        assertThat(secondInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
+
+        // push rest of data, close, verify no interim results
+        time += BUCKET_SPAN_SECONDS;
+        assertThat(postData(job.getId(), createData(30, anomalies)).getProcessedRecordCount(), equalTo(30L));
+        closeJob(job.getId());
+        assertThat(getInterimResults(job.getId()).isEmpty(), is(true));
+
+        // Verify interim results have been replaced with finalized results
+        GetBucketsAction.Request bucketRequest = new GetBucketsAction.Request(job.getId());
+        bucketRequest.setTimestamp("1400039500000");
+        bucketRequest.setExpand(true);
+        List<Bucket> bucket = client().execute(GetBucketsAction.INSTANCE, bucketRequest).get().getBuckets().results();
+        assertThat(bucket.size(), equalTo(1));
+        assertThat(bucket.get(0).getRecords().get(0).getActual().get(0), equalTo(14.0));
+    }
+
+    private String createData(int halfBuckets, Map<Long, Integer> timeToValueMap) {
+        StringBuilder data = new StringBuilder();
+        for (int i = 0; i < halfBuckets; i++) {
+            int value = timeToValueMap.getOrDefault(time, randomIntBetween(1, 3));
+            data.append("{\"time\":" + time + ", \"value\":" + value + "}\n");
+            time += BUCKET_SPAN_SECONDS / 2;
+        }
+        return data.toString();
+    }
+
+    private List<Bucket> getInterimResults(String jobId) throws Exception {
+        GetBucketsAction.Request request = new GetBucketsAction.Request(jobId);
+        request.setIncludeInterim(true);
+        request.setExpand(true);
+        request.setPageParams(new PageParams(0, 1500));
+        GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).get();
+        assertThat(response.getBuckets().count(), lessThan(1500L));
+        List<Bucket> buckets = response.getBuckets().results();
+        assertThat(buckets.size(), greaterThan(0));
+        return buckets.stream().filter(b -> b.isInterim()).collect(Collectors.toList());
+    }
+}
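
For context, here is a minimal sketch (not part of the change) of how the two new base-class helpers compose in a derived test. The job id and payload are illustrative, and the job is assumed to have been created and opened via the existing registerJob/putJob/openJob helpers:

    // Minimal usage sketch for the new MlNativeAutodetectIntegTestCase helpers.
    // Assumes a test class extending the base class; "sketch-job" is hypothetical
    // and must already be registered, put, and opened.
    public void testPostAndFlushSketch() throws Exception {
        String jobId = "sketch-job";
        String ndjson = "{\"time\":1400000000,\"value\":1}\n"
                + "{\"time\":1400000500,\"value\":2}\n";

        // postData() wraps PostDataAction and surfaces the job's DataCounts,
        // so ingestion can be asserted record by record.
        DataCounts counts = postData(jobId, ndjson);
        assertThat(counts.getProcessedRecordCount(), equalTo(2L));

        // flushJob() wraps FlushJobAction; calcInterim=true asks the autodetect
        // process to calculate interim results for the partially seen bucket.
        flushJob(jobId, true);
    }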