Original commit: elastic/x-pack-elasticsearch@6801e90d54
This commit is contained in:
David Kyle 2017-08-23 15:46:51 +01:00 committed by GitHub
parent 6ee4fe6a0b
commit 71063825be
2 changed files with 16 additions and 10 deletions

View File

@ -324,7 +324,6 @@ public class DataCountsReporter extends AbstractComponent {
public void startNewIncrementalCount() { public void startNewIncrementalCount() {
incrementalRecordStats = new DataCounts(job.getId()); incrementalRecordStats = new DataCounts(job.getId());
retrieveDiagnosticsIntermediateResults(); retrieveDiagnosticsIntermediateResults();
diagnostics.resetCounts();
} }
public DataCounts incrementalStats() { public DataCounts incrementalStats() {
@ -338,14 +337,14 @@ public class DataCountsReporter extends AbstractComponent {
} }
private void retrieveDiagnosticsIntermediateResults() { private void retrieveDiagnosticsIntermediateResults() {
totalRecordStats.incrementBucketCount(diagnostics.getEmptyBucketCount());
totalRecordStats.incrementBucketCount(diagnostics.getBucketCount()); totalRecordStats.incrementBucketCount(diagnostics.getBucketCount());
totalRecordStats.incrementEmptyBucketCount(diagnostics.getEmptyBucketCount());
totalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount()); totalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount());
totalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime()); totalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime());
totalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime()); totalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime());
incrementalRecordStats.incrementEmptyBucketCount(diagnostics.getEmptyBucketCount());
incrementalRecordStats.incrementBucketCount(diagnostics.getBucketCount()); incrementalRecordStats.incrementBucketCount(diagnostics.getBucketCount());
incrementalRecordStats.incrementEmptyBucketCount(diagnostics.getEmptyBucketCount());
incrementalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount()); incrementalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount());
incrementalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime()); incrementalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime());
incrementalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime()); incrementalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime());

View File

@ -37,6 +37,7 @@ public class DataCountsReporterTests extends ESTestCase {
private Job job; private Job job;
private JobDataCountsPersister jobDataCountsPersister; private JobDataCountsPersister jobDataCountsPersister;
private Settings settings; private Settings settings;
private TimeValue bucketSpan = TimeValue.timeValueSeconds(300);
@Before @Before
public void setUpMocks() { public void setUpMocks() {
@ -46,7 +47,7 @@ public class DataCountsReporterTests extends ESTestCase {
.build(); .build();
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(TimeValue.timeValueSeconds(300)); acBuilder.setBucketSpan(bucketSpan);
acBuilder.setLatency(TimeValue.ZERO); acBuilder.setLatency(TimeValue.ZERO);
acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
@ -119,26 +120,32 @@ public class DataCountsReporterTests extends ESTestCase {
assertAllCountFieldsEqualZero(stats); assertAllCountFieldsEqualZero(stats);
// write some more data // write some more data
dataCountsReporter.reportRecordWritten(5, 302000); // skip a bucket so there is a non-zero empty bucket count
dataCountsReporter.reportRecordWritten(5, 302000); long timeStamp = bucketSpan.millis() * 2 + 2000;
dataCountsReporter.reportRecordWritten(5, timeStamp);
dataCountsReporter.reportRecordWritten(5, timeStamp);
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount()); assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(302000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); assertEquals(602000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
// check total stats // check total stats
assertEquals(4, dataCountsReporter.runningTotalStats().getInputRecordCount()); assertEquals(4, dataCountsReporter.runningTotalStats().getInputRecordCount());
assertEquals(20, dataCountsReporter.runningTotalStats().getInputFieldCount()); assertEquals(20, dataCountsReporter.runningTotalStats().getInputFieldCount());
assertEquals(4, dataCountsReporter.runningTotalStats().getProcessedRecordCount()); assertEquals(4, dataCountsReporter.runningTotalStats().getProcessedRecordCount());
assertEquals(12, dataCountsReporter.runningTotalStats().getProcessedFieldCount()); assertEquals(12, dataCountsReporter.runningTotalStats().getProcessedFieldCount());
assertEquals(302000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime()); assertEquals(602000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime());
// send 'flush' signal // send 'flush' signal
dataCountsReporter.finishReporting(ActionListener.wrap(r -> {}, e -> {})); dataCountsReporter.finishReporting(ActionListener.wrap(r -> {}, e -> {}));
assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount()); assertEquals(3, dataCountsReporter.runningTotalStats().getBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getEmptyBucketCount()); assertEquals(1, dataCountsReporter.runningTotalStats().getEmptyBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount()); assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount());
assertEquals(3, dataCountsReporter.incrementalStats().getBucketCount());
assertEquals(1, dataCountsReporter.incrementalStats().getEmptyBucketCount());
assertEquals(0, dataCountsReporter.incrementalStats().getSparseBucketCount());
} }
public void testReportLatestTimeIncrementalStats() throws IOException { public void testReportLatestTimeIncrementalStats() throws IOException {