[ML] Refactor DataStreamDiagnostics to use array (#30129)

This commit refactors the DataStreamDiagnostics class,
achieving the following advantages:

- simpler code, by encapsulating the moving bucket histogram
into its own class
- better performance, by using an array to store the buckets
instead of a map (see the indexing sketch below)
- explicit handling of gap buckets, in preparation for fixing #30080
Dimitris Athanasiou 2018-05-01 09:50:32 +01:00 committed by GitHub
parent acdf330a0e
commit 057cdffed5
5 changed files with 319 additions and 230 deletions
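For a concrete picture of the array-based approach, here is a minimal, hypothetical sketch of the circular-buffer indexing the new BucketDiagnostics class is built around. The class and member names are illustrative only and not part of the change; the real implementation is in the diff below.

// Illustrative sketch only (not part of the change): per-bucket record counts live in a
// fixed-size long[] used as a circular buffer, so adding a record is array indexing
// instead of a SortedMap lookup. Assumes non-negative timestamps.
class MovingBucketWindowSketch {

    private final long bucketSpanMs;
    private final long[] counts;            // circular buffer of per-bucket record counts
    private long latestBucketStartMs = -1;  // start time of the most recent bucket seen
    private int latestIndex = 0;            // slot holding the most recent bucket

    MovingBucketWindowSketch(long bucketSpanMs, int windowSize) {
        this.bucketSpanMs = bucketSpanMs;
        this.counts = new long[windowSize];
    }

    void addRecord(long recordTimestampMs) {
        long bucketStartMs = (recordTimestampMs / bucketSpanMs) * bucketSpanMs; // align to floor
        if (latestBucketStartMs < 0) {
            latestBucketStartMs = bucketStartMs;
        }
        // Advance time: slots that fall out of the window would be flushed here, then reused.
        while (bucketStartMs > latestBucketStartMs) {
            latestIndex = (latestIndex + 1) % counts.length;
            counts[latestIndex] = 0;
            latestBucketStartMs += bucketSpanMs;
        }
        // Records that arrive out of order but still inside the window map to an earlier slot.
        int offset = (int) ((latestBucketStartMs - bucketStartMs) / bucketSpanMs);
        if (offset < counts.length) {
            int index = ((latestIndex - offset) % counts.length + counts.length) % counts.length;
            ++counts[index];
        }
    }
}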


@@ -12,8 +12,9 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.process.diagnostics.DataStreamDiagnostics;
import java.util.Date;
import java.util.Locale;


@@ -1,223 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Counter;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.util.Date;
import java.util.SortedMap;
import java.util.TreeMap;
public class DataStreamDiagnostics {
/**
* Minimum window to take into consideration for bucket count histogram.
*/
private static final int MIN_BUCKET_WINDOW = 10;
/**
* Threshold to report potential sparsity problems.
*
* Sparsity score is calculated: log(average) - log(current)
*
* If score is above the threshold, bucket is reported as sparse bucket.
*/
private static final int DATA_SPARSITY_THRESHOLD = 2;
private static final long MS_IN_SECOND = 1000;
private static final Logger LOGGER = Loggers.getLogger(DataStreamDiagnostics.class);
/**
* Container for the histogram
*
* Note: Using a sorted map in order to iterate in order when consuming the
* data. The counter is lazily initialized and potentially missing in case
* of empty buckets.
*
* The container gets pruned along the data streaming based on the bucket
* window, so it should not contain more than max(MIN_BUCKET_WINDOW,
* 'buckets_required_by_latency') + 1 items at any time.
*
* Sparsity can only be calculated after the window has been filled. Currently
* this window is lost if a job gets closed and re-opened. We might fix this
* in future.
*/
private final SortedMap<Long, Counter> movingBucketHistogram = new TreeMap<>();
private final long bucketSpan;
private final long latency;
private long movingBucketCount = 0;
private long latestReportedBucket = -1;
private long bucketCount = 0;
private long emptyBucketCount = 0;
private long latestEmptyBucketTime = -1;
private long sparseBucketCount = 0;
private long latestSparseBucketTime = -1;
public DataStreamDiagnostics(Job job) {
bucketSpan = job.getAnalysisConfig().getBucketSpan().seconds();
latency = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().seconds();
}
/**
* Check record
*
* @param recordTimestampInMs
* The record timestamp in milliseconds since epoch
*/
public void checkRecord(long recordTimestampInMs) {
checkBucketing(recordTimestampInMs);
}
/**
* Flush all counters. Should be called at the end of the data stream.
*/
public void flush() {
// flush all we know
if (movingBucketHistogram.isEmpty() == false) {
flush(movingBucketHistogram.lastKey());
}
}
/**
* Check bucketing of record. Report empty and sparse buckets.
*
* @param recordTimestampInMs
* The record timestamp in milliseconds since epoch
*/
private void checkBucketing(long recordTimestampInMs) {
long bucket = (recordTimestampInMs / MS_IN_SECOND) / bucketSpan;
long bucketHistogramStartBucket = ((recordTimestampInMs / MS_IN_SECOND) - latency) / bucketSpan;
bucketHistogramStartBucket = Math.min(bucket - MIN_BUCKET_WINDOW, bucketHistogramStartBucket);
movingBucketHistogram.computeIfAbsent(bucket, l -> Counter.newCounter()).addAndGet(1);
++movingBucketCount;
// find the very first bucket
if (latestReportedBucket == -1) {
latestReportedBucket = bucket - 1;
}
// flush all buckets out of the window
flush(bucketHistogramStartBucket);
}
/**
* Flush Bucket reporting till the given bucket.
*
* @param bucketNumber
* The number of the last bucket that can be flushed.
*/
private void flush(long bucketNumber) {
// check for a longer period of empty buckets
long emptyBuckets = movingBucketHistogram.firstKey() - latestReportedBucket - 1;
if (emptyBuckets > 0) {
bucketCount += emptyBuckets;
emptyBucketCount += emptyBuckets;
latestEmptyBucketTime = (movingBucketHistogram.firstKey() - 1) * bucketSpan * MS_IN_SECOND;
latestReportedBucket = movingBucketHistogram.firstKey() - 1;
}
// calculate the average number of data points in a bucket based on the
// current history
double averageBucketSize = (float) movingBucketCount / movingBucketHistogram.size();
// prune all buckets that can be flushed
long lastBucketSparsityCheck = Math.min(bucketNumber, movingBucketHistogram.lastKey());
for (long pruneBucket = movingBucketHistogram.firstKey(); pruneBucket < lastBucketSparsityCheck; ++pruneBucket) {
Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket);
long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L;
LOGGER.debug("Checking bucket {} compare sizes, this bucket: {} average: {}", pruneBucket, bucketSize, averageBucketSize);
++bucketCount;
latestReportedBucket = pruneBucket;
// subtract bucketSize from the counter
movingBucketCount -= bucketSize;
// check if bucket is empty
if (bucketSize == 0L) {
latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND;
++emptyBucketCount;
// do not do sparse analysis on an empty bucket
continue;
}
// simplistic way to calculate data sparsity, just take the log and
// check the difference
double logAverageBucketSize = Math.log(averageBucketSize);
double logBucketSize = Math.log(bucketSize);
double sparsityScore = logAverageBucketSize - logBucketSize;
if (sparsityScore > DATA_SPARSITY_THRESHOLD) {
LOGGER.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", pruneBucket, bucketSize,
averageBucketSize, sparsityScore);
++sparseBucketCount;
latestSparseBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND;
}
}
// prune the rest if necessary
for (long pruneBucket = lastBucketSparsityCheck; pruneBucket < bucketNumber; ++pruneBucket) {
Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket);
long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L;
bucketCount++;
latestReportedBucket = pruneBucket;
// subtract bucketSize from the counter
movingBucketCount -= bucketSize;
// check if bucket is empty
if (bucketSize == 0L) {
latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND;
++emptyBucketCount;
}
}
}
public long getBucketCount() {
return bucketCount;
}
public long getEmptyBucketCount() {
return emptyBucketCount;
}
public Date getLatestEmptyBucketTime() {
return latestEmptyBucketTime > 0 ? new Date(latestEmptyBucketTime) : null;
}
public long getSparseBucketCount() {
return sparseBucketCount;
}
public Date getLatestSparseBucketTime() {
return latestSparseBucketTime > 0 ? new Date(latestSparseBucketTime) : null;
}
/**
* Resets counts.
*
* Note: This does not reset the inner state used for, e.g., sparse bucket
* detection.
*
*/
public void resetCounts() {
bucketCount = 0;
emptyBucketCount = 0;
sparseBucketCount = 0;
}
}


@@ -0,0 +1,132 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.diagnostics;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
/**
* A moving window of buckets that allows keeping
* track of some statistics like the bucket count,
* empty or sparse buckets, etc.
*
* The counts are stored in an array that functions as a
* circular buffer. When time is advanced, all buckets
* out of the window are flushed.
*/
class BucketDiagnostics {
private static final int MIN_BUCKETS = 10;
private final long bucketSpanMs;
private final long latencyMs;
private final int maxSize;
private final long[] buckets;
private long movingBucketCount = 0;
private long latestBucketStartMs = -1;
private int latestBucketIndex;
private long earliestBucketStartMs = -1;
private int earliestBucketIndex;
private long latestFlushedBucketStartMs = -1;
private final BucketFlushListener bucketFlushListener;
BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) {
bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis();
latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis();
maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS);
buckets = new long[maxSize];
this.bucketFlushListener = bucketFlushListener;
}
void addRecord(long recordTimestampMs) {
long bucketStartMs = Intervals.alignToFloor(recordTimestampMs, bucketSpanMs);
// Initialize earliest/latest times
if (latestBucketStartMs < 0) {
latestBucketStartMs = bucketStartMs;
earliestBucketStartMs = bucketStartMs;
}
advanceTime(bucketStartMs);
addToBucket(bucketStartMs);
}
private void advanceTime(long bucketStartMs) {
while (bucketStartMs > latestBucketStartMs) {
int flushBucketIndex = (latestBucketIndex + 1) % maxSize;
if (flushBucketIndex == earliestBucketIndex) {
flush(flushBucketIndex);
movingBucketCount -= buckets[flushBucketIndex];
earliestBucketStartMs += bucketSpanMs;
earliestBucketIndex = (earliestBucketIndex + 1) % maxSize;
}
buckets[flushBucketIndex] = 0L;
latestBucketStartMs += bucketSpanMs;
latestBucketIndex = flushBucketIndex;
}
}
private void addToBucket(long bucketStartMs) {
int offsetToLatest = (int) ((bucketStartMs - latestBucketStartMs) / bucketSpanMs);
int bucketIndex = (latestBucketIndex + offsetToLatest) % maxSize;
if (bucketIndex < 0) {
bucketIndex = maxSize + bucketIndex;
}
++buckets[bucketIndex];
++movingBucketCount;
if (bucketStartMs < earliestBucketStartMs) {
earliestBucketStartMs = bucketStartMs;
earliestBucketIndex = bucketIndex;
}
}
private void flush(int bucketIndex) {
long bucketStartMs = getTimestampMs(bucketIndex);
if (bucketStartMs > latestFlushedBucketStartMs) {
bucketFlushListener.onBucketFlush(bucketStartMs, buckets[bucketIndex]);
latestFlushedBucketStartMs = bucketStartMs;
}
}
private long getTimestampMs(int bucketIndex) {
int offsetToLatest = latestBucketIndex - bucketIndex;
if (offsetToLatest < 0) {
offsetToLatest = maxSize + offsetToLatest;
}
return latestBucketStartMs - offsetToLatest * bucketSpanMs;
}
void flush() {
if (latestBucketStartMs < 0) {
return;
}
int bucketIndex = earliestBucketIndex;
while (bucketIndex != latestBucketIndex) {
flush(bucketIndex);
bucketIndex = (bucketIndex + 1) % maxSize;
}
}
double averageBucketCount() {
return (double) movingBucketCount / size();
}
private int size() {
if (latestBucketStartMs < 0) {
return 0;
}
return (int) ((latestBucketStartMs - earliestBucketStartMs) / bucketSpanMs) + 1;
}
interface BucketFlushListener {
void onBucketFlush(long bucketStartMs, long bucketCounts);
}
}
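As a rough worked example of the window sizing in the constructor above, assuming Intervals.alignToCeil rounds the latency up to the next multiple of the bucket span:

// Hypothetical numbers for illustration; not part of the change.
long bucketSpanMs = 60_000;                                                       // one-minute buckets
long latencyMs = 3 * bucketSpanMs;                                                // latency of three bucket spans
long alignedUp = ((latencyMs + bucketSpanMs - 1) / bucketSpanMs) * bucketSpanMs;  // alignToCeil equivalent
int maxSize = Math.max((int) (alignedUp / bucketSpanMs), 10);                     // MIN_BUCKETS = 10 -> window of 10 buckets

With latencyMs = 13 * bucketSpanMs + 10_000 the same computation rounds up to 14 bucket spans, so the window grows to 14 buckets; these two cases correspond to the bucket counts asserted in the latency tests further down.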


@@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.diagnostics;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.util.Date;
public class DataStreamDiagnostics {
/**
* Threshold to report potential sparsity problems.
*
* Sparsity score is calculated: log(average) - log(current)
*
* If score is above the threshold, bucket is reported as sparse bucket.
*/
private static final int DATA_SPARSITY_THRESHOLD = 2;
private static final Logger LOGGER = Loggers.getLogger(DataStreamDiagnostics.class);
private final BucketDiagnostics bucketDiagnostics;
private long bucketCount = 0;
private long emptyBucketCount = 0;
private long latestEmptyBucketTime = -1;
private long sparseBucketCount = 0;
private long latestSparseBucketTime = -1;
public DataStreamDiagnostics(Job job) {
bucketDiagnostics = new BucketDiagnostics(job, createBucketFlushListener());
}
private BucketDiagnostics.BucketFlushListener createBucketFlushListener() {
return (flushedBucketStartMs, flushedBucketCount) -> {
++bucketCount;
if (flushedBucketCount == 0) {
++emptyBucketCount;
latestEmptyBucketTime = flushedBucketStartMs;
} else {
// simplistic way to calculate data sparsity, just take the log and
// check the difference
double averageBucketSize = bucketDiagnostics.averageBucketCount();
double logAverageBucketSize = Math.log(averageBucketSize);
double logBucketSize = Math.log(flushedBucketCount);
double sparsityScore = logAverageBucketSize - logBucketSize;
if (sparsityScore > DATA_SPARSITY_THRESHOLD) {
LOGGER.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", flushedBucketStartMs,
flushedBucketCount, averageBucketSize, sparsityScore);
++sparseBucketCount;
latestSparseBucketTime = flushedBucketStartMs;
}
}
};
}
/**
* Check record
*
* @param recordTimestampInMs
* The record timestamp in milliseconds since epoch
*/
public void checkRecord(long recordTimestampInMs) {
bucketDiagnostics.addRecord(recordTimestampInMs);
}
/**
* Flush all counters. Should be called at the end of the data stream.
*/
public void flush() {
// flush all we know
bucketDiagnostics.flush();
}
public long getBucketCount() {
return bucketCount;
}
public long getEmptyBucketCount() {
return emptyBucketCount;
}
public Date getLatestEmptyBucketTime() {
return latestEmptyBucketTime > 0 ? new Date(latestEmptyBucketTime) : null;
}
public long getSparseBucketCount() {
return sparseBucketCount;
}
public Date getLatestSparseBucketTime() {
return latestSparseBucketTime > 0 ? new Date(latestSparseBucketTime) : null;
}
/**
* Resets counts.
*
* Note: This does not reset the inner state used for, e.g., sparse bucket
* detection.
*
*/
public void resetCounts() {
bucketCount = 0;
emptyBucketCount = 0;
sparseBucketCount = 0;
}
}
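To make the sparsity check concrete, a small worked example with illustrative numbers: if the window averages 100 records per bucket, a flushed bucket holding only 10 records scores log(100) - log(10) = log(10) ≈ 2.3, which exceeds DATA_SPARSITY_THRESHOLD and is reported as sparse, while a bucket holding 20 records scores roughly 1.6 and is not.

// Hypothetical numbers for illustration; not part of the change.
double averageBucketSize = 100.0;                                  // average records per bucket over the window
double sparseScore = Math.log(averageBucketSize) - Math.log(10);   // ~2.30 > 2 -> reported as sparse
double denseScore = Math.log(averageBucketSize) - Math.log(20);    // ~1.61 <= 2 -> not sparse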


@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process;
package org.elasticsearch.xpack.ml.job.process.diagnostics;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
@@ -13,7 +13,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
@@ -21,9 +20,9 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
private static final long BUCKET_SPAN = 60000;
private Job job;
@Before
public void setUpMocks() throws IOException {
public void setUpMocks() {
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(TimeValue.timeValueMillis(BUCKET_SPAN));
acBuilder.setLatency(TimeValue.ZERO);
@@ -32,7 +31,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
Job.Builder builder = new Job.Builder("job_id");
builder.setAnalysisConfig(acBuilder);
builder.setDataDescription(new DataDescription.Builder());
job = builder.build(new Date());
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), null);
}
public void testIncompleteBuckets() {
@@ -80,6 +79,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
assertEquals(null, d.getLatestSparseBucketTime());
assertEquals(new Date(BUCKET_SPAN * 2), d.getLatestEmptyBucketTime());
}
public void testSimple() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
@@ -102,6 +102,58 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
assertEquals(null, d.getLatestEmptyBucketTime());
}
public void testSimpleReverse() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
d.checkRecord(610000);
d.checkRecord(550000);
d.checkRecord(490000);
d.checkRecord(430000);
d.checkRecord(370000);
d.checkRecord(310000);
d.checkRecord(250000);
d.checkRecord(190000);
d.checkRecord(130000);
d.checkRecord(70000);
d.flush();
assertEquals(9, d.getBucketCount());
assertEquals(0, d.getEmptyBucketCount());
assertEquals(0, d.getSparseBucketCount());
assertEquals(null, d.getLatestSparseBucketTime());
assertEquals(null, d.getLatestEmptyBucketTime());
}
public void testWithLatencyLessThanTenBuckets() {
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(3 * BUCKET_SPAN));
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
long timestamp = 70000;
while (timestamp < 70000 + 20 * BUCKET_SPAN) {
sendManyDataPoints(d, timestamp - BUCKET_SPAN, timestamp + timestamp, 100);
timestamp += BUCKET_SPAN;
}
assertEquals(10, d.getBucketCount());
d.flush();
assertEquals(19, d.getBucketCount());
}
public void testWithLatencyGreaterThanTenBuckets() {
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(13 * BUCKET_SPAN + 10000));
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
long timestamp = 70000;
while (timestamp < 70000 + 20 * BUCKET_SPAN) {
sendManyDataPoints(d, timestamp - BUCKET_SPAN, timestamp + timestamp, 100);
timestamp += BUCKET_SPAN;
}
assertEquals(6, d.getBucketCount());
d.flush();
assertEquals(19, d.getBucketCount());
}
public void testEmptyBuckets() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
@@ -280,7 +332,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
/**
* Send signals, make a longer period of sparse signals, then go up again
*
*
* The number of sparse buckets should not be too high, as this could be normal.
*/
public void testSparseBucketsLongerPeriod() {
@@ -307,6 +359,20 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
assertEquals(null, d.getLatestEmptyBucketTime());
}
private static Job createJob(TimeValue bucketSpan, TimeValue latency) {
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(bucketSpan);
if (latency != null) {
acBuilder.setLatency(latency);
}
acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
Job.Builder builder = new Job.Builder("job_id");
builder.setAnalysisConfig(acBuilder);
builder.setDataDescription(new DataDescription.Builder());
return builder.build(new Date());
}
public void testFlushAfterZeroRecords() {
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
d.flush();