[ML] DataStreamDiagnostics Rebase (elastic/x-pack-elasticsearch#698)

DataStreamDiagnostics

DataStreamDiagnostics analyzes the input data with respect to machine learning fitness. It checks whether the data is sane and plausible, since running anomaly detection on broken data (or with a misconfigured job) produces misleading results.
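
For intuition, the sparsity check introduced here scores each bucket as log(average) - log(current) and flags the bucket when the score exceeds 2, i.e. when it holds fewer than about e^-2 ≈ 13.5% of the average record count. A minimal sketch of that arithmetic (the class name is hypothetical, not part of this commit):

// SparsityScoreExample.java - hypothetical sketch of the sparsity score used below.
public class SparsityScoreExample {

    // Same threshold as DATA_SPARSITY_THRESHOLD in DataStreamDiagnostics.
    private static final int DATA_SPARSITY_THRESHOLD = 2;

    static boolean isSparse(double averageBucketSize, long bucketSize) {
        // log(average) - log(current); equivalent to log(average / current)
        return Math.log(averageBucketSize) - Math.log(bucketSize) > DATA_SPARSITY_THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.println(isSparse(1000.0, 10));  // true:  log(1000) - log(10)  ~= 4.6 > 2
        System.out.println(isSparse(1000.0, 200)); // false: log(1000) - log(200) ~= 1.6
    }
}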

Original commit: elastic/x-pack-elasticsearch@2f37d3c960
Hendrik Muhs 2017-03-12 09:40:27 -07:00 committed by GitHub
parent 0457de27dd
commit ce1a910542
13 changed files with 742 additions and 31 deletions

View File

@@ -466,12 +466,27 @@ public class ElasticsearchMappings {
                 .startObject(DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName())
                     .field(TYPE, LONG)
                 .endObject()
+                .startObject(DataCounts.EMPTY_BUCKET_COUNT.getPreferredName())
+                    .field(TYPE, LONG)
+                .endObject()
+                .startObject(DataCounts.SPARSE_BUCKET_COUNT.getPreferredName())
+                    .field(TYPE, LONG)
+                .endObject()
+                .startObject(DataCounts.BUCKET_COUNT.getPreferredName())
+                    .field(TYPE, LONG)
+                .endObject()
                 .startObject(DataCounts.EARLIEST_RECORD_TIME.getPreferredName())
                     .field(TYPE, DATE)
                 .endObject()
                 .startObject(DataCounts.LATEST_RECORD_TIME.getPreferredName())
                     .field(TYPE, DATE)
                 .endObject()
+                .startObject(DataCounts.LATEST_EMPTY_BUCKET_TIME.getPreferredName())
+                    .field(TYPE, DATE)
+                .endObject()
+                .startObject(DataCounts.LATEST_SPARSE_BUCKET_TIME.getPreferredName())
+                    .field(TYPE, DATE)
+                .endObject()
                 .startObject(DataCounts.LAST_DATA_TIME.getPreferredName())
                     .field(TYPE, DATE)
                 .endObject()

View File

@@ -191,6 +191,43 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
         incrementalRecordStats.incrementInputFieldCount(inputFieldCount);
     }
 
+    public void reportEmptyBucket(long recordTimeMs) {
+        Date recordDate = new Date(recordTimeMs);
+
+        totalRecordStats.incrementEmptyBucketCount(1);
+        totalRecordStats.setLatestEmptyBucketTimeStamp(recordDate);
+        incrementalRecordStats.incrementEmptyBucketCount(1);
+        incrementalRecordStats.setLatestEmptyBucketTimeStamp(recordDate);
+    }
+
+    public void reportEmptyBuckets(long additional, long lastRecordTimeMs) {
+        Date recordDate = new Date(lastRecordTimeMs);
+
+        totalRecordStats.incrementEmptyBucketCount(additional);
+        totalRecordStats.setLatestEmptyBucketTimeStamp(recordDate);
+        incrementalRecordStats.incrementEmptyBucketCount(additional);
+        incrementalRecordStats.setLatestEmptyBucketTimeStamp(recordDate);
+    }
+
+    public void reportSparseBucket(long recordTimeMs) {
+        Date recordDate = new Date(recordTimeMs);
+
+        totalRecordStats.incrementSparseBucketCount(1);
+        totalRecordStats.setLatestSparseBucketTimeStamp(recordDate);
+        incrementalRecordStats.incrementSparseBucketCount(1);
+        incrementalRecordStats.setLatestSparseBucketTimeStamp(recordDate);
+    }
+
+    public void reportBucket() {
+        totalRecordStats.incrementBucketCount(1);
+        incrementalRecordStats.incrementBucketCount(1);
+    }
+
+    public void reportBuckets(long additional) {
+        totalRecordStats.incrementBucketCount(additional);
+        incrementalRecordStats.incrementBucketCount(additional);
+    }
+
     /**
      * Total records seen = records written to the Engine (processed record
      * count) + date parse error records count + out of order record count.
@@ -216,7 +253,19 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
     public long getOutOfOrderRecordCount() {
         return totalRecordStats.getOutOfOrderTimeStampCount();
     }
 
+    public long getEmptyBucketCount() {
+        return totalRecordStats.getEmptyBucketCount();
+    }
+
+    public long getSparseBucketCount() {
+        return totalRecordStats.getSparseBucketCount();
+    }
+
+    public long getBucketCount() {
+        return totalRecordStats.getBucketCount();
+    }
+
     public long getBytesRead() {
         return totalRecordStats.getInputBytes();
     }
@@ -224,6 +273,14 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
     public Date getLatestRecordTime() {
         return totalRecordStats.getLatestRecordTimeStamp();
     }
 
+    public Date getLatestEmptyBucketTime() {
+        return totalRecordStats.getLatestEmptyBucketTimeStamp();
+    }
+
+    public Date getLatestSparseBucketTime() {
+        return totalRecordStats.getLatestSparseBucketTimeStamp();
+    }
+
     public long getProcessedFieldCount() {
         totalRecordStats.calcProcessedFieldCount(getAnalysedFieldsPerRecord());

View File

@@ -0,0 +1,177 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.process;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Counter;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;

import java.util.SortedMap;
import java.util.TreeMap;

public class DataStreamDiagnostics {

    /**
     * Minimum window to take into consideration for the bucket count histogram.
     */
    private static final int MIN_BUCKET_WINDOW = 10;

    /**
     * Threshold to report potential sparsity problems.
     *
     * Sparsity score is calculated: log(average) - log(current)
     *
     * If the score is above the threshold, the bucket is reported as sparse.
     */
    private static final int DATA_SPARSITY_THRESHOLD = 2;
    private static final long MS_IN_SECOND = 1000;

    private final DataCountsReporter dataCountsReporter;
    private final Logger logger;

    /**
     * Container for the histogram.
     *
     * Note: Using a sorted map in order to iterate in order when consuming the
     * data. The counter is lazily initialized and potentially missing in case
     * of empty buckets.
     *
     * The container gets pruned as the data streams in, based on the bucket
     * window, so it should not contain more than max(MIN_BUCKET_WINDOW,
     * 'buckets_required_by_latency') + 1 items at any time.
     */
    private final SortedMap<Long, Counter> movingBucketHistogram = new TreeMap<>();

    private final long bucketSpan;
    private final long latency;
    private long movingBucketCount = 0;
    private long lastReportedBucket = -1;

    public DataStreamDiagnostics(DataCountsReporter dataCountsReporter, AnalysisConfig analysisConfig, Logger logger) {
        this.dataCountsReporter = dataCountsReporter;
        this.logger = logger;
        bucketSpan = analysisConfig.getBucketSpanOrDefault();
        latency = analysisConfig.getLatency();
    }

    /**
     * Check record
     *
     * @param recordTimestampInMs
     *            The record timestamp in milliseconds since epoch
     */
    public void checkRecord(long recordTimestampInMs) {
        checkBucketing(recordTimestampInMs);
    }

    /**
     * Flush all counters; should be called at the end of the data stream
     */
    public void flush() {
        // flush all we know
        flush(movingBucketHistogram.lastKey() + 1);
    }

    /**
     * Check bucketing of record. Report empty and sparse buckets.
     *
     * @param recordTimestampInMs
     *            The record timestamp in milliseconds since epoch
     */
    private void checkBucketing(long recordTimestampInMs) {
        long bucket = (recordTimestampInMs / MS_IN_SECOND) / bucketSpan;
        long bucketHistogramStartBucket = ((recordTimestampInMs / MS_IN_SECOND) - latency) / bucketSpan;

        bucketHistogramStartBucket = Math.min(bucket - MIN_BUCKET_WINDOW, bucketHistogramStartBucket);

        movingBucketHistogram.computeIfAbsent(bucket, l -> Counter.newCounter()).addAndGet(1);
        ++movingBucketCount;

        // find the very first bucket
        if (lastReportedBucket == -1) {
            lastReportedBucket = bucket - 1;
        }

        // flush all buckets that fell out of the window
        flush(bucketHistogramStartBucket);
    }

    /**
     * Flush bucket reporting up to the given bucket.
     *
     * @param bucketTimeStamp
     *            The first bucket that is not flushed (exclusive end of the flush range).
     */
    private void flush(long bucketTimeStamp) {
        // check for a longer period of empty buckets
        long emptyBuckets = movingBucketHistogram.firstKey() - lastReportedBucket - 1;
        if (emptyBuckets > 0) {
            dataCountsReporter.reportBuckets(emptyBuckets);
            dataCountsReporter.reportEmptyBuckets(emptyBuckets, (movingBucketHistogram.firstKey() - 1) * bucketSpan * MS_IN_SECOND);
            lastReportedBucket = movingBucketHistogram.firstKey() - 1;
        }

        // calculate the average number of data points in a bucket based on the
        // current history
        double averageBucketSize = (float) movingBucketCount / movingBucketHistogram.size();

        // prune all buckets that can be flushed
        long lastBucketSparsityCheck = Math.min(bucketTimeStamp, movingBucketHistogram.lastKey());

        for (long pruneBucket = movingBucketHistogram.firstKey(); pruneBucket < lastBucketSparsityCheck; ++pruneBucket) {
            Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket);
            long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L;

            logger.debug("Checking bucket {} compare sizes, this bucket: {} average: {}", pruneBucket, bucketSize, averageBucketSize);
            dataCountsReporter.reportBucket();
            lastReportedBucket = pruneBucket;

            // subtract bucketSize from the counter
            movingBucketCount -= bucketSize;

            // check if bucket is empty
            if (bucketSize == 0L) {
                dataCountsReporter.reportEmptyBucket(pruneBucket * bucketSpan * MS_IN_SECOND);

                // do not do sparse analysis on an empty bucket
                continue;
            }

            // simplistic way to calculate data sparsity, just take the log and
            // check the difference
            double logAverageBucketSize = Math.log(averageBucketSize);
            double logBucketSize = Math.log(bucketSize);
            double sparsityScore = logAverageBucketSize - logBucketSize;

            if (sparsityScore > DATA_SPARSITY_THRESHOLD) {
                logger.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", pruneBucket, bucketSize,
                        averageBucketSize, sparsityScore);
                dataCountsReporter.reportSparseBucket(pruneBucket * bucketSpan * MS_IN_SECOND);
            }
        }

        // prune the rest if necessary
        for (long pruneBucket = lastBucketSparsityCheck; pruneBucket < bucketTimeStamp; ++pruneBucket) {
            Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket);
            long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L;

            dataCountsReporter.reportBucket();
            lastReportedBucket = pruneBucket;

            // subtract bucketSize from the counter
            movingBucketCount -= bucketSize;

            // check if bucket is empty
            if (bucketSize == 0L) {
                dataCountsReporter.reportEmptyBucket(pruneBucket * bucketSpan * MS_IN_SECOND);
            }
        }
    }
}
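
To make the windowing arithmetic above concrete, here is a small walk-through of checkBucketing()'s index computation, using the 60-second bucket span and zero latency from the tests further down (the class name is hypothetical, not part of this commit):

// BucketIndexExample.java - hypothetical illustration of checkBucketing()'s arithmetic.
public class BucketIndexExample {
    public static void main(String[] args) {
        final long msInSecond = 1000;
        final long bucketSpan = 60;      // seconds
        final long latency = 0;          // seconds
        final long minBucketWindow = 10; // MIN_BUCKET_WINDOW above

        long recordTimestampInMs = 130_000; // a record 130 s after epoch
        long bucket = (recordTimestampInMs / msInSecond) / bucketSpan;                  // 130 / 60 = 2
        long windowStart = ((recordTimestampInMs / msInSecond) - latency) / bucketSpan; // also 2

        // Pulling the window start back by MIN_BUCKET_WINDOW means no bucket is
        // flushed before at least 10 buckets of history have accumulated.
        windowStart = Math.min(bucket - minBucketWindow, windowStart); // 2 - 10 = -8

        System.out.println("bucket=" + bucket + ", windowStart=" + windowStart);
    }
}

Only buckets below windowStart are flushed, so while the window start is still negative the histogram simply keeps accumulating.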

View File

@@ -45,10 +45,15 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
     public static final String INVALID_DATE_COUNT_STR = "invalid_date_count";
     public static final String MISSING_FIELD_COUNT_STR = "missing_field_count";
     public static final String OUT_OF_ORDER_TIME_COUNT_STR = "out_of_order_timestamp_count";
+    public static final String EMPTY_BUCKET_COUNT_STR = "empty_bucket_count";
+    public static final String SPARSE_BUCKET_COUNT_STR = "sparse_bucket_count";
+    public static final String BUCKET_COUNT_STR = "bucket_count";
     public static final String EARLIEST_RECORD_TIME_STR = "earliest_record_timestamp";
     public static final String LATEST_RECORD_TIME_STR = "latest_record_timestamp";
     public static final String LAST_DATA_TIME_STR = "last_data_time";
+    public static final String LATEST_EMPTY_BUCKET_TIME_STR = "latest_empty_bucket_timestamp";
+    public static final String LATEST_SPARSE_BUCKET_TIME_STR = "latest_sparse_bucket_timestamp";
 
     public static final ParseField PROCESSED_RECORD_COUNT = new ParseField(PROCESSED_RECORD_COUNT_STR);
     public static final ParseField PROCESSED_FIELD_COUNT = new ParseField(PROCESSED_FIELD_COUNT_STR);
     public static final ParseField INPUT_BYTES = new ParseField(INPUT_BYTES_STR);
@@ -57,15 +62,21 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
     public static final ParseField INVALID_DATE_COUNT = new ParseField(INVALID_DATE_COUNT_STR);
     public static final ParseField MISSING_FIELD_COUNT = new ParseField(MISSING_FIELD_COUNT_STR);
     public static final ParseField OUT_OF_ORDER_TIME_COUNT = new ParseField(OUT_OF_ORDER_TIME_COUNT_STR);
+    public static final ParseField EMPTY_BUCKET_COUNT = new ParseField(EMPTY_BUCKET_COUNT_STR);
+    public static final ParseField SPARSE_BUCKET_COUNT = new ParseField(SPARSE_BUCKET_COUNT_STR);
+    public static final ParseField BUCKET_COUNT = new ParseField(BUCKET_COUNT_STR);
     public static final ParseField EARLIEST_RECORD_TIME = new ParseField(EARLIEST_RECORD_TIME_STR);
     public static final ParseField LATEST_RECORD_TIME = new ParseField(LATEST_RECORD_TIME_STR);
     public static final ParseField LAST_DATA_TIME = new ParseField(LAST_DATA_TIME_STR);
+    public static final ParseField LATEST_EMPTY_BUCKET_TIME = new ParseField(LATEST_EMPTY_BUCKET_TIME_STR);
+    public static final ParseField LATEST_SPARSE_BUCKET_TIME = new ParseField(LATEST_SPARSE_BUCKET_TIME_STR);
 
     public static final ParseField TYPE = new ParseField("data_counts");
 
     public static final ConstructingObjectParser<DataCounts, Void> PARSER =
             new ConstructingObjectParser<>("data_counts", a -> new DataCounts((String) a[0], (long) a[1], (long) a[2], (long) a[3],
-                    (long) a[4], (long) a[5], (long) a[6], (long) a[7], (Date) a[8], (Date) a[9], (Date) a[10]));
+                    (long) a[4], (long) a[5], (long) a[6], (long) a[7], (long) a[8], (long) a[9], (long) a[10],
+                    (Date) a[11], (Date) a[12], (Date) a[13], (Date) a[14], (Date) a[15]));
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
@@ -76,6 +87,9 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         PARSER.declareLong(ConstructingObjectParser.constructorArg(), INVALID_DATE_COUNT);
         PARSER.declareLong(ConstructingObjectParser.constructorArg(), MISSING_FIELD_COUNT);
         PARSER.declareLong(ConstructingObjectParser.constructorArg(), OUT_OF_ORDER_TIME_COUNT);
+        PARSER.declareLong(ConstructingObjectParser.constructorArg(), EMPTY_BUCKET_COUNT);
+        PARSER.declareLong(ConstructingObjectParser.constructorArg(), SPARSE_BUCKET_COUNT);
+        PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_COUNT);
         PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
             if (p.currentToken() == Token.VALUE_NUMBER) {
                 return new Date(p.longValue());
@@ -103,6 +117,24 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
             throw new IllegalArgumentException(
                     "unexpected token [" + p.currentToken() + "] for [" + LAST_DATA_TIME.getPreferredName() + "]");
         }, LAST_DATA_TIME, ValueType.VALUE);
+        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
+            if (p.currentToken() == Token.VALUE_NUMBER) {
+                return new Date(p.longValue());
+            } else if (p.currentToken() == Token.VALUE_STRING) {
+                return new Date(TimeUtils.dateStringToEpoch(p.text()));
+            }
+            throw new IllegalArgumentException(
+                    "unexpected token [" + p.currentToken() + "] for [" + LATEST_EMPTY_BUCKET_TIME.getPreferredName() + "]");
+        }, LATEST_EMPTY_BUCKET_TIME, ValueType.VALUE);
+        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
+            if (p.currentToken() == Token.VALUE_NUMBER) {
+                return new Date(p.longValue());
+            } else if (p.currentToken() == Token.VALUE_STRING) {
+                return new Date(TimeUtils.dateStringToEpoch(p.text()));
+            }
+            throw new IllegalArgumentException(
+                    "unexpected token [" + p.currentToken() + "] for [" + LATEST_SPARSE_BUCKET_TIME.getPreferredName() + "]");
+        }, LATEST_SPARSE_BUCKET_TIME, ValueType.VALUE);
         PARSER.declareLong((t, u) -> {;}, INPUT_RECORD_COUNT);
     }
@@ -118,14 +150,21 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
     private long invalidDateCount;
     private long missingFieldCount;
     private long outOfOrderTimeStampCount;
+    private long emptyBucketCount;
+    private long sparseBucketCount;
+    private long bucketCount;
     // NORELEASE: Use Jodatime instead
     private Date earliestRecordTimeStamp;
     private Date latestRecordTimeStamp;
     private Date lastDataTimeStamp;
+    private Date latestEmptyBucketTimeStamp;
+    private Date latestSparseBucketTimeStamp;
 
     public DataCounts(String jobId, long processedRecordCount, long processedFieldCount, long inputBytes,
                       long inputFieldCount, long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount,
-                      Date earliestRecordTimeStamp, Date latestRecordTimeStamp, Date lastDataTimeStamp) {
+                      long emptyBucketCount, long sparseBucketCount, long bucketCount,
+                      Date earliestRecordTimeStamp, Date latestRecordTimeStamp, Date lastDataTimeStamp,
+                      Date latestEmptyBucketTimeStamp, Date latestSparseBucketTimeStamp) {
         this.jobId = jobId;
         this.processedRecordCount = processedRecordCount;
         this.processedFieldCount = processedFieldCount;
@@ -134,9 +173,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         this.invalidDateCount = invalidDateCount;
         this.missingFieldCount = missingFieldCount;
         this.outOfOrderTimeStampCount = outOfOrderTimeStampCount;
+        this.emptyBucketCount = emptyBucketCount;
+        this.sparseBucketCount = sparseBucketCount;
+        this.bucketCount = bucketCount;
         this.latestRecordTimeStamp = latestRecordTimeStamp;
         this.earliestRecordTimeStamp = earliestRecordTimeStamp;
         this.lastDataTimeStamp = lastDataTimeStamp;
+        this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp;
+        this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp;
     }
 
     public DataCounts(String jobId) {
@@ -152,9 +196,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         invalidDateCount = lhs.invalidDateCount;
         missingFieldCount = lhs.missingFieldCount;
         outOfOrderTimeStampCount = lhs.outOfOrderTimeStampCount;
+        emptyBucketCount = lhs.emptyBucketCount;
+        sparseBucketCount = lhs.sparseBucketCount;
+        bucketCount = lhs.bucketCount;
         latestRecordTimeStamp = lhs.latestRecordTimeStamp;
         earliestRecordTimeStamp = lhs.earliestRecordTimeStamp;
         lastDataTimeStamp = lhs.lastDataTimeStamp;
+        latestEmptyBucketTimeStamp = lhs.latestEmptyBucketTimeStamp;
+        latestSparseBucketTimeStamp = lhs.latestSparseBucketTimeStamp;
     }
 
     public DataCounts(StreamInput in) throws IOException {
@@ -166,6 +215,9 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         invalidDateCount = in.readVLong();
         missingFieldCount = in.readVLong();
         outOfOrderTimeStampCount = in.readVLong();
+        emptyBucketCount = in.readVLong();
+        sparseBucketCount = in.readVLong();
+        bucketCount = in.readVLong();
         if (in.readBoolean()) {
             latestRecordTimeStamp = new Date(in.readVLong());
         }
@@ -175,6 +227,12 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         if (in.readBoolean()) {
             lastDataTimeStamp = new Date(in.readVLong());
         }
+        if (in.readBoolean()) {
+            latestEmptyBucketTimeStamp = new Date(in.readVLong());
+        }
+        if (in.readBoolean()) {
+            latestSparseBucketTimeStamp = new Date(in.readVLong());
+        }
         in.readVLong(); // throw away inputRecordCount
     }
@@ -307,6 +365,46 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         outOfOrderTimeStampCount += additional;
     }
 
+    /**
+     * The number of buckets with no records in them. Used to measure general data fitness and/or
+     * configuration problems (bucket span).
+     *
+     * @return Number of empty buckets processed by this job {@code long}
+     */
+    public long getEmptyBucketCount() {
+        return emptyBucketCount;
+    }
+
+    public void incrementEmptyBucketCount(long additional) {
+        emptyBucketCount += additional;
+    }
+
+    /**
+     * The number of buckets with few records compared to the overall counts.
+     * Used to measure general data fitness and/or configuration problems (bucket span).
+     *
+     * @return Number of sparse buckets processed by this job {@code long}
+     */
+    public long getSparseBucketCount() {
+        return sparseBucketCount;
+    }
+
+    public void incrementSparseBucketCount(long additional) {
+        sparseBucketCount += additional;
+    }
+
+    /**
+     * The number of buckets overall.
+     *
+     * @return Number of buckets processed by this job {@code long}
+     */
+    public long getBucketCount() {
+        return bucketCount;
+    }
+
+    public void incrementBucketCount(long additional) {
+        bucketCount += additional;
+    }
+
     /**
      * The time of the first record seen.
      *
@@ -357,6 +455,33 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         this.lastDataTimeStamp = lastDataTimeStamp;
     }
 
+    /**
+     * The time of the latest empty bucket seen.
+     *
+     * @return Latest empty bucket time
+     */
+    public Date getLatestEmptyBucketTimeStamp() {
+        return latestEmptyBucketTimeStamp;
+    }
+
+    public void setLatestEmptyBucketTimeStamp(Date latestEmptyBucketTimeStamp) {
+        this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp;
+    }
+
+    /**
+     * The time of the latest sparse bucket seen.
+     *
+     * @return Latest sparse bucket time
+     */
+    public Date getLatestSparseBucketTimeStamp() {
+        return latestSparseBucketTimeStamp;
+    }
+
+    public void setLatestSparseBucketTimeStamp(Date latestSparseBucketTimeStamp) {
+        this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(jobId);
@@ -367,6 +492,9 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         out.writeVLong(invalidDateCount);
         out.writeVLong(missingFieldCount);
         out.writeVLong(outOfOrderTimeStampCount);
+        out.writeVLong(emptyBucketCount);
+        out.writeVLong(sparseBucketCount);
+        out.writeVLong(bucketCount);
         if (latestRecordTimeStamp != null) {
             out.writeBoolean(true);
             out.writeVLong(latestRecordTimeStamp.getTime());
@@ -385,6 +513,18 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         } else {
             out.writeBoolean(false);
         }
+        if (latestEmptyBucketTimeStamp != null) {
+            out.writeBoolean(true);
+            out.writeVLong(latestEmptyBucketTimeStamp.getTime());
+        } else {
+            out.writeBoolean(false);
+        }
+        if (latestSparseBucketTimeStamp != null) {
+            out.writeBoolean(true);
+            out.writeVLong(latestSparseBucketTimeStamp.getTime());
+        } else {
+            out.writeBoolean(false);
+        }
         out.writeVLong(getInputRecordCount());
     }
@@ -405,6 +545,9 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
         builder.field(INVALID_DATE_COUNT.getPreferredName(), invalidDateCount);
         builder.field(MISSING_FIELD_COUNT.getPreferredName(), missingFieldCount);
         builder.field(OUT_OF_ORDER_TIME_COUNT.getPreferredName(), outOfOrderTimeStampCount);
+        builder.field(EMPTY_BUCKET_COUNT.getPreferredName(), emptyBucketCount);
+        builder.field(SPARSE_BUCKET_COUNT.getPreferredName(), sparseBucketCount);
+        builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
         if (earliestRecordTimeStamp != null) {
             builder.dateField(EARLIEST_RECORD_TIME.getPreferredName(), EARLIEST_RECORD_TIME.getPreferredName() + "_string",
                     earliestRecordTimeStamp.getTime());
@@ -417,6 +560,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
             builder.dateField(LAST_DATA_TIME.getPreferredName(), LAST_DATA_TIME.getPreferredName() + "_string",
                     lastDataTimeStamp.getTime());
         }
+        if (latestEmptyBucketTimeStamp != null) {
+            builder.dateField(LATEST_EMPTY_BUCKET_TIME.getPreferredName(), LATEST_EMPTY_BUCKET_TIME.getPreferredName() + "_string",
+                    latestEmptyBucketTimeStamp.getTime());
+        }
+        if (latestSparseBucketTimeStamp != null) {
+            builder.dateField(LATEST_SPARSE_BUCKET_TIME.getPreferredName(), LATEST_SPARSE_BUCKET_TIME.getPreferredName() + "_string",
+                    latestSparseBucketTimeStamp.getTime());
+        }
         builder.field(INPUT_RECORD_COUNT.getPreferredName(), getInputRecordCount());
 
         return builder;
@@ -445,16 +596,21 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
                 this.invalidDateCount == that.invalidDateCount &&
                 this.missingFieldCount == that.missingFieldCount &&
                 this.outOfOrderTimeStampCount == that.outOfOrderTimeStampCount &&
+                this.emptyBucketCount == that.emptyBucketCount &&
+                this.sparseBucketCount == that.sparseBucketCount &&
+                this.bucketCount == that.bucketCount &&
                 Objects.equals(this.latestRecordTimeStamp, that.latestRecordTimeStamp) &&
                 Objects.equals(this.earliestRecordTimeStamp, that.earliestRecordTimeStamp) &&
-                Objects.equals(this.lastDataTimeStamp, that.lastDataTimeStamp);
+                Objects.equals(this.lastDataTimeStamp, that.lastDataTimeStamp) &&
+                Objects.equals(this.latestEmptyBucketTimeStamp, that.latestEmptyBucketTimeStamp) &&
+                Objects.equals(this.latestSparseBucketTimeStamp, that.latestSparseBucketTimeStamp);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(jobId, processedRecordCount, processedFieldCount,
                 inputBytes, inputFieldCount, invalidDateCount, missingFieldCount,
-                outOfOrderTimeStampCount, latestRecordTimeStamp, earliestRecordTimeStamp, lastDataTimeStamp);
+                outOfOrderTimeStampCount, lastDataTimeStamp, emptyBucketCount, sparseBucketCount, bucketCount,
+                latestRecordTimeStamp, earliestRecordTimeStamp, latestEmptyBucketTimeStamp, latestSparseBucketTimeStamp);
     }
}
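
The stream format above encodes each optional timestamp as a presence flag followed by the epoch millis. A standalone sketch of that pattern (an illustration only: java.io streams stand in for Elasticsearch's StreamOutput/StreamInput, and the class name is hypothetical):

// OptionalDateWireExample.java - hypothetical sketch of the presence-flag pattern
// used by DataCounts.writeTo()/DataCounts(StreamInput).
import java.io.*;
import java.util.Date;

public class OptionalDateWireExample {

    static void writeOptionalDate(DataOutputStream out, Date date) throws IOException {
        if (date != null) {
            out.writeBoolean(true);
            out.writeLong(date.getTime()); // DataCounts uses writeVLong (variable-length)
        } else {
            out.writeBoolean(false);
        }
    }

    static Date readOptionalDate(DataInputStream in) throws IOException {
        return in.readBoolean() ? new Date(in.readLong()) : null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        writeOptionalDate(out, new Date(420000));
        writeOptionalDate(out, null);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(readOptionalDate(in)); // the 420000 ms timestamp
        System.out.println(readOptionalDate(in)); // null
    }
}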

View File

@@ -10,6 +10,7 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
 import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
+import org.elasticsearch.xpack.ml.job.process.DataStreamDiagnostics;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,6 +41,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
     private final Logger logger;
     private final DateTransformer dateTransformer;
+    private final DataStreamDiagnostics diagnostics;
 
     protected Map<String, Integer> inFieldIndexes;
     protected List<InputOutputMap> inputOutputMap;
@@ -57,6 +59,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
         this.analysisConfig = Objects.requireNonNull(analysisConfig);
         this.dataCountsReporter = Objects.requireNonNull(dataCountsReporter);
         this.logger = Objects.requireNonNull(logger);
+        this.diagnostics = new DataStreamDiagnostics(this.dataCountsReporter, this.analysisConfig, this.logger);
 
         Date date = dataCountsReporter.getLatestRecordTime();
         latestEpochMsThisUpload = 0;
@@ -135,10 +138,12 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
             return false;
         }
 
-        record[TIME_FIELD_OUT_INDEX] = Long.toString(epochMs / MS_IN_SECOND);
 
         // Records have epoch seconds timestamp so compare for out of order in seconds
         if (epochMs / MS_IN_SECOND < latestEpochMs / MS_IN_SECOND - analysisConfig.getLatency()) {
             // out of order
-            dataCountsReporter.reportOutOfOrderRecord(inFieldIndexes.size());
+            dataCountsReporter.reportOutOfOrderRecord(numberOfFieldsRead);
 
             if (epochMs > latestEpochMsThisUpload) {
                 // record this timestamp even if the record won't be processed
@@ -148,11 +153,12 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
             return false;
         }
 
+        record[TIME_FIELD_OUT_INDEX] = Long.toString(epochMs / MS_IN_SECOND);
         latestEpochMs = Math.max(latestEpochMs, epochMs);
         latestEpochMsThisUpload = latestEpochMs;
 
+        // check record in diagnostics
+        diagnostics.checkRecord(epochMs);
+
         autodetectProcess.writeRecord(record);
         dataCountsReporter.reportRecordWritten(numberOfFieldsRead, latestEpochMs);

View File

@@ -106,9 +106,14 @@ public final class ReservedFieldNames {
             DataCounts.INVALID_DATE_COUNT.getPreferredName(),
             DataCounts.MISSING_FIELD_COUNT.getPreferredName(),
             DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName(),
+            DataCounts.EMPTY_BUCKET_COUNT.getPreferredName(),
+            DataCounts.SPARSE_BUCKET_COUNT.getPreferredName(),
+            DataCounts.BUCKET_COUNT.getPreferredName(),
             DataCounts.LATEST_RECORD_TIME.getPreferredName(),
             DataCounts.EARLIEST_RECORD_TIME.getPreferredName(),
             DataCounts.LAST_DATA_TIME.getPreferredName(),
+            DataCounts.LATEST_EMPTY_BUCKET_TIME.getPreferredName(),
+            DataCounts.LATEST_SPARSE_BUCKET_TIME.getPreferredName(),
 
             Influence.INFLUENCER_FIELD_NAME.getPreferredName(),
             Influence.INFLUENCER_FIELD_VALUES.getPreferredName(),

View File

@@ -37,6 +37,7 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<R
         String jobId = randomAsciiOfLength(10);
         DataCounts dataCounts = new DataCountsTests().createTestInstance();
+
         ModelSizeStats sizeStats = null;
         if (randomBoolean()) {
             sizeStats = new ModelSizeStats.Builder("foo").build();

View File

@@ -8,14 +8,12 @@ package org.elasticsearch.xpack.ml.action;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCountsTests;
 import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
-import org.joda.time.DateTime;
-
 
 public class PostDataActionResponseTests extends AbstractStreamableTestCase<PostDataAction.Response> {
 
     @Override
     protected PostDataAction.Response createTestInstance() {
         DataCounts counts = new DataCountsTests().createTestInstance();
         return new PostDataAction.Response(counts);
     }

View File

@@ -183,7 +183,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
         InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
         when(dataExtractor.next()).thenReturn(Optional.of(in));
-        DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0));
+        DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
         when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
         Consumer<Exception> handler = mockConsumer();
         StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
@@ -209,7 +210,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
         when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
         when(dataExtractor.next()).thenThrow(new RuntimeException("dummy"));
-        DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0));
+        DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
         when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
         Consumer<Exception> handler = mockConsumer();
         StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
@@ -263,7 +265,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
         InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
         when(dataExtractor.next()).thenReturn(Optional.of(in));
-        DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0));
+        DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
         when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
         Consumer<Exception> handler = mockConsumer();
         boolean cancelled = randomBoolean();

View File

@@ -63,7 +63,8 @@ public class DatafeedJobTests extends ESTestCase {
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
         InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8));
         when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
-        DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0));
+        DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0),
+                new Date(0), new Date(0), new Date(0));
 
         PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id");
         expectedRequest.setDataDescription(dataDescription.build());

View File

@@ -73,7 +73,8 @@ public class DataCountsReporterTests extends ESTestCase {
     }
 
     public void testComplexConstructor() throws Exception {
-        DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date(), new Date());
+        DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, 6L, 7L, 8L,
+                new Date(), new Date(), new Date(), new Date(), new Date());
 
         try (DataCountsReporter dataCountsReporter =
                 new DataCountsReporter(threadPool, settings, JOB_ID, counts, jobDataCountsPersister)) {
@@ -86,6 +87,9 @@ public class DataCountsReporterTests extends ESTestCase {
             assertEquals(3, dataCountsReporter.getDateParseErrorsCount());
             assertEquals(4, dataCountsReporter.getMissingFieldErrorCount());
             assertEquals(5, dataCountsReporter.getOutOfOrderRecordCount());
+            assertEquals(6, dataCountsReporter.getEmptyBucketCount());
+            assertEquals(7, dataCountsReporter.getSparseBucketCount());
+            assertEquals(8, dataCountsReporter.getBucketCount());
             assertNull(stats.getEarliestRecordTimeStamp());
         }
     }
@@ -248,9 +252,9 @@ public class DataCountsReporterTests extends ESTestCase {
                 jobDataCountsPersister)) {
             dataCountsReporter.setAnalysedFieldsPerRecord(3);
             Date now = new Date();
-            DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, new Date(2000), new Date(3000), now);
+            DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 0L, new Date(2000), new Date(3000),
+                    now, (Date) null, (Date) null);
 
             dataCountsReporter.reportRecordWritten(5, 2000);
             dataCountsReporter.reportRecordWritten(5, 3000);
             dataCountsReporter.reportMissingField();
@@ -309,5 +313,7 @@ public class DataCountsReporterTests extends ESTestCase {
         assertEquals(0L, stats.getInvalidDateCount());
         assertEquals(0L, stats.getMissingFieldCount());
         assertEquals(0L, stats.getOutOfOrderTimeStampCount());
+        assertEquals(0L, stats.getEmptyBucketCount());
+        assertEquals(0L, stats.getSparseBucketCount());
     }
 }

View File

@@ -0,0 +1,278 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.process;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.Date;

public class DataStreamDiagnosticsTests extends ESTestCase {

    private AnalysisConfig analysisConfig;
    private Logger logger;

    @Before
    public void setUpMocks() throws IOException {
        logger = Loggers.getLogger(DataStreamDiagnosticsTests.class);

        AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
        acBuilder.setBucketSpan(60L);
        analysisConfig = acBuilder.build();
    }

    public void testSimple() {
        DataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
        DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);

        d.checkRecord(70000);
        d.checkRecord(130000);
        d.checkRecord(190000);
        d.checkRecord(250000);
        d.checkRecord(310000);
        d.checkRecord(370000);
        d.checkRecord(430000);
        d.checkRecord(490000);
        d.checkRecord(550000);
        d.checkRecord(610000);

        d.flush();
        assertEquals(10, dataCountsReporter.getBucketCount());
        assertEquals(0, dataCountsReporter.getEmptyBucketCount());
        assertEquals(0, dataCountsReporter.getSparseBucketCount());
        assertEquals(null, dataCountsReporter.getLatestSparseBucketTime());
        assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime());
    }

    public void testEmptyBuckets() {
        DataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
        DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);

        d.checkRecord(10000);
        d.checkRecord(70000);
        // empty bucket
        d.checkRecord(190000);
        d.checkRecord(250000);
        d.checkRecord(310000);
        d.checkRecord(370000);
        // empty bucket
        d.checkRecord(490000);
        d.checkRecord(550000);

        d.flush();
        assertEquals(10, dataCountsReporter.getBucketCount());
        assertEquals(2, dataCountsReporter.getEmptyBucketCount());
        assertEquals(0, dataCountsReporter.getSparseBucketCount());
        assertEquals(null, dataCountsReporter.getLatestSparseBucketTime());
        assertEquals(new Date(420000), dataCountsReporter.getLatestEmptyBucketTime());
    }

    public void testEmptyBucketsStartLater() {
        DataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
        DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);

        d.checkRecord(1110000);
        d.checkRecord(1170000);
        // empty bucket
        d.checkRecord(1290000);
        d.checkRecord(1350000);
        d.checkRecord(1410000);
        d.checkRecord(1470000);
        // empty bucket
        d.checkRecord(1590000);
        d.checkRecord(1650000);

        d.flush();
        assertEquals(10, dataCountsReporter.getBucketCount());
        assertEquals(2, dataCountsReporter.getEmptyBucketCount());
        assertEquals(0, dataCountsReporter.getSparseBucketCount());
        assertEquals(null, dataCountsReporter.getLatestSparseBucketTime());
        assertEquals(new Date(1500000), dataCountsReporter.getLatestEmptyBucketTime());
    }

    public void testSparseBuckets() {
        DataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
        DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);

        sendManyDataPoints(d, 10000, 69000, 1000);
        sendManyDataPoints(d, 70000, 129000, 1200);
        // sparse bucket
        sendManyDataPoints(d, 130000, 189000, 1);
        sendManyDataPoints(d, 190000, 249000, 1100);
        sendManyDataPoints(d, 250000, 309000, 1300);
        sendManyDataPoints(d, 310000, 369000, 1050);
        sendManyDataPoints(d, 370000, 429000, 1022);
        // sparse bucket
        sendManyDataPoints(d, 430000, 489000, 10);
        sendManyDataPoints(d, 490000, 549000, 1333);
        sendManyDataPoints(d, 550000, 609000, 1400);

        d.flush();
        assertEquals(10, dataCountsReporter.getBucketCount());
        assertEquals(0, dataCountsReporter.getEmptyBucketCount());
        assertEquals(2, dataCountsReporter.getSparseBucketCount());
        assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime());
        assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime());
    }

    /**
     * Sparsity on the last bucket should not create a sparse bucket signal.
     */
    public void testSparseBucketsLast() {
        DataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
        DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);

        sendManyDataPoints(d, 10000, 69000, 1000);
        sendManyDataPoints(d, 70000, 129000, 1200);
        // sparse bucket
        sendManyDataPoints(d, 130000, 189000, 1);
        sendManyDataPoints(d, 190000, 249000, 1100);
        sendManyDataPoints(d, 250000, 309000, 1300);
        sendManyDataPoints(d, 310000, 369000, 1050);
        sendManyDataPoints(d, 370000, 429000, 1022);
        sendManyDataPoints(d, 430000, 489000, 1400);
        sendManyDataPoints(d, 490000, 549000, 1333);
        // sparse bucket (but last one)
        sendManyDataPoints(d, 550000, 609000, 10);

        d.flush();
        assertEquals(10, dataCountsReporter.getBucketCount());
        assertEquals(0, dataCountsReporter.getEmptyBucketCount());
        assertEquals(1, dataCountsReporter.getSparseBucketCount());
        assertEquals(new Date(120000), dataCountsReporter.getLatestSparseBucketTime());
        assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime());
    }

    /**
     * Sparsity on the last two buckets should create a sparse bucket signal
     * only on the second-to-last one.
     */
    public void testSparseBucketsLastTwo() {
        DataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
        DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);

        sendManyDataPoints(d, 10000, 69000, 1000);
        sendManyDataPoints(d, 70000, 129000, 1200);
        // sparse bucket
        sendManyDataPoints(d, 130000, 189000, 1);
        sendManyDataPoints(d, 190000, 249000, 1100);
        sendManyDataPoints(d, 250000, 309000, 1300);
        sendManyDataPoints(d, 310000, 369000, 1050);
        sendManyDataPoints(d, 370000, 429000, 1022);
        sendManyDataPoints(d, 430000, 489000, 1400);
        // sparse bucket (2nd to last one)
        sendManyDataPoints(d, 490000, 549000, 9);
        // sparse bucket (but last one)
        sendManyDataPoints(d, 550000, 609000, 10);

        d.flush();
        assertEquals(10, dataCountsReporter.getBucketCount());
        assertEquals(0, dataCountsReporter.getEmptyBucketCount());
        assertEquals(2, dataCountsReporter.getSparseBucketCount());
        assertEquals(new Date(480000), dataCountsReporter.getLatestSparseBucketTime());
        assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime());
    }

    public void testMixedEmptyAndSparseBuckets() {
        DataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
        DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);

        sendManyDataPoints(d, 10000, 69000, 1000);
        sendManyDataPoints(d, 70000, 129000, 1200);
        // sparse bucket
        sendManyDataPoints(d, 130000, 189000, 1);
        // empty bucket
        sendManyDataPoints(d, 250000, 309000, 1300);
        sendManyDataPoints(d, 310000, 369000, 1050);
        sendManyDataPoints(d, 370000, 429000, 1022);
        // sparse bucket
        sendManyDataPoints(d, 430000, 489000, 10);
        // empty bucket
        sendManyDataPoints(d, 550000, 609000, 1400);

        d.flush();
        assertEquals(10, dataCountsReporter.getBucketCount());
        assertEquals(2, dataCountsReporter.getSparseBucketCount());
        assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime());
        assertEquals(2, dataCountsReporter.getEmptyBucketCount());
        assertEquals(new Date(480000), dataCountsReporter.getLatestEmptyBucketTime());
    }

    /**
     * Send signals, then make a long pause, send another signal and then check
     * whether the counts are right.
     */
    public void testEmptyBucketsLongerOutage() {
        DataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
        DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);

        d.checkRecord(10000);
        d.checkRecord(70000);
        // empty bucket
        d.checkRecord(190000);
        d.checkRecord(250000);
        d.checkRecord(310000);
        d.checkRecord(370000);
        // empty bucket
        d.checkRecord(490000);
        d.checkRecord(550000);
        // 98 empty buckets
        d.checkRecord(6490000);

        d.flush();
        assertEquals(109, dataCountsReporter.getBucketCount());
        assertEquals(100, dataCountsReporter.getEmptyBucketCount());
        assertEquals(0, dataCountsReporter.getSparseBucketCount());
        assertEquals(null, dataCountsReporter.getLatestSparseBucketTime());
        assertEquals(new Date(6420000), dataCountsReporter.getLatestEmptyBucketTime());
    }

    /**
     * Send signals, produce a longer period of sparse signals, then go up again.
     *
     * The number of sparse buckets should not grow too large; a longer sparse
     * period could be normal.
     */
    public void testSparseBucketsLongerPeriod() {
        DataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
        DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);

        sendManyDataPoints(d, 10000, 69000, 1000);
        sendManyDataPoints(d, 70000, 129000, 1200);
        // sparse bucket
        sendManyDataPoints(d, 130000, 189000, 1);
        sendManyDataPoints(d, 190000, 249000, 1100);
        sendManyDataPoints(d, 250000, 309000, 1300);
        sendManyDataPoints(d, 310000, 369000, 1050);
        sendManyDataPoints(d, 370000, 429000, 1022);
        // sparse bucket
        sendManyDataPoints(d, 430000, 489000, 10);
        sendManyDataPoints(d, 490000, 549000, 1333);
        sendManyDataPoints(d, 550000, 609000, 1400);

        d.flush();
        assertEquals(10, dataCountsReporter.getBucketCount());
        assertEquals(0, dataCountsReporter.getEmptyBucketCount());
        assertEquals(2, dataCountsReporter.getSparseBucketCount());
        assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime());
        assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime());
    }

    private void sendManyDataPoints(DataStreamDiagnostics d, long recordTimestampInMsMin, long recordTimestampInMsMax, long howMuch) {
        long range = recordTimestampInMsMax - recordTimestampInMsMin;

        for (int i = 0; i < howMuch; ++i) {
            d.checkRecord(recordTimestampInMsMin + i % range);
        }
    }
}

View File

@@ -21,6 +21,8 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
         return new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000),
                 randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
                 randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
+                randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
+                new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate(),
                 new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate(),
                 new DateTime(randomDateTimeZone()).toDate());
     }
@@ -36,22 +38,22 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
     }
 
     public void testCountsEquals_GivenEqualCounts() {
-        DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-        DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+        DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
+        DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
 
         assertTrue(counts1.equals(counts2));
         assertTrue(counts2.equals(counts1));
     }
 
     public void testCountsHashCode_GivenEqualCounts() {
-        DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-        DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+        DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
+        DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
         assertEquals(counts1.hashCode(), counts2.hashCode());
     }
 
     public void testCountsCopyConstructor() {
-        DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+        DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
         DataCounts counts2 = new DataCounts(counts1);
 
         assertEquals(counts1.hashCode(), counts2.hashCode());
@@ -63,7 +65,7 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
     }
 
     public void testCountCopyCreatedFieldsNotZero() throws Exception {
-        DataCounts counts1 = createCounts(1, 200, 400, 3, 4, 5, 6, 1479211200000L, 1479384000000L, 1488282343000L);
+        DataCounts counts1 = createCounts(1, 200, 400, 3, 4, 5, 6, 7, 8, 9, 1479211200000L, 1479384000000L);
         assertAllFieldsGreaterThanZero(counts1);
 
         DataCounts counts2 = new DataCounts(counts1);
@@ -102,19 +104,22 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
     }
 
     public void testCalcProcessedFieldCount() {
-        DataCounts counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date(), new Date());
+        DataCounts counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date(),
+                new Date(), new Date(), new Date());
         counts.calcProcessedFieldCount(3);
 
         assertEquals(30, counts.getProcessedFieldCount());
 
-        counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 5L, 0L, new Date(), new Date(), new Date());
+        counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 5L, 0L, 0L, 0L, 0L, new Date(), new Date(),
+                new Date(), new Date(), new Date());
         counts.calcProcessedFieldCount(3);
         assertEquals(25, counts.getProcessedFieldCount());
     }
 
     public void testEquals() {
         DataCounts counts1 = new DataCounts(
-                randomAsciiOfLength(16), 10L, 5000L, 2000L, 300L, 6L, 15L, 0L, new Date(), new Date(1435000000L), new Date(10L));
+                randomAsciiOfLength(16), 10L, 5000L, 2000L, 300L, 6L, 15L, 0L, 0L, 0L, 0L, new Date(), new Date(1435000000L),
+                new Date(), new Date(), new Date());
         DataCounts counts2 = new DataCounts(counts1);
 
         assertEquals(counts1, counts2);
@@ -156,12 +161,15 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
     private static DataCounts createCounts(
             long processedRecordCount, long processedFieldCount, long inputBytes, long inputFieldCount,
-            long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount, long earliestRecordTime, long latestRecordTime,
-            long lastDataTime) {
+            long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount,
+            long emptyBucketCount, long sparseBucketCount, long bucketCount,
+            long earliestRecordTime, long latestRecordTime) {
 
         DataCounts counts = new DataCounts("foo", processedRecordCount, processedFieldCount, inputBytes,
                 inputFieldCount, invalidDateCount, missingFieldCount, outOfOrderTimeStampCount,
-                new Date(earliestRecordTime), new Date(latestRecordTime), new Date(lastDataTime));
+                emptyBucketCount, sparseBucketCount, bucketCount,
+                new Date(earliestRecordTime), new Date(latestRecordTime),
+                new Date(), new Date(), new Date());

         return counts;
     }