[ML] Add integration test for interim results after advancing bucket (#39447)

This is an integration test that captures the issue described in
elastic/ml-cpp#324
This commit is contained in:
Dimitris Athanasiou 2019-02-28 11:08:27 +02:00
parent 90ab4a6f6e
commit 8122650a55
1 changed file with 49 additions and 8 deletions

View File

@ -5,13 +5,16 @@
*/ */
package org.elasticsearch.xpack.ml.integration; package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.util.PageParams; import org.elasticsearch.xpack.core.ml.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.junit.After; import org.junit.After;
@ -24,28 +27,25 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
/** public class InterimResultsIT extends MlNativeAutodetectIntegTestCase {
* Tests that interim results get updated correctly
*/
public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase {
private static final String JOB_ID = "update-interim-test";
private static final long BUCKET_SPAN_SECONDS = 1000; private static final long BUCKET_SPAN_SECONDS = 1000;
private long time; private long time;
@After @After
public void cleanUpTest() throws Exception { public void cleanUpTest() {
cleanUp(); cleanUp();
} }
public void test() throws Exception { public void testInterimResultsUpdates() throws Exception {
String jobId = "test-interim-results-updates";
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder( AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
Collections.singletonList(new Detector.Builder("max", "value").build())); Collections.singletonList(new Detector.Builder("max", "value").build()));
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS)); analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS));
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat("epoch"); dataDescription.setTimeFormat("epoch");
Job.Builder job = new Job.Builder(JOB_ID); Job.Builder job = new Job.Builder(jobId);
job.setAnalysisConfig(analysisConfig); job.setAnalysisConfig(analysisConfig);
job.setDataDescription(dataDescription); job.setDataDescription(dataDescription);
@ -106,6 +106,47 @@ public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase {
assertThat(bucket.get(0).getRecords().get(0).getActual().get(0), equalTo(16.0)); assertThat(bucket.get(0).getRecords().get(0).getActual().get(0), equalTo(16.0));
} }
/**
 * Regression test for elastic/ml-cpp#324: after advancing time past the last
 * finalized bucket and requesting interim results, the newly opened (mostly
 * empty) bucket must not produce spurious interim anomaly records — it is too
 * early into the bucket for a low count to be considered anomalous.
 */
public void testNoInterimResultsAfterAdvancingBucket() throws Exception {
    // NOTE: fixed typo "inerim" -> "interim" in the job id
    String jobId = "test-no-interim-results-after-advancing-bucket";
    AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
            Collections.singletonList(new Detector.Builder("count", null).build()));
    analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS));
    DataDescription.Builder dataDescription = new DataDescription.Builder();
    dataDescription.setTimeFormat("epoch");
    Job.Builder job = new Job.Builder(jobId);
    job.setAnalysisConfig(analysisConfig);
    job.setDataDescription(dataDescription);
    registerJob(job);
    putJob(job);
    openJob(job.getId());
    time = 1400000000;
    // push some data, flush job, verify no interim results
    assertThat(postData(job.getId(), createData(50)).getProcessedRecordCount(), equalTo(50L));
    FlushJobAction.Response flushResponse = flushJob(job.getId(), false);
    assertThat(getInterimResults(job.getId()).isEmpty(), is(true));
    // advance time one bucket span past the last finalized bucket and request interim results;
    // getTime() is in millis, bucket span is in seconds, hence the * 1000
    long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().getTime();
    FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId);
    advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000));
    advanceTimeRequest.setCalcInterim(true);
    assertThat(client().execute(FlushJobAction.INSTANCE, advanceTimeRequest).actionGet().isFlushed(), is(true));
    List<Bucket> interimResults = getInterimResults(job.getId());
    assertThat(interimResults.size(), equalTo(1));
    // We expect there are no records. The bucket count is low but at the same time
    // it is too early into the bucket to consider it an anomaly. Let's verify that.
    // Serialize any records found so an assertion failure shows what leaked through.
    List<AnomalyRecord> records = interimResults.get(0).getRecords();
    List<String> recordsJson = records.stream().map(Strings::toString).collect(Collectors.toList());
    assertThat("Found interim records: " + recordsJson, records.isEmpty(), is(true));
    closeJob(jobId);
}
private String createData(int halfBuckets) { private String createData(int halfBuckets) {
StringBuilder data = new StringBuilder(); StringBuilder data = new StringBuilder();
for (int i = 0; i < halfBuckets; i++) { for (int i = 0; i < halfBuckets; i++) {