From 3b260dcfc1e938103659e31a909bdd6ba9be0457 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 3 May 2018 15:08:24 +0100 Subject: [PATCH] [ML] Account for gaps in data counts after job is reopened (#30294) This commit fixes an issue with the data diagnostics were empty buckets are not reported even though they should. Once a job is reopened, the diagnostics do not get initialized from the current data counts (especially the latest record timestamp). The result is that if the data that is sent have a time gap compared to the previous ones, that gap is not accounted for in the empty bucket count. This commit fixes that by initializing the diagnostics with the current data counts. Closes #30080 --- docs/CHANGELOG.asciidoc | 4 + .../ml/job/process/DataCountsReporter.java | 2 +- .../diagnostics/BucketDiagnostics.java | 10 +- .../diagnostics/DataStreamDiagnostics.java | 5 +- .../DataStreamDiagnosticsTests.java | 31 ++++--- .../ml/integration/MlBasicMultiNodeIT.java | 4 +- .../ml/integration/ReopenJobWithGapIT.java | 93 +++++++++++++++++++ 7 files changed, 129 insertions(+), 20 deletions(-) create mode 100644 x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 7c14d41724a..1fb8534a19c 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -109,6 +109,10 @@ Do not ignore request analysis/similarity settings on index resize operations wh Fix NPE when CumulativeSum agg encounters null value/empty bucket ({pull}29641[#29641]) +Machine Learning:: + +* Account for gaps in data counts after job is reopened ({pull}30294[#30294]) + //[float] //=== Regressions diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index 80223027e8e..d906ccf2f7a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -82,7 +82,7 @@ public class DataCountsReporter extends AbstractComponent { totalRecordStats = counts; incrementalRecordStats = new DataCounts(job.getId()); - diagnostics = new DataStreamDiagnostics(job); + diagnostics = new DataStreamDiagnostics(job, counts); acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings); acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java index c61926dfb04..a4497653497 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java @@ -6,8 +6,11 @@ package org.elasticsearch.xpack.ml.job.process.diagnostics; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.utils.Intervals; +import java.util.Date; + /** * A moving window of buckets that allow keeping * track of some statistics like the bucket count, @@ -33,12 +36,17 @@ class BucketDiagnostics { private long latestFlushedBucketStartMs = -1; private final BucketFlushListener bucketFlushListener; - BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) { + BucketDiagnostics(Job job, DataCounts dataCounts, BucketFlushListener bucketFlushListener) { bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis(); latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis(); maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS); buckets = new long[maxSize]; this.bucketFlushListener = bucketFlushListener; + + Date latestRecordTimestamp = dataCounts.getLatestRecordTimeStamp(); + if (latestRecordTimestamp != null) { + addRecord(latestRecordTimestamp.getTime()); + } } void addRecord(long recordTimestampMs) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java index a19f6eba023..a225587d0bb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.diagnostics; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import java.util.Date; @@ -32,8 +33,8 @@ public class DataStreamDiagnostics { private long sparseBucketCount = 0; private long latestSparseBucketTime = -1; - public DataStreamDiagnostics(Job job) { - bucketDiagnostics = new BucketDiagnostics(job, createBucketFlushListener()); + public DataStreamDiagnostics(Job job, DataCounts dataCounts) { + bucketDiagnostics = new BucketDiagnostics(job, dataCounts, createBucketFlushListener()); } private BucketDiagnostics.BucketFlushListener createBucketFlushListener() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java index 19f7f88c38f..0d9c52a28bd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.junit.Before; import java.util.Arrays; @@ -20,6 +21,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { private static final long BUCKET_SPAN = 60000; private Job job; + private DataCounts dataCounts; @Before public void setUpMocks() { @@ -32,10 +34,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase { builder.setAnalysisConfig(acBuilder); builder.setDataDescription(new DataDescription.Builder()); job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), null); + dataCounts = new DataCounts(job.getId()); } public void testIncompleteBuckets() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(1000); d.checkRecord(2000); @@ -81,7 +84,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { } public void testSimple() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(70000); d.checkRecord(130000); @@ -103,7 +106,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { } public void testSimpleReverse() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(610000); d.checkRecord(550000); @@ -126,7 +129,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { public void testWithLatencyLessThanTenBuckets() { job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(3 * BUCKET_SPAN)); - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); long timestamp = 70000; while (timestamp < 70000 + 20 * BUCKET_SPAN) { @@ -141,7 +144,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { public void testWithLatencyGreaterThanTenBuckets() { job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(13 * BUCKET_SPAN + 10000)); - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); long timestamp = 70000; while (timestamp < 70000 + 20 * BUCKET_SPAN) { @@ -155,7 +158,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { } public void testEmptyBuckets() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(10000); d.checkRecord(70000); @@ -177,7 +180,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { } public void testEmptyBucketsStartLater() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(1110000); d.checkRecord(1170000); @@ -199,7 +202,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { } public void testSparseBuckets() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -227,7 +230,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { * signal */ public void testSparseBucketsLast() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -255,7 +258,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { * signal on the 2nd to last */ public void testSparseBucketsLastTwo() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -280,7 +283,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { } public void testMixedEmptyAndSparseBuckets() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -308,7 +311,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { * whether counts are right. */ public void testEmptyBucketsLongerOutage() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(10000); d.checkRecord(70000); @@ -336,7 +339,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { * The number of sparse buckets should not be to much, it could be normal. */ public void testSparseBucketsLongerPeriod() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -374,7 +377,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { } public void testFlushAfterZeroRecords() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.flush(); assertEquals(0, d.getBucketCount()); } diff --git a/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java b/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java index 3b84994f5ac..e7381050260 100644 --- a/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java +++ b/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java @@ -241,7 +241,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { assertEquals(0, responseBody.get("invalid_date_count")); assertEquals(0, responseBody.get("missing_field_count")); assertEquals(0, responseBody.get("out_of_order_timestamp_count")); - assertEquals(0, responseBody.get("bucket_count")); + assertEquals(1000, responseBody.get("bucket_count")); // unintuitive: should return the earliest record timestamp of this feed??? assertEquals(null, responseBody.get("earliest_record_timestamp")); @@ -266,7 +266,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { assertEquals(0, dataCountsDoc.get("invalid_date_count")); assertEquals(0, dataCountsDoc.get("missing_field_count")); assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count")); - assertEquals(0, dataCountsDoc.get("bucket_count")); + assertEquals(1000, dataCountsDoc.get("bucket_count")); assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp")); assertEquals(1407082000000L, dataCountsDoc.get("latest_record_timestamp")); diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java new file mode 100644 index 00000000000..993f3707237 --- /dev/null +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.junit.After; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests that after reopening a job and sending more + * data after a gap, data counts are reported correctly. + */ +public class ReopenJobWithGapIT extends MlNativeAutodetectIntegTestCase { + + private static final String JOB_ID = "reopen-job-with-gap-test"; + private static final long BUCKET_SPAN_SECONDS = 3600; + + @After + public void cleanUpTest() { + cleanUp(); + } + + public void test() throws Exception { + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder( + Collections.singletonList(new Detector.Builder("count", null).build())); + analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS)); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeFormat("epoch"); + Job.Builder job = new Job.Builder(JOB_ID); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + long timestamp = 1483228800L; // 2017-01-01T00:00:00Z + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + data.add(createJsonRecord(createRecord(timestamp))); + timestamp += BUCKET_SPAN_SECONDS; + } + + postData(job.getId(), data.stream().collect(Collectors.joining())); + flushJob(job.getId(), true); + closeJob(job.getId()); + + GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId()); + request.setExcludeInterim(true); + assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(9L)); + assertThat(getJobStats(job.getId()).get(0).getDataCounts().getBucketCount(), equalTo(9L)); + + timestamp += 10 * BUCKET_SPAN_SECONDS; + data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + data.add(createJsonRecord(createRecord(timestamp))); + timestamp += BUCKET_SPAN_SECONDS; + } + + openJob(job.getId()); + postData(job.getId(), data.stream().collect(Collectors.joining())); + flushJob(job.getId(), true); + closeJob(job.getId()); + + assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(29L)); + DataCounts dataCounts = getJobStats(job.getId()).get(0).getDataCounts(); + assertThat(dataCounts.getBucketCount(), equalTo(29L)); + assertThat(dataCounts.getEmptyBucketCount(), equalTo(10L)); + } + + private static Map createRecord(long timestamp) { + Map record = new HashMap<>(); + record.put("time", timestamp); + return record; + } +}