mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-08 20:10:00 +00:00
[ML] Don’t count incomplete buckets in data stream diagnostics (elastic/x-pack-elasticsearch#2351)
* Don’t count incomplete buckets in data stream diagnostics * Fix tests now bucket_count doesn’t include partial buckets Original commit: elastic/x-pack-elasticsearch@bc1a7bd9e7
This commit is contained in:
parent
b30c634326
commit
175e8db3aa
@ -82,7 +82,7 @@ public class DataStreamDiagnostics {
|
||||
public void flush() {
|
||||
// flush all we know
|
||||
if (movingBucketHistogram.isEmpty() == false) {
|
||||
flush(movingBucketHistogram.lastKey() + 1);
|
||||
flush(movingBucketHistogram.lastKey());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -456,7 +456,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
|
||||
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":40"));
|
||||
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":40"));
|
||||
assertThat(jobStatsResponseAsString, containsString("\"out_of_order_timestamp_count\":0"));
|
||||
assertThat(jobStatsResponseAsString, containsString("\"bucket_count\":4"));
|
||||
assertThat(jobStatsResponseAsString, containsString("\"bucket_count\":3"));
|
||||
// The derivative agg won't have values for the first bucket of each host
|
||||
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":2"));
|
||||
}
|
||||
|
@ -139,11 +139,11 @@ public class DataCountsReporterTests extends ESTestCase {
|
||||
|
||||
// send 'flush' signal
|
||||
dataCountsReporter.finishReporting(ActionListener.wrap(r -> {}, e -> {}));
|
||||
assertEquals(3, dataCountsReporter.runningTotalStats().getBucketCount());
|
||||
assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount());
|
||||
assertEquals(1, dataCountsReporter.runningTotalStats().getEmptyBucketCount());
|
||||
assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount());
|
||||
|
||||
assertEquals(3, dataCountsReporter.incrementalStats().getBucketCount());
|
||||
assertEquals(2, dataCountsReporter.incrementalStats().getBucketCount());
|
||||
assertEquals(1, dataCountsReporter.incrementalStats().getEmptyBucketCount());
|
||||
assertEquals(0, dataCountsReporter.incrementalStats().getSparseBucketCount());
|
||||
}
|
||||
@ -273,7 +273,7 @@ public class DataCountsReporterTests extends ESTestCase {
|
||||
|
||||
dataCountsReporter.setAnalysedFieldsPerRecord(3);
|
||||
Date now = new Date();
|
||||
DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 1L, new Date(2000), new Date(3000),
|
||||
DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 0L, new Date(2000), new Date(3000),
|
||||
now, (Date) null, (Date) null);
|
||||
dataCountsReporter.reportRecordWritten(5, 2000);
|
||||
dataCountsReporter.reportRecordWritten(5, 3000);
|
||||
|
@ -19,12 +19,13 @@ import java.util.Date;
|
||||
|
||||
public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
|
||||
private static final long BUCKET_SPAN = 60000;
|
||||
private Job job;
|
||||
|
||||
@Before
|
||||
public void setUpMocks() throws IOException {
|
||||
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
|
||||
acBuilder.setBucketSpan(TimeValue.timeValueSeconds(60));
|
||||
acBuilder.setBucketSpan(TimeValue.timeValueMillis(BUCKET_SPAN));
|
||||
acBuilder.setLatency(TimeValue.ZERO);
|
||||
acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
|
||||
|
||||
@ -34,6 +35,51 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
job = builder.build(new Date());
|
||||
}
|
||||
|
||||
public void testIncompleteBuckets() {
|
||||
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
|
||||
|
||||
d.checkRecord(1000);
|
||||
d.checkRecord(2000);
|
||||
d.checkRecord(3000);
|
||||
d.flush();
|
||||
|
||||
assertEquals(0, d.getBucketCount());
|
||||
assertEquals(0, d.getEmptyBucketCount());
|
||||
assertEquals(0, d.getSparseBucketCount());
|
||||
assertEquals(null, d.getLatestSparseBucketTime());
|
||||
assertEquals(null, d.getLatestEmptyBucketTime());
|
||||
|
||||
d.checkRecord(4000);
|
||||
d.checkRecord(5000);
|
||||
d.checkRecord(6000);
|
||||
d.flush();
|
||||
|
||||
assertEquals(0, d.getBucketCount());
|
||||
assertEquals(0, d.getEmptyBucketCount());
|
||||
assertEquals(0, d.getSparseBucketCount());
|
||||
assertEquals(null, d.getLatestSparseBucketTime());
|
||||
assertEquals(null, d.getLatestEmptyBucketTime());
|
||||
|
||||
d.checkRecord(BUCKET_SPAN + 1000);
|
||||
d.checkRecord(BUCKET_SPAN + 2000);
|
||||
d.flush();
|
||||
|
||||
assertEquals(1, d.getBucketCount());
|
||||
assertEquals(0, d.getEmptyBucketCount());
|
||||
assertEquals(0, d.getSparseBucketCount());
|
||||
assertEquals(null, d.getLatestSparseBucketTime());
|
||||
assertEquals(null, d.getLatestEmptyBucketTime());
|
||||
|
||||
d.checkRecord(BUCKET_SPAN * 3 + 1000);
|
||||
d.checkRecord(BUCKET_SPAN * 3 + 1001);
|
||||
d.flush();
|
||||
|
||||
assertEquals(3, d.getBucketCount());
|
||||
assertEquals(1, d.getEmptyBucketCount());
|
||||
assertEquals(0, d.getSparseBucketCount());
|
||||
assertEquals(null, d.getLatestSparseBucketTime());
|
||||
assertEquals(new Date(BUCKET_SPAN * 2), d.getLatestEmptyBucketTime());
|
||||
}
|
||||
public void testSimple() {
|
||||
DataStreamDiagnostics d = new DataStreamDiagnostics(job);
|
||||
|
||||
@ -49,7 +95,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
d.checkRecord(610000);
|
||||
|
||||
d.flush();
|
||||
assertEquals(10, d.getBucketCount());
|
||||
assertEquals(9, d.getBucketCount());
|
||||
assertEquals(0, d.getEmptyBucketCount());
|
||||
assertEquals(0, d.getSparseBucketCount());
|
||||
assertEquals(null, d.getLatestSparseBucketTime());
|
||||
@ -71,7 +117,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
d.checkRecord(550000);
|
||||
|
||||
d.flush();
|
||||
assertEquals(10, d.getBucketCount());
|
||||
assertEquals(9, d.getBucketCount());
|
||||
assertEquals(2, d.getEmptyBucketCount());
|
||||
assertEquals(0, d.getSparseBucketCount());
|
||||
assertEquals(null, d.getLatestSparseBucketTime());
|
||||
@ -93,7 +139,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
d.checkRecord(1650000);
|
||||
|
||||
d.flush();
|
||||
assertEquals(10, d.getBucketCount());
|
||||
assertEquals(9, d.getBucketCount());
|
||||
assertEquals(2, d.getEmptyBucketCount());
|
||||
assertEquals(0, d.getSparseBucketCount());
|
||||
assertEquals(null, d.getLatestSparseBucketTime());
|
||||
@ -117,7 +163,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
sendManyDataPoints(d, 550000, 609000, 1400);
|
||||
|
||||
d.flush();
|
||||
assertEquals(10, d.getBucketCount());
|
||||
assertEquals(9, d.getBucketCount());
|
||||
assertEquals(0, d.getEmptyBucketCount());
|
||||
assertEquals(2, d.getSparseBucketCount());
|
||||
assertEquals(new Date(420000), d.getLatestSparseBucketTime());
|
||||
@ -145,7 +191,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
sendManyDataPoints(d, 550000, 609000, 10);
|
||||
|
||||
d.flush();
|
||||
assertEquals(10, d.getBucketCount());
|
||||
assertEquals(9, d.getBucketCount());
|
||||
assertEquals(0, d.getEmptyBucketCount());
|
||||
assertEquals(1, d.getSparseBucketCount());
|
||||
assertEquals(new Date(120000), d.getLatestSparseBucketTime());
|
||||
@ -174,7 +220,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
sendManyDataPoints(d, 550000, 609000, 10);
|
||||
|
||||
d.flush();
|
||||
assertEquals(10, d.getBucketCount());
|
||||
assertEquals(9, d.getBucketCount());
|
||||
assertEquals(0, d.getEmptyBucketCount());
|
||||
assertEquals(2, d.getSparseBucketCount());
|
||||
assertEquals(new Date(480000), d.getLatestSparseBucketTime());
|
||||
@ -198,7 +244,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
sendManyDataPoints(d, 550000, 609000, 1400);
|
||||
|
||||
d.flush();
|
||||
assertEquals(10, d.getBucketCount());
|
||||
assertEquals(9, d.getBucketCount());
|
||||
assertEquals(2, d.getSparseBucketCount());
|
||||
assertEquals(new Date(420000), d.getLatestSparseBucketTime());
|
||||
assertEquals(2, d.getEmptyBucketCount());
|
||||
@ -225,7 +271,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
// 98 empty buckets
|
||||
d.checkRecord(6490000);
|
||||
d.flush();
|
||||
assertEquals(109, d.getBucketCount());
|
||||
assertEquals(108, d.getBucketCount());
|
||||
assertEquals(100, d.getEmptyBucketCount());
|
||||
assertEquals(0, d.getSparseBucketCount());
|
||||
assertEquals(null, d.getLatestSparseBucketTime());
|
||||
@ -254,7 +300,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
|
||||
sendManyDataPoints(d, 550000, 609000, 1400);
|
||||
|
||||
d.flush();
|
||||
assertEquals(10, d.getBucketCount());
|
||||
assertEquals(9, d.getBucketCount());
|
||||
assertEquals(0, d.getEmptyBucketCount());
|
||||
assertEquals(2, d.getSparseBucketCount());
|
||||
assertEquals(new Date(420000), d.getLatestSparseBucketTime());
|
||||
|
Loading…
x
Reference in New Issue
Block a user