diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index 9d02453645e..7b5d6b51ae1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -324,7 +324,6 @@ public class DataCountsReporter extends AbstractComponent { public void startNewIncrementalCount() { incrementalRecordStats = new DataCounts(job.getId()); retrieveDiagnosticsIntermediateResults(); - diagnostics.resetCounts(); } public DataCounts incrementalStats() { @@ -338,14 +337,14 @@ public class DataCountsReporter extends AbstractComponent { } private void retrieveDiagnosticsIntermediateResults() { - totalRecordStats.incrementBucketCount(diagnostics.getEmptyBucketCount()); totalRecordStats.incrementBucketCount(diagnostics.getBucketCount()); + totalRecordStats.incrementEmptyBucketCount(diagnostics.getEmptyBucketCount()); totalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount()); totalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime()); totalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime()); - incrementalRecordStats.incrementEmptyBucketCount(diagnostics.getEmptyBucketCount()); incrementalRecordStats.incrementBucketCount(diagnostics.getBucketCount()); + incrementalRecordStats.incrementEmptyBucketCount(diagnostics.getEmptyBucketCount()); incrementalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount()); incrementalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime()); incrementalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index c98e6e0a02b..9b3a68c966a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -37,6 +37,7 @@ public class DataCountsReporterTests extends ESTestCase { private Job job; private JobDataCountsPersister jobDataCountsPersister; private Settings settings; + private TimeValue bucketSpan = TimeValue.timeValueSeconds(300); @Before public void setUpMocks() { @@ -46,7 +47,7 @@ public class DataCountsReporterTests extends ESTestCase { .build(); AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); - acBuilder.setBucketSpan(TimeValue.timeValueSeconds(300)); + acBuilder.setBucketSpan(bucketSpan); acBuilder.setLatency(TimeValue.ZERO); acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); @@ -119,26 +120,32 @@ public class DataCountsReporterTests extends ESTestCase { assertAllCountFieldsEqualZero(stats); // write some more data - dataCountsReporter.reportRecordWritten(5, 302000); - dataCountsReporter.reportRecordWritten(5, 302000); + // skip a bucket so there is a non-zero empty bucket count + long timeStamp = bucketSpan.millis() * 2 + 2000; + dataCountsReporter.reportRecordWritten(5, timeStamp); + dataCountsReporter.reportRecordWritten(5, timeStamp); assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(302000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + assertEquals(602000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); // check total stats assertEquals(4, dataCountsReporter.runningTotalStats().getInputRecordCount()); assertEquals(20, dataCountsReporter.runningTotalStats().getInputFieldCount()); assertEquals(4, dataCountsReporter.runningTotalStats().getProcessedRecordCount()); assertEquals(12, dataCountsReporter.runningTotalStats().getProcessedFieldCount()); - assertEquals(302000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime()); + assertEquals(602000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime()); // send 'flush' signal dataCountsReporter.finishReporting(ActionListener.wrap(r -> {}, e -> {})); - assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount()); - assertEquals(0, dataCountsReporter.runningTotalStats().getEmptyBucketCount()); + assertEquals(3, dataCountsReporter.runningTotalStats().getBucketCount()); + assertEquals(1, dataCountsReporter.runningTotalStats().getEmptyBucketCount()); assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount()); + + assertEquals(3, dataCountsReporter.incrementalStats().getBucketCount()); + assertEquals(1, dataCountsReporter.incrementalStats().getEmptyBucketCount()); + assertEquals(0, dataCountsReporter.incrementalStats().getSparseBucketCount()); } public void testReportLatestTimeIncrementalStats() throws IOException {