From ce1a91054251fd4a8c6da4f93a2948925d10ef01 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sun, 12 Mar 2017 09:40:27 -0700 Subject: [PATCH] [ML] DataStreamDiagnostics Rebase (elastic/x-pack-elasticsearch#698) DataStreamDiagnostics DataStreamDiagnostics analyzes input data regarding machine learning fit. It checks whether the data is sane/plausible, since running anomaly detection on broken data (or a misconfigured job) produces misleading results. Original commit: elastic/x-pack-elasticsearch@2f37d3c9603cdabeb379f7370f087cae15d8f720 --- .../persistence/ElasticsearchMappings.java | 15 + .../ml/job/process/DataCountsReporter.java | 59 +++- .../ml/job/process/DataStreamDiagnostics.java | 177 +++++++++++ .../process/autodetect/state/DataCounts.java | 168 ++++++++++- .../writer/AbstractDataToProcessWriter.java | 12 +- .../ml/job/results/ReservedFieldNames.java | 5 + .../GetJobStatsActionResponseTests.java | 1 + .../action/PostDataActionResponseTests.java | 2 - .../ml/datafeed/DatafeedJobRunnerTests.java | 9 +- .../xpack/ml/datafeed/DatafeedJobTests.java | 3 +- .../job/process/DataCountsReporterTests.java | 12 +- .../process/DataStreamDiagnosticsTests.java | 278 ++++++++++++++++++ .../autodetect/state/DataCountsTests.java | 32 +- 13 files changed, 742 insertions(+), 31 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index e2f2e31d103..ea9cb707e8e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -466,12 +466,27 @@ public class ElasticsearchMappings { .startObject(DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName()) .field(TYPE, LONG) .endObject() + .startObject(DataCounts.EMPTY_BUCKET_COUNT.getPreferredName()) + .field(TYPE, LONG) + .endObject() + .startObject(DataCounts.SPARSE_BUCKET_COUNT.getPreferredName()) + .field(TYPE, LONG) + .endObject() + .startObject(DataCounts.BUCKET_COUNT.getPreferredName()) + .field(TYPE, LONG) + .endObject() .startObject(DataCounts.EARLIEST_RECORD_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() .startObject(DataCounts.LATEST_RECORD_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() + .startObject(DataCounts.LATEST_EMPTY_BUCKET_TIME.getPreferredName()) + .field(TYPE, DATE) + .endObject() + .startObject(DataCounts.LATEST_SPARSE_BUCKET_TIME.getPreferredName()) + .field(TYPE, DATE) + .endObject() .startObject(DataCounts.LAST_DATA_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index cba69a3d20d..0b3a966b2d3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -191,6 +191,43 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { incrementalRecordStats.incrementInputFieldCount(inputFieldCount); } + public void reportEmptyBucket(long recordTimeMs) { + Date recordDate = new Date(recordTimeMs); +
totalRecordStats.incrementEmptyBucketCount(1); + totalRecordStats.setLatestEmptyBucketTimeStamp(recordDate); + incrementalRecordStats.incrementEmptyBucketCount(1); + incrementalRecordStats.setLatestEmptyBucketTimeStamp(recordDate); + } + + public void reportEmptyBuckets(long additional, long lastRecordTimeMs) { + Date recordDate = new Date(lastRecordTimeMs); + + totalRecordStats.incrementEmptyBucketCount(additional); + totalRecordStats.setLatestEmptyBucketTimeStamp(recordDate); + incrementalRecordStats.incrementEmptyBucketCount(additional); + incrementalRecordStats.setLatestEmptyBucketTimeStamp(recordDate); + } + + public void reportSparseBucket(long recordTimeMs) { + Date recordDate = new Date(recordTimeMs); + + totalRecordStats.incrementSparseBucketCount(1); + totalRecordStats.setLatestSparseBucketTimeStamp(recordDate); + incrementalRecordStats.incrementSparseBucketCount(1); + incrementalRecordStats.setLatestSparseBucketTimeStamp(recordDate); + } + + public void reportBucket() { + totalRecordStats.incrementBucketCount(1); + incrementalRecordStats.incrementBucketCount(1); + } + + public void reportBuckets(long additional) { + totalRecordStats.incrementBucketCount(additional); + incrementalRecordStats.incrementBucketCount(additional); + } + /** * Total records seen = records written to the Engine (processed record * count) + date parse error records count + out of order record count. @@ -216,7 +253,19 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { public long getOutOfOrderRecordCount() { return totalRecordStats.getOutOfOrderTimeStampCount(); } - + + public long getEmptyBucketCount() { + return totalRecordStats.getEmptyBucketCount(); + } + + public long getSparseBucketCount() { + return totalRecordStats.getSparseBucketCount(); + } + + public long getBucketCount() { + return totalRecordStats.getBucketCount(); + } + public long getBytesRead() { return totalRecordStats.getInputBytes(); } @@ -224,6 +273,14 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { public Date getLatestRecordTime() { return totalRecordStats.getLatestRecordTimeStamp(); } + + public Date getLatestEmptyBucketTime() { + return totalRecordStats.getLatestEmptyBucketTimeStamp(); + } + + public Date getLatestSparseBucketTime() { + return totalRecordStats.getLatestSparseBucketTimeStamp(); + } public long getProcessedFieldCount() { totalRecordStats.calcProcessedFieldCount(getAnalysedFieldsPerRecord()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java new file mode 100644 index 00000000000..c7732a92541 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java @@ -0,0 +1,177 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.Counter; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; + +import java.util.SortedMap; +import java.util.TreeMap; + +public class DataStreamDiagnostics { + + /** + * Minimum window to take into consideration for bucket count histogram.
+ */ + private static final int MIN_BUCKET_WINDOW = 10; + + /** + * Threshold to report potential sparsity problems. + * + * The sparsity score is calculated as: log(average) - log(current) + * + * If the score is above the threshold, the bucket is reported as a sparse bucket. + */ + private static final int DATA_SPARSITY_THRESHOLD = 2; + private static final long MS_IN_SECOND = 1000; + + private final DataCountsReporter dataCountsReporter; + private final Logger logger; + + /** + * Container for the histogram. + * + * Note: a sorted map is used in order to iterate in order when consuming the + * data. The counter is lazily initialized and potentially missing in case + * of empty buckets. + * + * The container is pruned as the data streams in, based on the bucket + * window, so it should not contain more than max(MIN_BUCKET_WINDOW, + * 'buckets_required_by_latency') + 1 items at any time. + */ + private final SortedMap<Long, Counter> movingBucketHistogram = new TreeMap<>(); + + private final long bucketSpan; + private final long latency; + private long movingBucketCount = 0; + private long lastReportedBucket = -1; + + public DataStreamDiagnostics(DataCountsReporter dataCountsReporter, AnalysisConfig analysisConfig, Logger logger) { + this.dataCountsReporter = dataCountsReporter; + this.logger = logger; + bucketSpan = analysisConfig.getBucketSpanOrDefault(); + latency = analysisConfig.getLatency(); + } + + /** + * Check record + * + * @param recordTimestampInMs + * The record timestamp in milliseconds since epoch + */ + + public void checkRecord(long recordTimestampInMs) { + checkBucketing(recordTimestampInMs); + } + + /** + * Flush all counters; should be called at the end of the data stream + */ + public void flush() { + // flush all we know + flush(movingBucketHistogram.lastKey() + 1); + } + + /** + * Check bucketing of record. Report empty and sparse buckets. + * + * @param recordTimestampInMs + * The record timestamp in milliseconds since epoch + */ + private void checkBucketing(long recordTimestampInMs) { + long bucket = (recordTimestampInMs / MS_IN_SECOND) / bucketSpan; + long bucketHistogramStartBucket = ((recordTimestampInMs / MS_IN_SECOND) - latency) / bucketSpan; + + bucketHistogramStartBucket = Math.min(bucket - MIN_BUCKET_WINDOW, bucketHistogramStartBucket); + + movingBucketHistogram.computeIfAbsent(bucket, l -> Counter.newCounter()).addAndGet(1); + ++movingBucketCount; + + // find the very first bucket + if (lastReportedBucket == -1) { + lastReportedBucket = bucket - 1; + } + + // flush all buckets out of the window + flush(bucketHistogramStartBucket); + } + + /** + * Flush bucket reporting up to the given bucket. + * + * @param bucketTimeStamp + * The bucket number up to which buckets are flushed (exclusive).
+ */ + private void flush(long bucketTimeStamp) { + + // check for a longer period of empty buckets + long emptyBuckets = movingBucketHistogram.firstKey() - lastReportedBucket - 1; + if (emptyBuckets > 0) { + dataCountsReporter.reportBuckets(emptyBuckets); + dataCountsReporter.reportEmptyBuckets(emptyBuckets, (movingBucketHistogram.firstKey() - 1) * bucketSpan * MS_IN_SECOND); + lastReportedBucket = movingBucketHistogram.firstKey() - 1; + } + + // calculate the average number of data points in a bucket based on the + // current history + double averageBucketSize = (float) movingBucketCount / movingBucketHistogram.size(); + + // prune all buckets that can be flushed + long lastBucketSparsityCheck = Math.min(bucketTimeStamp, movingBucketHistogram.lastKey()); + + for (long pruneBucket = movingBucketHistogram.firstKey(); pruneBucket < lastBucketSparsityCheck; ++pruneBucket) { + + Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket); + long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L; + + logger.debug("Checking bucket {} compare sizes, this bucket: {} average: {}", pruneBucket, bucketSize, averageBucketSize); + dataCountsReporter.reportBucket(); + lastReportedBucket = pruneBucket; + + // subtract bucketSize from the counter + movingBucketCount -= bucketSize; + + // check if bucket is empty + if (bucketSize == 0L) { + dataCountsReporter.reportEmptyBucket(pruneBucket * bucketSpan * MS_IN_SECOND); + + // do not do sparse analysis on an empty bucket + continue; + } + + // simplistic way to calculate data sparsity, just take the log and + // check the difference + double logAverageBucketSize = Math.log(averageBucketSize); + double logBucketSize = Math.log(bucketSize); + double sparsityScore = logAverageBucketSize - logBucketSize; + + if (sparsityScore > DATA_SPARSITY_THRESHOLD) { + logger.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", pruneBucket, bucketSize, + averageBucketSize, sparsityScore); + dataCountsReporter.reportSparseBucket(pruneBucket * bucketSpan * MS_IN_SECOND); + } + } + + // prune the rest if necessary + for (long pruneBucket = lastBucketSparsityCheck; pruneBucket < bucketTimeStamp; ++pruneBucket) { + Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket); + long bucketSize = bucketSizeHolder != null ?
bucketSizeHolder.get() : 0L; + + dataCountsReporter.reportBucket(); + lastReportedBucket = pruneBucket; + + // subtract bucketSize from the counter + movingBucketCount -= bucketSize; + + // check if bucket is empty + if (bucketSize == 0L) { + dataCountsReporter.reportEmptyBucket(pruneBucket * bucketSpan * MS_IN_SECOND); + } + } + } + +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java index 4ddb26f0680..9c059222eeb 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java @@ -45,10 +45,15 @@ public class DataCounts extends ToXContentToBytes implements Writeable { public static final String INVALID_DATE_COUNT_STR = "invalid_date_count"; public static final String MISSING_FIELD_COUNT_STR = "missing_field_count"; public static final String OUT_OF_ORDER_TIME_COUNT_STR = "out_of_order_timestamp_count"; + public static final String EMPTY_BUCKET_COUNT_STR = "empty_bucket_count"; + public static final String SPARSE_BUCKET_COUNT_STR = "sparse_bucket_count"; + public static final String BUCKET_COUNT_STR = "bucket_count"; public static final String EARLIEST_RECORD_TIME_STR = "earliest_record_timestamp"; public static final String LATEST_RECORD_TIME_STR = "latest_record_timestamp"; public static final String LAST_DATA_TIME_STR = "last_data_time"; - + public static final String LATEST_EMPTY_BUCKET_TIME_STR = "latest_empty_bucket_timestamp"; + public static final String LATEST_SPARSE_BUCKET_TIME_STR = "latest_sparse_bucket_timestamp"; + public static final ParseField PROCESSED_RECORD_COUNT = new ParseField(PROCESSED_RECORD_COUNT_STR); public static final ParseField PROCESSED_FIELD_COUNT = new ParseField(PROCESSED_FIELD_COUNT_STR); public static final ParseField INPUT_BYTES = new ParseField(INPUT_BYTES_STR); @@ -57,15 +62,21 @@ public class DataCounts extends ToXContentToBytes implements Writeable { public static final ParseField INVALID_DATE_COUNT = new ParseField(INVALID_DATE_COUNT_STR); public static final ParseField MISSING_FIELD_COUNT = new ParseField(MISSING_FIELD_COUNT_STR); public static final ParseField OUT_OF_ORDER_TIME_COUNT = new ParseField(OUT_OF_ORDER_TIME_COUNT_STR); + public static final ParseField EMPTY_BUCKET_COUNT = new ParseField(EMPTY_BUCKET_COUNT_STR); + public static final ParseField SPARSE_BUCKET_COUNT = new ParseField(SPARSE_BUCKET_COUNT_STR); + public static final ParseField BUCKET_COUNT = new ParseField(BUCKET_COUNT_STR); public static final ParseField EARLIEST_RECORD_TIME = new ParseField(EARLIEST_RECORD_TIME_STR); public static final ParseField LATEST_RECORD_TIME = new ParseField(LATEST_RECORD_TIME_STR); public static final ParseField LAST_DATA_TIME = new ParseField(LAST_DATA_TIME_STR); + public static final ParseField LATEST_EMPTY_BUCKET_TIME = new ParseField(LATEST_EMPTY_BUCKET_TIME_STR); + public static final ParseField LATEST_SPARSE_BUCKET_TIME = new ParseField(LATEST_SPARSE_BUCKET_TIME_STR); public static final ParseField TYPE = new ParseField("data_counts"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_counts", a -> new DataCounts((String) a[0], (long) a[1], (long) a[2], (long) a[3], - (long) a[4], (long) a[5], (long) a[6], (long) a[7], (Date) a[8], (Date) a[9], (Date) a[10])); + (long) a[4], (long) a[5], (long) a[6], (long) a[7],
(long) a[8], (long) a[9], (long) a[10], + (Date) a[11], (Date) a[12], (Date) a[13], (Date) a[14], (Date) a[15])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); @@ -76,6 +87,9 @@ public class DataCounts extends ToXContentToBytes implements Writeable { PARSER.declareLong(ConstructingObjectParser.constructorArg(), INVALID_DATE_COUNT); PARSER.declareLong(ConstructingObjectParser.constructorArg(), MISSING_FIELD_COUNT); PARSER.declareLong(ConstructingObjectParser.constructorArg(), OUT_OF_ORDER_TIME_COUNT); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), EMPTY_BUCKET_COUNT); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), SPARSE_BUCKET_COUNT); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_COUNT); PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> { if (p.currentToken() == Token.VALUE_NUMBER) { return new Date(p.longValue()); @@ -103,6 +117,24 @@ public class DataCounts extends ToXContentToBytes implements Writeable { throw new IllegalArgumentException( "unexpected token [" + p.currentToken() + "] for [" + LAST_DATA_TIME.getPreferredName() + "]"); }, LAST_DATA_TIME, ValueType.VALUE); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> { + if (p.currentToken() == Token.VALUE_NUMBER) { + return new Date(p.longValue()); + } else if (p.currentToken() == Token.VALUE_STRING) { + return new Date(TimeUtils.dateStringToEpoch(p.text())); + } + throw new IllegalArgumentException( + "unexpected token [" + p.currentToken() + "] for [" + LATEST_EMPTY_BUCKET_TIME.getPreferredName() + "]"); + }, LATEST_EMPTY_BUCKET_TIME, ValueType.VALUE); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> { + if (p.currentToken() == Token.VALUE_NUMBER) { + return new Date(p.longValue()); + } else if (p.currentToken() == Token.VALUE_STRING) { + return new Date(TimeUtils.dateStringToEpoch(p.text())); + } + throw new IllegalArgumentException( + "unexpected token [" + p.currentToken() + "] for [" + LATEST_SPARSE_BUCKET_TIME.getPreferredName() + "]"); + }, LATEST_SPARSE_BUCKET_TIME, ValueType.VALUE); PARSER.declareLong((t, u) -> {;}, INPUT_RECORD_COUNT); } @@ -118,14 +150,21 @@ public class DataCounts extends ToXContentToBytes implements Writeable { private long invalidDateCount; private long missingFieldCount; private long outOfOrderTimeStampCount; + private long emptyBucketCount; + private long sparseBucketCount; + private long bucketCount; // NORELEASE: Use Jodatime instead private Date earliestRecordTimeStamp; private Date latestRecordTimeStamp; private Date lastDataTimeStamp; + private Date latestEmptyBucketTimeStamp; + private Date latestSparseBucketTimeStamp; public DataCounts(String jobId, long processedRecordCount, long processedFieldCount, long inputBytes, long inputFieldCount, long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount, - Date earliestRecordTimeStamp, Date latestRecordTimeStamp, Date lastDataTimeStamp) { + long emptyBucketCount, long sparseBucketCount, long bucketCount, + Date earliestRecordTimeStamp, Date latestRecordTimeStamp, Date lastDataTimeStamp, + Date latestEmptyBucketTimeStamp, Date latestSparseBucketTimeStamp) { this.jobId = jobId; this.processedRecordCount = processedRecordCount; this.processedFieldCount = processedFieldCount; @@ -134,9 +173,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable { this.invalidDateCount = invalidDateCount; this.missingFieldCount = missingFieldCount; 
this.outOfOrderTimeStampCount = outOfOrderTimeStampCount; + this.emptyBucketCount = emptyBucketCount; + this.sparseBucketCount = sparseBucketCount; + this.bucketCount = bucketCount; this.latestRecordTimeStamp = latestRecordTimeStamp; this.earliestRecordTimeStamp = earliestRecordTimeStamp; this.lastDataTimeStamp = lastDataTimeStamp; + this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp; + this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp; } public DataCounts(String jobId) { @@ -152,9 +196,14 @@ invalidDateCount = lhs.invalidDateCount; missingFieldCount = lhs.missingFieldCount; outOfOrderTimeStampCount = lhs.outOfOrderTimeStampCount; + emptyBucketCount = lhs.emptyBucketCount; + sparseBucketCount = lhs.sparseBucketCount; + bucketCount = lhs.bucketCount; latestRecordTimeStamp = lhs.latestRecordTimeStamp; earliestRecordTimeStamp = lhs.earliestRecordTimeStamp; lastDataTimeStamp = lhs.lastDataTimeStamp; + latestEmptyBucketTimeStamp = lhs.latestEmptyBucketTimeStamp; + latestSparseBucketTimeStamp = lhs.latestSparseBucketTimeStamp; } public DataCounts(StreamInput in) throws IOException { @@ -166,6 +215,9 @@ invalidDateCount = in.readVLong(); missingFieldCount = in.readVLong(); outOfOrderTimeStampCount = in.readVLong(); + emptyBucketCount = in.readVLong(); + sparseBucketCount = in.readVLong(); + bucketCount = in.readVLong(); if (in.readBoolean()) { latestRecordTimeStamp = new Date(in.readVLong()); } @@ -175,6 +227,12 @@ if (in.readBoolean()) { lastDataTimeStamp = new Date(in.readVLong()); } + if (in.readBoolean()) { + latestEmptyBucketTimeStamp = new Date(in.readVLong()); + } + if (in.readBoolean()) { + latestSparseBucketTimeStamp = new Date(in.readVLong()); + } in.readVLong(); // throw away inputRecordCount } @@ -307,6 +365,46 @@ outOfOrderTimeStampCount += additional; } + /** + * The number of buckets with no records in them. Used to measure general data fitness and/or + * configuration problems (bucket span). + * + * @return Number of empty buckets processed by this job {@code long} + */ + public long getEmptyBucketCount() { + return emptyBucketCount; + } + + public void incrementEmptyBucketCount(long additional) { + emptyBucketCount += additional; + } + + /** + * The number of buckets with few records compared to the overall counts. + * Used to measure general data fitness and/or configuration problems (bucket span). + * + * @return Number of sparse buckets processed by this job {@code long} + */ + public long getSparseBucketCount() { + return sparseBucketCount; + } + + public void incrementSparseBucketCount(long additional) { + sparseBucketCount += additional; + } + + /** + * The number of buckets overall. + * + * @return Number of buckets processed by this job {@code long} + */ + public long getBucketCount() { + return bucketCount; + } + + public void incrementBucketCount(long additional) { + bucketCount += additional; + } /** * The time of the first record seen. * @@ -357,6 +455,33 @@ public class DataCounts extends ToXContentToBytes implements Writeable { this.lastDataTimeStamp = lastDataTimeStamp; } + /** + * The time of the latest empty bucket seen.
+ * + * @return Latest empty bucket time + */ + public Date getLatestEmptyBucketTimeStamp() { + return latestEmptyBucketTimeStamp; + } + + public void setLatestEmptyBucketTimeStamp(Date latestEmptyBucketTimeStamp) { + this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp; + } + + + /** + * The time of the latest sparse bucket seen. + * + * @return Latest sparse bucket time + */ + public Date getLatestSparseBucketTimeStamp() { + return latestSparseBucketTimeStamp; + } + + public void setLatestSparseBucketTimeStamp(Date latestSparseBucketTimeStamp) { + this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); @@ -367,6 +492,9 @@ public class DataCounts extends ToXContentToBytes implements Writeable { out.writeVLong(invalidDateCount); out.writeVLong(missingFieldCount); out.writeVLong(outOfOrderTimeStampCount); + out.writeVLong(emptyBucketCount); + out.writeVLong(sparseBucketCount); + out.writeVLong(bucketCount); if (latestRecordTimeStamp != null) { out.writeBoolean(true); out.writeVLong(latestRecordTimeStamp.getTime()); @@ -385,6 +513,18 @@ public class DataCounts extends ToXContentToBytes implements Writeable { } else { out.writeBoolean(false); } + if (latestEmptyBucketTimeStamp != null) { + out.writeBoolean(true); + out.writeVLong(latestEmptyBucketTimeStamp.getTime()); + } else { + out.writeBoolean(false); + } + if (latestSparseBucketTimeStamp != null) { + out.writeBoolean(true); + out.writeVLong(latestSparseBucketTimeStamp.getTime()); + } else { + out.writeBoolean(false); + } out.writeVLong(getInputRecordCount()); } @@ -405,6 +545,9 @@ public class DataCounts extends ToXContentToBytes implements Writeable { builder.field(INVALID_DATE_COUNT.getPreferredName(), invalidDateCount); builder.field(MISSING_FIELD_COUNT.getPreferredName(), missingFieldCount); builder.field(OUT_OF_ORDER_TIME_COUNT.getPreferredName(), outOfOrderTimeStampCount); + builder.field(EMPTY_BUCKET_COUNT.getPreferredName(), emptyBucketCount); + builder.field(SPARSE_BUCKET_COUNT.getPreferredName(), sparseBucketCount); + builder.field(BUCKET_COUNT.getPreferredName(), bucketCount); if (earliestRecordTimeStamp != null) { builder.dateField(EARLIEST_RECORD_TIME.getPreferredName(), EARLIEST_RECORD_TIME.getPreferredName() + "_string", earliestRecordTimeStamp.getTime()); @@ -417,6 +560,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable { builder.dateField(LAST_DATA_TIME.getPreferredName(), LAST_DATA_TIME.getPreferredName() + "_string", lastDataTimeStamp.getTime()); } + if (latestEmptyBucketTimeStamp != null) { + builder.dateField(LATEST_EMPTY_BUCKET_TIME.getPreferredName(), LATEST_EMPTY_BUCKET_TIME.getPreferredName() + "_string", + latestEmptyBucketTimeStamp.getTime()); + } + if (latestSparseBucketTimeStamp != null) { + builder.dateField(LATEST_SPARSE_BUCKET_TIME.getPreferredName(), LATEST_SPARSE_BUCKET_TIME.getPreferredName() + "_string", + latestSparseBucketTimeStamp.getTime()); + } builder.field(INPUT_RECORD_COUNT.getPreferredName(), getInputRecordCount()); return builder; @@ -445,16 +596,21 @@ public class DataCounts extends ToXContentToBytes implements Writeable { this.invalidDateCount == that.invalidDateCount && this.missingFieldCount == that.missingFieldCount && this.outOfOrderTimeStampCount == that.outOfOrderTimeStampCount && + this.emptyBucketCount == that.emptyBucketCount && + this.sparseBucketCount == that.sparseBucketCount && + this.bucketCount == that.bucketCount && 
Objects.equals(this.latestRecordTimeStamp, that.latestRecordTimeStamp) && Objects.equals(this.earliestRecordTimeStamp, that.earliestRecordTimeStamp) && - Objects.equals(this.lastDataTimeStamp, that.lastDataTimeStamp); - + Objects.equals(this.lastDataTimeStamp, that.lastDataTimeStamp) && + Objects.equals(this.latestEmptyBucketTimeStamp, that.latestEmptyBucketTimeStamp) && + Objects.equals(this.latestSparseBucketTimeStamp, that.latestSparseBucketTimeStamp); } @Override public int hashCode() { return Objects.hash(jobId, processedRecordCount, processedFieldCount, inputBytes, inputFieldCount, invalidDateCount, missingFieldCount, - outOfOrderTimeStampCount, latestRecordTimeStamp, earliestRecordTimeStamp, lastDataTimeStamp); + outOfOrderTimeStampCount, lastDataTimeStamp, emptyBucketCount, sparseBucketCount, bucketCount, + latestRecordTimeStamp, earliestRecordTimeStamp, latestEmptyBucketTimeStamp, latestSparseBucketTimeStamp); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java index 9647d4353f7..0a1076c3962 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; +import org.elasticsearch.xpack.ml.job.process.DataStreamDiagnostics; import java.io.IOException; import java.util.ArrayList; @@ -40,6 +41,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter private final Logger logger; private final DateTransformer dateTransformer; + private final DataStreamDiagnostics diagnostics; protected Map inFieldIndexes; protected List inputOutputMap; @@ -57,6 +59,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter this.analysisConfig = Objects.requireNonNull(analysisConfig); this.dataCountsReporter = Objects.requireNonNull(dataCountsReporter); this.logger = Objects.requireNonNull(logger); + this.diagnostics = new DataStreamDiagnostics(this.dataCountsReporter, this.analysisConfig, this.logger); Date date = dataCountsReporter.getLatestRecordTime(); latestEpochMsThisUpload = 0; @@ -135,10 +138,12 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter return false; } + record[TIME_FIELD_OUT_INDEX] = Long.toString(epochMs / MS_IN_SECOND); + // Records have epoch seconds timestamp so compare for out of order in seconds if (epochMs / MS_IN_SECOND < latestEpochMs / MS_IN_SECOND - analysisConfig.getLatency()) { // out of order - dataCountsReporter.reportOutOfOrderRecord(inFieldIndexes.size()); + dataCountsReporter.reportOutOfOrderRecord(numberOfFieldsRead); if (epochMs > latestEpochMsThisUpload) { // record this timestamp even if the record won't be processed @@ -148,11 +153,12 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter return false; } - record[TIME_FIELD_OUT_INDEX] = Long.toString(epochMs / MS_IN_SECOND); - latestEpochMs = Math.max(latestEpochMs, epochMs); latestEpochMsThisUpload = latestEpochMs; + // check record in diagnostics + 
diagnostics.checkRecord(epochMs); + autodetectProcess.writeRecord(record); dataCountsReporter.reportRecordWritten(numberOfFieldsRead, latestEpochMs); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index 27d2daeacaa..8a1decbf16c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -106,9 +106,14 @@ public final class ReservedFieldNames { DataCounts.INVALID_DATE_COUNT.getPreferredName(), DataCounts.MISSING_FIELD_COUNT.getPreferredName(), DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName(), + DataCounts.EMPTY_BUCKET_COUNT.getPreferredName(), + DataCounts.SPARSE_BUCKET_COUNT.getPreferredName(), + DataCounts.BUCKET_COUNT.getPreferredName(), DataCounts.LATEST_RECORD_TIME.getPreferredName(), DataCounts.EARLIEST_RECORD_TIME.getPreferredName(), DataCounts.LAST_DATA_TIME.getPreferredName(), + DataCounts.LATEST_EMPTY_BUCKET_TIME.getPreferredName(), + DataCounts.LATEST_SPARSE_BUCKET_TIME.getPreferredName(), Influence.INFLUENCER_FIELD_NAME.getPreferredName(), Influence.INFLUENCER_FIELD_VALUES.getPreferredName(), diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobStatsActionResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobStatsActionResponseTests.java index a63727bc449..5a841024bef 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobStatsActionResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobStatsActionResponseTests.java @@ -37,6 +37,7 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase { @Override protected PostDataAction.Response createTestInstance() { DataCounts counts = new DataCountsTests().createTestInstance(); - return new PostDataAction.Response(counts); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index d82f3cfe0da..eba6a235239 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -183,7 +183,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); - DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0)); + DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, + new Date(0), new Date(0), new Date(0), new Date(0), new Date(0)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); @@ -209,7 +210,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); - DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0)); 
+ DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, + new Date(0), new Date(0), new Date(0), new Date(0), new Date(0)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); @@ -263,7 +265,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); - DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0)); + DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, + new Date(0), new Date(0), new Date(0), new Date(0), new Date(0)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 0a85e166677..163e2c63179 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -63,7 +63,8 @@ public class DatafeedJobTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8)); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); - DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0)); + DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), + new Date(0), new Date(0), new Date(0)); PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id"); expectedRequest.setDataDescription(dataDescription.build()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index ffb8ece5dfd..cf691c1f4b5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -73,7 +73,8 @@ public class DataCountsReporterTests extends ESTestCase { } public void testComplexConstructor() throws Exception { - DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date(), new Date()); + DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, 6L, 7L, 8L, + new Date(), new Date(), new Date(), new Date(), new Date()); try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, counts, jobDataCountsPersister)) { @@ -86,6 +87,9 @@ public class DataCountsReporterTests extends ESTestCase { assertEquals(3, dataCountsReporter.getDateParseErrorsCount()); assertEquals(4, dataCountsReporter.getMissingFieldErrorCount()); assertEquals(5, dataCountsReporter.getOutOfOrderRecordCount()); + assertEquals(6, dataCountsReporter.getEmptyBucketCount()); + assertEquals(7, dataCountsReporter.getSparseBucketCount()); + assertEquals(8, dataCountsReporter.getBucketCount()); 
assertNull(stats.getEarliestRecordTimeStamp()); } } @@ -248,9 +252,9 @@ public class DataCountsReporterTests extends ESTestCase { jobDataCountsPersister)) { dataCountsReporter.setAnalysedFieldsPerRecord(3); - Date now = new Date(); - DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, new Date(2000), new Date(3000), now); + DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 0L, new Date(2000), new Date(3000), + now, (Date) null, (Date) null); dataCountsReporter.reportRecordWritten(5, 2000); dataCountsReporter.reportRecordWritten(5, 3000); dataCountsReporter.reportMissingField(); @@ -309,5 +313,7 @@ assertEquals(0L, stats.getInvalidDateCount()); assertEquals(0L, stats.getMissingFieldCount()); assertEquals(0L, stats.getOutOfOrderTimeStampCount()); + assertEquals(0L, stats.getEmptyBucketCount()); + assertEquals(0L, stats.getSparseBucketCount()); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java new file mode 100644 index 00000000000..ba5c86dbd8a --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java @@ -0,0 +1,278 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.junit.Before; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Date; + +public class DataStreamDiagnosticsTests extends ESTestCase { + + private AnalysisConfig analysisConfig; + private Logger logger; + + @Before + public void setUpMocks() throws IOException { + logger = Loggers.getLogger(DataStreamDiagnosticsTests.class); + + AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); + acBuilder.setBucketSpan(60L); + analysisConfig = acBuilder.build(); + } + + public void testSimple() { + DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + + d.checkRecord(70000); + d.checkRecord(130000); + d.checkRecord(190000); + d.checkRecord(250000); + d.checkRecord(310000); + d.checkRecord(370000); + d.checkRecord(430000); + d.checkRecord(490000); + d.checkRecord(550000); + d.checkRecord(610000); + + d.flush(); + assertEquals(10, dataCountsReporter.getBucketCount()); + assertEquals(0, dataCountsReporter.getEmptyBucketCount()); + assertEquals(0, dataCountsReporter.getSparseBucketCount()); + assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); + assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + } + + public void testEmptyBuckets() { + DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + + d.checkRecord(10000); + d.checkRecord(70000); + // empty bucket + d.checkRecord(190000); +
d.checkRecord(250000); + d.checkRecord(310000); + d.checkRecord(370000); + // empty bucket + d.checkRecord(490000); + d.checkRecord(550000); + + d.flush(); + assertEquals(10, dataCountsReporter.getBucketCount()); + assertEquals(2, dataCountsReporter.getEmptyBucketCount()); + assertEquals(0, dataCountsReporter.getSparseBucketCount()); + assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); + assertEquals(new Date(420000), dataCountsReporter.getLatestEmptyBucketTime()); + } + + public void testEmptyBucketsStartLater() { + DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + + d.checkRecord(1110000); + d.checkRecord(1170000); + // empty bucket + d.checkRecord(1290000); + d.checkRecord(1350000); + d.checkRecord(1410000); + d.checkRecord(1470000); + // empty bucket + d.checkRecord(1590000); + d.checkRecord(1650000); + + d.flush(); + assertEquals(10, dataCountsReporter.getBucketCount()); + assertEquals(2, dataCountsReporter.getEmptyBucketCount()); + assertEquals(0, dataCountsReporter.getSparseBucketCount()); + assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); + assertEquals(new Date(1500000), dataCountsReporter.getLatestEmptyBucketTime()); + } + + public void testSparseBuckets() { + DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + + sendManyDataPoints(d, 10000, 69000, 1000); + sendManyDataPoints(d, 70000, 129000, 1200); + // sparse bucket + sendManyDataPoints(d, 130000, 189000, 1); + sendManyDataPoints(d, 190000, 249000, 1100); + sendManyDataPoints(d, 250000, 309000, 1300); + sendManyDataPoints(d, 310000, 369000, 1050); + sendManyDataPoints(d, 370000, 429000, 1022); + // sparse bucket + sendManyDataPoints(d, 430000, 489000, 10); + sendManyDataPoints(d, 490000, 549000, 1333); + sendManyDataPoints(d, 550000, 609000, 1400); + + d.flush(); + assertEquals(10, dataCountsReporter.getBucketCount()); + assertEquals(0, dataCountsReporter.getEmptyBucketCount()); + assertEquals(2, dataCountsReporter.getSparseBucketCount()); + assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime()); + assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + } + + /** + * Tests that sparsity in the last bucket does not create a sparse bucket + * signal. + */ + public void testSparseBucketsLast() { + DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + + sendManyDataPoints(d, 10000, 69000, 1000); + sendManyDataPoints(d, 70000, 129000, 1200); + // sparse bucket + sendManyDataPoints(d, 130000, 189000, 1); + sendManyDataPoints(d, 190000, 249000, 1100); + sendManyDataPoints(d, 250000, 309000, 1300); + sendManyDataPoints(d, 310000, 369000, 1050); + sendManyDataPoints(d, 370000, 429000, 1022); + sendManyDataPoints(d, 430000, 489000, 1400); + sendManyDataPoints(d, 490000, 549000, 1333); + // sparse bucket (but last one) + sendManyDataPoints(d, 550000, 609000, 10); + + d.flush(); + assertEquals(10, dataCountsReporter.getBucketCount()); + assertEquals(0, dataCountsReporter.getEmptyBucketCount()); + assertEquals(1, dataCountsReporter.getSparseBucketCount()); + assertEquals(new Date(120000), dataCountsReporter.getLatestSparseBucketTime()); + assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + } + + /**
+ * Tests that sparsity in the last two buckets creates a sparse bucket + * signal on the second-to-last one. + */ + public void testSparseBucketsLastTwo() { + DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + + sendManyDataPoints(d, 10000, 69000, 1000); + sendManyDataPoints(d, 70000, 129000, 1200); + // sparse bucket + sendManyDataPoints(d, 130000, 189000, 1); + sendManyDataPoints(d, 190000, 249000, 1100); + sendManyDataPoints(d, 250000, 309000, 1300); + sendManyDataPoints(d, 310000, 369000, 1050); + sendManyDataPoints(d, 370000, 429000, 1022); + sendManyDataPoints(d, 430000, 489000, 1400); + // sparse bucket (2nd to last one) + sendManyDataPoints(d, 490000, 549000, 9); + // sparse bucket (but last one) + sendManyDataPoints(d, 550000, 609000, 10); + + d.flush(); + assertEquals(10, dataCountsReporter.getBucketCount()); + assertEquals(0, dataCountsReporter.getEmptyBucketCount()); + assertEquals(2, dataCountsReporter.getSparseBucketCount()); + assertEquals(new Date(480000), dataCountsReporter.getLatestSparseBucketTime()); + assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + } + + public void testMixedEmptyAndSparseBuckets() { + DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + + sendManyDataPoints(d, 10000, 69000, 1000); + sendManyDataPoints(d, 70000, 129000, 1200); + // sparse bucket + sendManyDataPoints(d, 130000, 189000, 1); + // empty bucket + sendManyDataPoints(d, 250000, 309000, 1300); + sendManyDataPoints(d, 310000, 369000, 1050); + sendManyDataPoints(d, 370000, 429000, 1022); + // sparse bucket + sendManyDataPoints(d, 430000, 489000, 10); + // empty bucket + sendManyDataPoints(d, 550000, 609000, 1400); + + d.flush(); + assertEquals(10, dataCountsReporter.getBucketCount()); + assertEquals(2, dataCountsReporter.getSparseBucketCount()); + assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime()); + assertEquals(2, dataCountsReporter.getEmptyBucketCount()); + assertEquals(new Date(480000), dataCountsReporter.getLatestEmptyBucketTime()); + } + + /** + * Send signals, then make a long pause, send another signal and then check + * whether counts are right. + */ + public void testEmptyBucketsLongerOutage() { + DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + + d.checkRecord(10000); + d.checkRecord(70000); + // empty bucket + d.checkRecord(190000); + d.checkRecord(250000); + d.checkRecord(310000); + d.checkRecord(370000); + // empty bucket + d.checkRecord(490000); + d.checkRecord(550000); + // 98 empty buckets + d.checkRecord(6490000); + d.flush(); + assertEquals(109, dataCountsReporter.getBucketCount()); + assertEquals(100, dataCountsReporter.getEmptyBucketCount()); + assertEquals(0, dataCountsReporter.getSparseBucketCount()); + assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); + assertEquals(new Date(6420000), dataCountsReporter.getLatestEmptyBucketTime()); + } + + /** + * Send signals, make a longer period of sparse signals, then go up again. + * + * The number of sparse buckets should not grow too high; a longer sparse period could be normal.
+ */ + public void testSparseBucketsLongerPeriod() { + DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + + sendManyDataPoints(d, 10000, 69000, 1000); + sendManyDataPoints(d, 70000, 129000, 1200); + // sparse bucket + sendManyDataPoints(d, 130000, 189000, 1); + sendManyDataPoints(d, 190000, 249000, 1100); + sendManyDataPoints(d, 250000, 309000, 1300); + sendManyDataPoints(d, 310000, 369000, 1050); + sendManyDataPoints(d, 370000, 429000, 1022); + // sparse bucket + sendManyDataPoints(d, 430000, 489000, 10); + sendManyDataPoints(d, 490000, 549000, 1333); + sendManyDataPoints(d, 550000, 609000, 1400); + + d.flush(); + assertEquals(10, dataCountsReporter.getBucketCount()); + assertEquals(0, dataCountsReporter.getEmptyBucketCount()); + assertEquals(2, dataCountsReporter.getSparseBucketCount()); + assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime()); + assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + } + + private void sendManyDataPoints(DataStreamDiagnostics d, long recordTimestampInMsMin, long recordTimestampInMsMax, long howMuch) { + + long range = recordTimestampInMsMax - recordTimestampInMsMin; + + for (int i = 0; i < howMuch; ++i) { + d.checkRecord(recordTimestampInMsMin + i % range); + } + } + +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCountsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCountsTests.java index a3ebec4f190..013a7ab04eb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCountsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCountsTests.java @@ -21,6 +21,8 @@ public class DataCountsTests extends AbstractSerializingTestCase { return new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), + randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), + new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate()); } @@ -36,22 +38,22 @@ public class DataCountsTests extends AbstractSerializingTestCase { } public void testCountsEquals_GivenEqualCounts() { - DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12); + DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12); assertTrue(counts1.equals(counts2)); assertTrue(counts2.equals(counts1)); } public void testCountsHashCode_GivenEqualCounts() { - DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12); + DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12); assertEquals(counts1.hashCode(), counts2.hashCode()); } public void testCountsCopyConstructor() { - DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + DataCounts counts1 
= createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12); DataCounts counts2 = new DataCounts(counts1); assertEquals(counts1.hashCode(), counts2.hashCode()); @@ -63,7 +65,7 @@ public class DataCountsTests extends AbstractSerializingTestCase { } public void testCountCopyCreatedFieldsNotZero() throws Exception { - DataCounts counts1 = createCounts(1, 200, 400, 3, 4, 5, 6, 1479211200000L, 1479384000000L, 1488282343000L); + DataCounts counts1 = createCounts(1, 200, 400, 3, 4, 5, 6, 7, 8, 9, 1479211200000L, 1479384000000L); assertAllFieldsGreaterThanZero(counts1); DataCounts counts2 = new DataCounts(counts1); @@ -102,19 +104,22 @@ public class DataCountsTests extends AbstractSerializingTestCase { } public void testCalcProcessedFieldCount() { - DataCounts counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date(), new Date()); + DataCounts counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date(), + new Date(), new Date(), new Date()); counts.calcProcessedFieldCount(3); assertEquals(30, counts.getProcessedFieldCount()); - counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 5L, 0L, new Date(), new Date(), new Date()); + counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 5L, 0L, 0L, 0L, 0L, new Date(), new Date(), + new Date(), new Date(), new Date()); counts.calcProcessedFieldCount(3); assertEquals(25, counts.getProcessedFieldCount()); } public void testEquals() { DataCounts counts1 = new DataCounts( - randomAsciiOfLength(16), 10L, 5000L, 2000L, 300L, 6L, 15L, 0L, new Date(), new Date(1435000000L), new Date(10L)); + randomAsciiOfLength(16), 10L, 5000L, 2000L, 300L, 6L, 15L, 0L, 0L, 0L, 0L, new Date(), new Date(1435000000L), + new Date(), new Date(), new Date()); DataCounts counts2 = new DataCounts(counts1); assertEquals(counts1, counts2); @@ -156,12 +161,15 @@ public class DataCountsTests extends AbstractSerializingTestCase { private static DataCounts createCounts( long processedRecordCount, long processedFieldCount, long inputBytes, long inputFieldCount, - long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount, long earliestRecordTime, long latestRecordTime, - long lastDataTime) { + long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount, + long emptyBucketCount, long sparseBucketCount, long bucketCount, + long earliestRecordTime, long latestRecordTime) { DataCounts counts = new DataCounts("foo", processedRecordCount, processedFieldCount, inputBytes, inputFieldCount, invalidDateCount, missingFieldCount, outOfOrderTimeStampCount, - new Date(earliestRecordTime), new Date(latestRecordTime), new Date(lastDataTime)); + emptyBucketCount, sparseBucketCount, bucketCount, + new Date(earliestRecordTime), new Date(latestRecordTime), + new Date(), new Date(), new Date()); return counts; }
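
The heart of the new class is the bucket arithmetic in checkBucketing() and the log-scale sparsity test in flush(). The following standalone sketch reproduces both so the thresholds can be sanity-checked in isolation; the class and method names are invented for the example, and only the constants (MIN_BUCKET_WINDOW = 10, DATA_SPARSITY_THRESHOLD = 2, milliseconds-to-seconds conversion) are taken from the patch.

public final class SparsityMathSketch {

    private static final int MIN_BUCKET_WINDOW = 10;
    private static final int DATA_SPARSITY_THRESHOLD = 2;
    private static final long MS_IN_SECOND = 1000;

    // Bucket index of a record: epoch seconds divided by the bucket span (seconds).
    static long bucketOf(long recordTimestampMs, long bucketSpanSeconds) {
        return (recordTimestampMs / MS_IN_SECOND) / bucketSpanSeconds;
    }

    // First bucket of the moving window: bounded by both the configured latency
    // and the minimum window, mirroring checkBucketing().
    static long windowStart(long recordTimestampMs, long bucketSpanSeconds, long latencySeconds) {
        long bucket = bucketOf(recordTimestampMs, bucketSpanSeconds);
        long latencyStart = ((recordTimestampMs / MS_IN_SECOND) - latencySeconds) / bucketSpanSeconds;
        return Math.min(bucket - MIN_BUCKET_WINDOW, latencyStart);
    }

    // A non-empty bucket is sparse when log(average) - log(size) exceeds the
    // threshold; a threshold of 2 means roughly e^2 (about 7.4x) below average.
    static boolean isSparse(long bucketSize, double averageBucketSize) {
        if (bucketSize == 0) {
            return false; // empty buckets are reported separately, never as sparse
        }
        return (Math.log(averageBucketSize) - Math.log(bucketSize)) > DATA_SPARSITY_THRESHOLD;
    }

    public static void main(String[] args) {
        // With bucket_span = 60s (as in the tests), t = 130000 ms lands in bucket 2.
        System.out.println(bucketOf(130_000, 60));       // 2
        System.out.println(windowStart(130_000, 60, 0)); // 2 - 10 = -8
        // Average ~1100 points per bucket: a 1-point bucket scores log(1100) ~ 7.0 > 2.
        System.out.println(isSparse(1, 1100.0));         // true
        // A 400-point bucket scores log(1100) - log(400) ~ 1.01, so it is not sparse.
        System.out.println(isSparse(400, 1100.0));       // false
    }
}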
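
The empty-bucket accounting at the top of flush() can be replayed the same way: every bucket strictly between lastReportedBucket and the first occupied bucket in the histogram counts as empty, and the reported timestamp is the start of the last such bucket. A minimal sketch, with all names local to the example:

import java.util.SortedMap;
import java.util.TreeMap;

public final class EmptyBucketGapSketch {
    public static void main(String[] args) {
        long bucketSpanSeconds = 60;
        long lastReportedBucket = 1;            // bucket 1 was already reported
        SortedMap<Long, Long> histogram = new TreeMap<>();
        histogram.put(4L, 7L);                  // next data arrives in bucket 4

        // Buckets 2 and 3 were never seen, so they form the empty gap.
        long emptyBuckets = histogram.firstKey() - lastReportedBucket - 1;
        // The latest empty bucket timestamp is the start of bucket firstKey() - 1.
        long latestEmptyBucketMs = (histogram.firstKey() - 1) * bucketSpanSeconds * 1000;

        System.out.println(emptyBuckets);        // 2
        System.out.println(latestEmptyBucketMs); // 180000, i.e. the start of bucket 3
    }
}

This matches the testEmptyBuckets expectation above: with a 60 s bucket span, the second gap ends at bucket 7, so getLatestEmptyBucketTime() returns new Date(420000).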
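
The DataCounts serialization changes follow the usual stream discipline: the three new counts are written right after outOfOrderTimeStampCount, and each new optional timestamp is guarded by a presence boolean, in the same order in writeTo() and in the StreamInput constructor. The sketch below illustrates that symmetry using plain java.io streams in place of Elasticsearch's StreamOutput/StreamInput (an assumption made for the example, not the actual API):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class WireOrderSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        // Write side: the count first, then the optional timestamp guarded by a boolean.
        out.writeLong(42L);        // emptyBucketCount
        out.writeBoolean(true);    // latestEmptyBucketTimeStamp present?
        out.writeLong(180_000L);   // its epoch milliseconds

        // Read side: same order, same optionality, or the stream is misread.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        long emptyBucketCount = in.readLong();
        Long latestEmptyBucketMs = in.readBoolean() ? in.readLong() : null;
        System.out.println(emptyBucketCount + " " + latestEmptyBucketMs); // 42 180000
    }
}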