[ML] Account for gaps in data counts after job is reopened (#30294)

This commit fixes an issue with the data diagnostics were
empty buckets are not reported even though they should. Once
a job is reopened, the diagnostics do not get initialized from
the current data counts (especially the latest record timestamp).
The result is that if the data that is sent have a time gap compared
to the previous ones, that gap is not accounted for in the empty bucket
count.

This commit fixes that by initializing the diagnostics with the current
data counts.

Closes #30080
This commit is contained in:
Dimitris Athanasiou 2018-05-03 15:08:24 +01:00 committed by GitHub
parent ccd791b3b4
commit 3b260dcfc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 129 additions and 20 deletions

View File

@ -109,6 +109,10 @@ Do not ignore request analysis/similarity settings on index resize operations wh
Fix NPE when CumulativeSum agg encounters null value/empty bucket ({pull}29641[#29641])
Machine Learning::
* Account for gaps in data counts after job is reopened ({pull}30294[#30294])
//[float]
//=== Regressions

View File

@ -82,7 +82,7 @@ public class DataCountsReporter extends AbstractComponent {
totalRecordStats = counts;
incrementalRecordStats = new DataCounts(job.getId());
diagnostics = new DataStreamDiagnostics(job);
diagnostics = new DataStreamDiagnostics(job, counts);
acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);

View File

@ -6,8 +6,11 @@
package org.elasticsearch.xpack.ml.job.process.diagnostics;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import java.util.Date;
/**
* A moving window of buckets that allow keeping
* track of some statistics like the bucket count,
@ -33,12 +36,17 @@ class BucketDiagnostics {
private long latestFlushedBucketStartMs = -1;
private final BucketFlushListener bucketFlushListener;
BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) {
BucketDiagnostics(Job job, DataCounts dataCounts, BucketFlushListener bucketFlushListener) {
bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis();
latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis();
maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS);
buckets = new long[maxSize];
this.bucketFlushListener = bucketFlushListener;
Date latestRecordTimestamp = dataCounts.getLatestRecordTimeStamp();
if (latestRecordTimestamp != null) {
addRecord(latestRecordTimestamp.getTime());
}
}
void addRecord(long recordTimestampMs) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.diagnostics;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import java.util.Date;
@ -32,8 +33,8 @@ public class DataStreamDiagnostics {
private long sparseBucketCount = 0;
private long latestSparseBucketTime = -1;
public DataStreamDiagnostics(Job job) {
bucketDiagnostics = new BucketDiagnostics(job, createBucketFlushListener());
public DataStreamDiagnostics(Job job, DataCounts dataCounts) {
bucketDiagnostics = new BucketDiagnostics(job, dataCounts, createBucketFlushListener());
}
private BucketDiagnostics.BucketFlushListener createBucketFlushListener() {

View File

@ -11,6 +11,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.junit.Before;
import java.util.Arrays;
@ -20,6 +21,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
private static final long BUCKET_SPAN = 60000;
private Job job;
private DataCounts dataCounts;
@Before
public void setUpMocks() {
@ -32,10 +34,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
builder.setAnalysisConfig(acBuilder);
builder.setDataDescription(new DataDescription.Builder());
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), null);
dataCounts = new DataCounts(job.getId());
}
public void testIncompleteBuckets() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
d.checkRecord(1000);
d.checkRecord(2000);
@ -81,7 +84,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
}
public void testSimple() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
d.checkRecord(70000);
d.checkRecord(130000);
@ -103,7 +106,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
}
public void testSimpleReverse() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
d.checkRecord(610000);
d.checkRecord(550000);
@ -126,7 +129,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
public void testWithLatencyLessThanTenBuckets() {
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(3 * BUCKET_SPAN));
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
long timestamp = 70000;
while (timestamp < 70000 + 20 * BUCKET_SPAN) {
@ -141,7 +144,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
public void testWithLatencyGreaterThanTenBuckets() {
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(13 * BUCKET_SPAN + 10000));
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
long timestamp = 70000;
while (timestamp < 70000 + 20 * BUCKET_SPAN) {
@ -155,7 +158,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
}
public void testEmptyBuckets() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
d.checkRecord(10000);
d.checkRecord(70000);
@ -177,7 +180,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
}
public void testEmptyBucketsStartLater() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
d.checkRecord(1110000);
d.checkRecord(1170000);
@ -199,7 +202,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
}
public void testSparseBuckets() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@ -227,7 +230,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
* signal
*/
public void testSparseBucketsLast() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@ -255,7 +258,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
* signal on the 2nd to last
*/
public void testSparseBucketsLastTwo() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@ -280,7 +283,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
}
public void testMixedEmptyAndSparseBuckets() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@ -308,7 +311,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
* whether counts are right.
*/
public void testEmptyBucketsLongerOutage() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
d.checkRecord(10000);
d.checkRecord(70000);
@ -336,7 +339,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
* The number of sparse buckets should not be to much, it could be normal.
*/
public void testSparseBucketsLongerPeriod() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@ -374,7 +377,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
}
public void testFlushAfterZeroRecords() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
d.flush();
assertEquals(0, d.getBucketCount());
}

View File

@ -241,7 +241,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertEquals(0, responseBody.get("invalid_date_count"));
assertEquals(0, responseBody.get("missing_field_count"));
assertEquals(0, responseBody.get("out_of_order_timestamp_count"));
assertEquals(0, responseBody.get("bucket_count"));
assertEquals(1000, responseBody.get("bucket_count"));
// unintuitive: should return the earliest record timestamp of this feed???
assertEquals(null, responseBody.get("earliest_record_timestamp"));
@ -266,7 +266,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertEquals(0, dataCountsDoc.get("invalid_date_count"));
assertEquals(0, dataCountsDoc.get("missing_field_count"));
assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count"));
assertEquals(0, dataCountsDoc.get("bucket_count"));
assertEquals(1000, dataCountsDoc.get("bucket_count"));
assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp"));
assertEquals(1407082000000L, dataCountsDoc.get("latest_record_timestamp"));

View File

@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.junit.After;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
/**
* Tests that after reopening a job and sending more
* data after a gap, data counts are reported correctly.
*/
public class ReopenJobWithGapIT extends MlNativeAutodetectIntegTestCase {
private static final String JOB_ID = "reopen-job-with-gap-test";
private static final long BUCKET_SPAN_SECONDS = 3600;
@After
public void cleanUpTest() {
cleanUp();
}
public void test() throws Exception {
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
Collections.singletonList(new Detector.Builder("count", null).build()));
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS));
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat("epoch");
Job.Builder job = new Job.Builder(JOB_ID);
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(dataDescription);
registerJob(job);
putJob(job);
openJob(job.getId());
long timestamp = 1483228800L; // 2017-01-01T00:00:00Z
List<String> data = new ArrayList<>();
for (int i = 0; i < 10; i++) {
data.add(createJsonRecord(createRecord(timestamp)));
timestamp += BUCKET_SPAN_SECONDS;
}
postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), true);
closeJob(job.getId());
GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
request.setExcludeInterim(true);
assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(9L));
assertThat(getJobStats(job.getId()).get(0).getDataCounts().getBucketCount(), equalTo(9L));
timestamp += 10 * BUCKET_SPAN_SECONDS;
data = new ArrayList<>();
for (int i = 0; i < 10; i++) {
data.add(createJsonRecord(createRecord(timestamp)));
timestamp += BUCKET_SPAN_SECONDS;
}
openJob(job.getId());
postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), true);
closeJob(job.getId());
assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(29L));
DataCounts dataCounts = getJobStats(job.getId()).get(0).getDataCounts();
assertThat(dataCounts.getBucketCount(), equalTo(29L));
assertThat(dataCounts.getEmptyBucketCount(), equalTo(10L));
}
private static Map<String, Object> createRecord(long timestamp) {
Map<String, Object> record = new HashMap<>();
record.put("time", timestamp);
return record;
}
}