Treat PostDataActionResponse.DataCounts.bucketCount as incremental rather than absolute (total). (#44803) (#44856)
This commit is contained in:
parent
03dd22b56c
commit
8bb8543fdf
|
@ -114,8 +114,8 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
|
||||||
this.totalSearchTimeMs += searchTimeMs;
|
this.totalSearchTimeMs += searchTimeMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setBucketCount(long bucketCount) {
|
public void incrementBucketCount(long bucketCount) {
|
||||||
this.bucketCount = bucketCount;
|
this.bucketCount += bucketCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -125,9 +125,9 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
|
||||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(30.0));
|
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(30.0));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSetBucketCount() {
|
public void testIncrementBucketCount() {
|
||||||
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
|
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
|
||||||
stats.setBucketCount(20);
|
stats.incrementBucketCount(10);
|
||||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||||
assertThat(stats.getSearchCount(), equalTo(5L));
|
assertThat(stats.getSearchCount(), equalTo(5L));
|
||||||
assertThat(stats.getBucketCount(), equalTo(20L));
|
assertThat(stats.getBucketCount(), equalTo(20L));
|
||||||
|
@ -141,7 +141,7 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
|
||||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
|
assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
|
||||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(10.0));
|
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(10.0));
|
||||||
|
|
||||||
stats.setBucketCount(20);
|
stats.incrementBucketCount(10);
|
||||||
assertThat(stats.getBucketCount(), equalTo(20L));
|
assertThat(stats.getBucketCount(), equalTo(20L));
|
||||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
|
assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
|
||||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(5.0));
|
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(5.0));
|
||||||
|
@ -151,7 +151,7 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
|
||||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
|
assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
|
||||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(15.0));
|
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(15.0));
|
||||||
|
|
||||||
stats.setBucketCount(25);
|
stats.incrementBucketCount(5);
|
||||||
assertThat(stats.getBucketCount(), equalTo(25L));
|
assertThat(stats.getBucketCount(), equalTo(25L));
|
||||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
|
assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
|
||||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(12.0));
|
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(12.0));
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class DatafeedTimingStatsReporter {
|
||||||
if (dataCounts == null) {
|
if (dataCounts == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
currentTimingStats.setBucketCount(dataCounts.getBucketCount());
|
currentTimingStats.incrementBucketCount(dataCounts.getBucketCount());
|
||||||
flushIfDifferSignificantly();
|
flushIfDifferSignificantly();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -223,7 +223,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
||||||
* @param input Data input stream
|
* @param input Data input stream
|
||||||
* @param xContentType the {@link XContentType} of the input
|
* @param xContentType the {@link XContentType} of the input
|
||||||
* @param params Data processing parameters
|
* @param params Data processing parameters
|
||||||
* @param handler Delegate error or datacount results (Count of records, fields, bytes, etc written)
|
* @param handler Delegate error or datacount results (Count of records, fields, bytes, etc written as a result of this call)
|
||||||
*/
|
*/
|
||||||
public void processData(JobTask jobTask, AnalysisRegistry analysisRegistry, InputStream input,
|
public void processData(JobTask jobTask, AnalysisRegistry analysisRegistry, InputStream input,
|
||||||
XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
|
XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
|
||||||
|
|
|
@ -94,22 +94,17 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReportDataCounts() {
|
public void testReportDataCounts() {
|
||||||
DataCounts dataCounts = new DataCounts(JOB_ID);
|
|
||||||
dataCounts.incrementBucketCount(20);
|
|
||||||
DatafeedTimingStatsReporter timingStatsReporter =
|
DatafeedTimingStatsReporter timingStatsReporter =
|
||||||
new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, dataCounts.getBucketCount(), 10000.0), jobResultsPersister);
|
new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 20, 10000.0), jobResultsPersister);
|
||||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 20, 10000.0)));
|
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 20, 10000.0)));
|
||||||
|
|
||||||
dataCounts.incrementBucketCount(1);
|
timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1));
|
||||||
timingStatsReporter.reportDataCounts(dataCounts);
|
|
||||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 21, 10000.0)));
|
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 21, 10000.0)));
|
||||||
|
|
||||||
dataCounts.incrementBucketCount(1);
|
timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1));
|
||||||
timingStatsReporter.reportDataCounts(dataCounts);
|
|
||||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 22, 10000.0)));
|
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 22, 10000.0)));
|
||||||
|
|
||||||
dataCounts.incrementBucketCount(1);
|
timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1));
|
||||||
timingStatsReporter.reportDataCounts(dataCounts);
|
|
||||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0)));
|
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0)));
|
||||||
|
|
||||||
InOrder inOrder = inOrder(jobResultsPersister);
|
InOrder inOrder = inOrder(jobResultsPersister);
|
||||||
|
@ -118,6 +113,12 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase {
|
||||||
verifyNoMoreInteractions(jobResultsPersister);
|
verifyNoMoreInteractions(jobResultsPersister);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static DataCounts createDataCountsWithBucketCount(long bucketCount) {
|
||||||
|
DataCounts dataCounts = new DataCounts(JOB_ID);
|
||||||
|
dataCounts.incrementBucketCount(bucketCount);
|
||||||
|
return dataCounts;
|
||||||
|
}
|
||||||
|
|
||||||
public void testTimingStatsDifferSignificantly() {
|
public void testTimingStatsDifferSignificantly() {
|
||||||
assertThat(
|
assertThat(
|
||||||
DatafeedTimingStatsReporter.differSignificantly(
|
DatafeedTimingStatsReporter.differSignificantly(
|
||||||
|
|
Loading…
Reference in New Issue