diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
index c9609d37ff5..e6d8a61eff4 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
@@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.job.process;
-import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
@@ -27,9 +26,10 @@ import java.util.function.Function;
*
* Stats are logged at specific stages
*
- * - Every 100 records for the first 1000 records
- * - Every 1000 records for the first 20000 records
- * - Every 10000 records after 20000 records
+ * - Every 10,000 records for the first 100,000 records
+ * - Every 100,000 records until 1,000,000 records
+ * - Every 1,000,000 records until 10,000,000 records
+ * - and so on...
*
* The {@link #reportingBoundaryFunction} member points to a different
* function depending on which reporting stage is the current, the function
@@ -86,7 +86,7 @@ public class DataCountsReporter extends AbstractComponent {
acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
- reportingBoundaryFunction = this::reportEvery100Records;
+ reportingBoundaryFunction = this::reportEvery10000Records;
}
/**
@@ -275,9 +275,9 @@ public class DataCountsReporter extends AbstractComponent {
* processes more data. Logging every 10000 records when the data rate is
* 40000 per second quickly rolls the logs.
*/
- protected void logStatus(long totalRecords) {
+ protected boolean logStatus(long totalRecords) {
if (++logCount % logEvery != 0) {
- return;
+ return false;
}
String status = String.format(Locale.ROOT,
@@ -287,48 +287,32 @@ public class DataCountsReporter extends AbstractComponent {
logger.info(status);
int log10TotalRecords = (int) Math.floor(Math.log10(totalRecords));
- // Start reducing the logging rate after 10 million records have been seen
- if (log10TotalRecords > 6) {
- logEvery = (int) Math.pow(10.0, log10TotalRecords - 6);
+ // Start reducing the logging rate after a million records have been seen
+ if (log10TotalRecords > 5) {
+ logEvery = (int) Math.pow(10.0, log10TotalRecords - 5);
logCount = 0;
}
- }
-
- private boolean reportEvery100Records(long totalRecords) {
- if (totalRecords > 1000) {
- lastRecordCountQuotient = totalRecords / 1000;
- reportingBoundaryFunction = this::reportEvery1000Records;
- return false;
- }
-
- long quotient = totalRecords / 100;
- if (quotient > lastRecordCountQuotient) {
- lastRecordCountQuotient = quotient;
- return true;
- }
-
- return false;
- }
-
- private boolean reportEvery1000Records(long totalRecords) {
-
- if (totalRecords > 20000) {
- lastRecordCountQuotient = totalRecords / 10000;
- reportingBoundaryFunction = this::reportEvery10000Records;
- return false;
- }
-
- long quotient = totalRecords / 1000;
- if (quotient > lastRecordCountQuotient) {
- lastRecordCountQuotient = quotient;
- return true;
- }
-
- return false;
+ return true;
}
private boolean reportEvery10000Records(long totalRecords) {
- long quotient = totalRecords / 10000;
+ if (totalRecords > 100_000) {
+ lastRecordCountQuotient = totalRecords / 100_000;
+ reportingBoundaryFunction = this::reportEvery100000Records;
+ return false;
+ }
+
+ long quotient = totalRecords / 10_000;
+ if (quotient > lastRecordCountQuotient) {
+ lastRecordCountQuotient = quotient;
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean reportEvery100000Records(long totalRecords) {
+ long quotient = totalRecords / 100_000;
if (quotient > lastRecordCountQuotient) {
lastRecordCountQuotient = quotient;
return true;
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java
index 22b491f2783..01be84465ac 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java
@@ -172,73 +172,21 @@ public class DataCountsReporterTests extends ESTestCase {
verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), any());
}
- public void testReportRecordsWritten_Given100Records() {
+ public void testReportRecordsWritten_Given9999Records() {
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
dataCountsReporter.setAnalysedFieldsPerRecord(3);
- for (int i = 1; i <= 101; i++) {
+ for (int i = 1; i <= 9999; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
- assertEquals(101, dataCountsReporter.incrementalStats().getInputRecordCount());
- assertEquals(505, dataCountsReporter.incrementalStats().getInputFieldCount());
- assertEquals(101, dataCountsReporter.incrementalStats().getProcessedRecordCount());
- assertEquals(303, dataCountsReporter.incrementalStats().getProcessedFieldCount());
- assertEquals(101, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
+ assertEquals(9999, dataCountsReporter.incrementalStats().getInputRecordCount());
+ assertEquals(49995, dataCountsReporter.incrementalStats().getInputFieldCount());
+ assertEquals(9999, dataCountsReporter.incrementalStats().getProcessedRecordCount());
+ assertEquals(29997, dataCountsReporter.incrementalStats().getProcessedFieldCount());
+ assertEquals(9999, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
- assertEquals(1, dataCountsReporter.getLogStatusCallCount());
- }
-
- public void testReportRecordsWritten_Given1000Records() {
- DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
-
- dataCountsReporter.setAnalysedFieldsPerRecord(3);
-
- for (int i = 1; i <= 1001; i++) {
- dataCountsReporter.reportRecordWritten(5, i);
- }
-
- assertEquals(1001, dataCountsReporter.incrementalStats().getInputRecordCount());
- assertEquals(5005, dataCountsReporter.incrementalStats().getInputFieldCount());
- assertEquals(1001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
- assertEquals(3003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
- assertEquals(1001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
-
- assertEquals(10, dataCountsReporter.getLogStatusCallCount());
- }
-
- public void testReportRecordsWritten_Given2000Records() {
- DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
- dataCountsReporter.setAnalysedFieldsPerRecord(3);
-
- for (int i = 1; i <= 2001; i++) {
- dataCountsReporter.reportRecordWritten(5, i);
- }
-
- assertEquals(2001, dataCountsReporter.incrementalStats().getInputRecordCount());
- assertEquals(10005, dataCountsReporter.incrementalStats().getInputFieldCount());
- assertEquals(2001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
- assertEquals(6003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
- assertEquals(2001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
-
- assertEquals(11, dataCountsReporter.getLogStatusCallCount());
- }
-
- public void testReportRecordsWritten_Given20000Records() {
- DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
- dataCountsReporter.setAnalysedFieldsPerRecord(3);
-
- for (int i = 1; i <= 20001; i++) {
- dataCountsReporter.reportRecordWritten(5, i);
- }
-
- assertEquals(20001, dataCountsReporter.incrementalStats().getInputRecordCount());
- assertEquals(100005, dataCountsReporter.incrementalStats().getInputFieldCount());
- assertEquals(20001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
- assertEquals(60003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
- assertEquals(20001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
-
- assertEquals(29, dataCountsReporter.getLogStatusCallCount());
+ assertEquals(0, dataCountsReporter.getLogStatusCallCount());
}
public void testReportRecordsWritten_Given30000Records() {
@@ -255,9 +203,61 @@ public class DataCountsReporterTests extends ESTestCase {
assertEquals(90003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(30001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
- assertEquals(30, dataCountsReporter.getLogStatusCallCount());
+ assertEquals(3, dataCountsReporter.getLogStatusCallCount());
}
+ public void testReportRecordsWritten_Given100_000Records() {
+ DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+ dataCountsReporter.setAnalysedFieldsPerRecord(3);
+
+ for (int i = 1; i <= 100000; i++) {
+ dataCountsReporter.reportRecordWritten(5, i);
+ }
+
+ assertEquals(100000, dataCountsReporter.incrementalStats().getInputRecordCount());
+ assertEquals(500000, dataCountsReporter.incrementalStats().getInputFieldCount());
+ assertEquals(100000, dataCountsReporter.incrementalStats().getProcessedRecordCount());
+ assertEquals(300000, dataCountsReporter.incrementalStats().getProcessedFieldCount());
+ assertEquals(100000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
+
+ assertEquals(10, dataCountsReporter.getLogStatusCallCount());
+ }
+
+ public void testReportRecordsWritten_Given1_000_000Records() {
+ DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+ dataCountsReporter.setAnalysedFieldsPerRecord(3);
+
+ for (int i = 1; i <= 1_000_000; i++) {
+ dataCountsReporter.reportRecordWritten(5, i);
+ }
+
+ assertEquals(1_000_000, dataCountsReporter.incrementalStats().getInputRecordCount());
+ assertEquals(5_000_000, dataCountsReporter.incrementalStats().getInputFieldCount());
+ assertEquals(1_000_000, dataCountsReporter.incrementalStats().getProcessedRecordCount());
+ assertEquals(3_000_000, dataCountsReporter.incrementalStats().getProcessedFieldCount());
+ assertEquals(1_000_000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
+
+ assertEquals(19, dataCountsReporter.getLogStatusCallCount());
+ }
+
+ public void testReportRecordsWritten_Given2_000_000Records() {
+ DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+ dataCountsReporter.setAnalysedFieldsPerRecord(3);
+
+ for (int i = 1; i <= 2_000_000; i++) {
+ dataCountsReporter.reportRecordWritten(5, i);
+ }
+
+ assertEquals(2000000, dataCountsReporter.incrementalStats().getInputRecordCount());
+ assertEquals(10000000, dataCountsReporter.incrementalStats().getInputFieldCount());
+ assertEquals(2000000, dataCountsReporter.incrementalStats().getProcessedRecordCount());
+ assertEquals(6000000, dataCountsReporter.incrementalStats().getProcessedFieldCount());
+ assertEquals(2000000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
+
+ assertEquals(20, dataCountsReporter.getLogStatusCallCount());
+ }
+
+
public void testFinishReporting() {
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
jobDataCountsPersister);
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java
index fb4de2eeffb..91e8de2f2e8 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java
@@ -37,9 +37,13 @@ class DummyDataCountsReporter extends DataCountsReporter {
* Override the method here an count the calls
*/
@Override
- protected void logStatus(long totalRecords) {
- super.logStatus(totalRecords);
- ++logStatusCallCount;
+ protected boolean logStatus(long totalRecords) {
+ boolean messageLogged = super.logStatus(totalRecords);
+ if (messageLogged) {
+ ++logStatusCallCount;
+ }
+
+ return messageLogged;
}
/**