[ML] Reduce frequency of data counts log message (elastic/x-pack-elasticsearch#1030)
* Reduce data counts log message frequency * Start logging at 10,000 records Original commit: elastic/x-pack-elasticsearch@af6c791b6e
This commit is contained in:
parent
f11df2c0c7
commit
78a3c32ec4
|
@ -5,7 +5,6 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ml.job.process;
|
package org.elasticsearch.xpack.ml.job.process;
|
||||||
|
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
|
@ -27,9 +26,10 @@ import java.util.function.Function;
|
||||||
*
|
*
|
||||||
* Stats are logged at specific stages
|
* Stats are logged at specific stages
|
||||||
* <ol>
|
* <ol>
|
||||||
* <li>Every 100 records for the first 1000 records</li>
|
* <li>Every 10,000 records for the first 100,000 records</li>
|
||||||
* <li>Every 1000 records for the first 20000 records</li>
|
* <li>Every 100,000 records until 1,000,000 records</li>
|
||||||
* <li>Every 10000 records after 20000 records</li>
|
* <li>Every 1,000,000 records until 10,000,000 records</li>
|
||||||
|
* <li>and so on...</li>
|
||||||
* </ol>
|
* </ol>
|
||||||
* The {@link #reportingBoundaryFunction} member points to a different
|
* The {@link #reportingBoundaryFunction} member points to a different
|
||||||
* function depending on which reporting stage is the current, the function
|
* function depending on which reporting stage is the current, the function
|
||||||
|
@ -86,7 +86,7 @@ public class DataCountsReporter extends AbstractComponent {
|
||||||
acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
|
acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
|
||||||
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
|
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
|
||||||
|
|
||||||
reportingBoundaryFunction = this::reportEvery100Records;
|
reportingBoundaryFunction = this::reportEvery10000Records;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -275,9 +275,9 @@ public class DataCountsReporter extends AbstractComponent {
|
||||||
* processes more data. Logging every 10000 records when the data rate is
|
* processes more data. Logging every 10000 records when the data rate is
|
||||||
* 40000 per second quickly rolls the logs.
|
* 40000 per second quickly rolls the logs.
|
||||||
*/
|
*/
|
||||||
protected void logStatus(long totalRecords) {
|
protected boolean logStatus(long totalRecords) {
|
||||||
if (++logCount % logEvery != 0) {
|
if (++logCount % logEvery != 0) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
String status = String.format(Locale.ROOT,
|
String status = String.format(Locale.ROOT,
|
||||||
|
@ -287,48 +287,32 @@ public class DataCountsReporter extends AbstractComponent {
|
||||||
logger.info(status);
|
logger.info(status);
|
||||||
|
|
||||||
int log10TotalRecords = (int) Math.floor(Math.log10(totalRecords));
|
int log10TotalRecords = (int) Math.floor(Math.log10(totalRecords));
|
||||||
// Start reducing the logging rate after 10 million records have been seen
|
// Start reducing the logging rate after a million records have been seen
|
||||||
if (log10TotalRecords > 6) {
|
if (log10TotalRecords > 5) {
|
||||||
logEvery = (int) Math.pow(10.0, log10TotalRecords - 6);
|
logEvery = (int) Math.pow(10.0, log10TotalRecords - 5);
|
||||||
logCount = 0;
|
logCount = 0;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private boolean reportEvery100Records(long totalRecords) {
|
|
||||||
if (totalRecords > 1000) {
|
|
||||||
lastRecordCountQuotient = totalRecords / 1000;
|
|
||||||
reportingBoundaryFunction = this::reportEvery1000Records;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
long quotient = totalRecords / 100;
|
|
||||||
if (quotient > lastRecordCountQuotient) {
|
|
||||||
lastRecordCountQuotient = quotient;
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean reportEvery1000Records(long totalRecords) {
|
|
||||||
|
|
||||||
if (totalRecords > 20000) {
|
|
||||||
lastRecordCountQuotient = totalRecords / 10000;
|
|
||||||
reportingBoundaryFunction = this::reportEvery10000Records;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
long quotient = totalRecords / 1000;
|
|
||||||
if (quotient > lastRecordCountQuotient) {
|
|
||||||
lastRecordCountQuotient = quotient;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean reportEvery10000Records(long totalRecords) {
|
private boolean reportEvery10000Records(long totalRecords) {
|
||||||
long quotient = totalRecords / 10000;
|
if (totalRecords > 100_000) {
|
||||||
|
lastRecordCountQuotient = totalRecords / 100_000;
|
||||||
|
reportingBoundaryFunction = this::reportEvery100000Records;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
long quotient = totalRecords / 10_000;
|
||||||
|
if (quotient > lastRecordCountQuotient) {
|
||||||
|
lastRecordCountQuotient = quotient;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean reportEvery100000Records(long totalRecords) {
|
||||||
|
long quotient = totalRecords / 100_000;
|
||||||
if (quotient > lastRecordCountQuotient) {
|
if (quotient > lastRecordCountQuotient) {
|
||||||
lastRecordCountQuotient = quotient;
|
lastRecordCountQuotient = quotient;
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -172,73 +172,21 @@ public class DataCountsReporterTests extends ESTestCase {
|
||||||
verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), any());
|
verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReportRecordsWritten_Given100Records() {
|
public void testReportRecordsWritten_Given9999Records() {
|
||||||
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
|
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
|
||||||
dataCountsReporter.setAnalysedFieldsPerRecord(3);
|
dataCountsReporter.setAnalysedFieldsPerRecord(3);
|
||||||
|
|
||||||
for (int i = 1; i <= 101; i++) {
|
for (int i = 1; i <= 9999; i++) {
|
||||||
dataCountsReporter.reportRecordWritten(5, i);
|
dataCountsReporter.reportRecordWritten(5, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(101, dataCountsReporter.incrementalStats().getInputRecordCount());
|
assertEquals(9999, dataCountsReporter.incrementalStats().getInputRecordCount());
|
||||||
assertEquals(505, dataCountsReporter.incrementalStats().getInputFieldCount());
|
assertEquals(49995, dataCountsReporter.incrementalStats().getInputFieldCount());
|
||||||
assertEquals(101, dataCountsReporter.incrementalStats().getProcessedRecordCount());
|
assertEquals(9999, dataCountsReporter.incrementalStats().getProcessedRecordCount());
|
||||||
assertEquals(303, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
assertEquals(29997, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
||||||
assertEquals(101, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
assertEquals(9999, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
||||||
|
|
||||||
assertEquals(1, dataCountsReporter.getLogStatusCallCount());
|
assertEquals(0, dataCountsReporter.getLogStatusCallCount());
|
||||||
}
|
|
||||||
|
|
||||||
public void testReportRecordsWritten_Given1000Records() {
|
|
||||||
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
|
|
||||||
|
|
||||||
dataCountsReporter.setAnalysedFieldsPerRecord(3);
|
|
||||||
|
|
||||||
for (int i = 1; i <= 1001; i++) {
|
|
||||||
dataCountsReporter.reportRecordWritten(5, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(1001, dataCountsReporter.incrementalStats().getInputRecordCount());
|
|
||||||
assertEquals(5005, dataCountsReporter.incrementalStats().getInputFieldCount());
|
|
||||||
assertEquals(1001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
|
|
||||||
assertEquals(3003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
|
||||||
assertEquals(1001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
|
||||||
|
|
||||||
assertEquals(10, dataCountsReporter.getLogStatusCallCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testReportRecordsWritten_Given2000Records() {
|
|
||||||
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
|
|
||||||
dataCountsReporter.setAnalysedFieldsPerRecord(3);
|
|
||||||
|
|
||||||
for (int i = 1; i <= 2001; i++) {
|
|
||||||
dataCountsReporter.reportRecordWritten(5, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(2001, dataCountsReporter.incrementalStats().getInputRecordCount());
|
|
||||||
assertEquals(10005, dataCountsReporter.incrementalStats().getInputFieldCount());
|
|
||||||
assertEquals(2001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
|
|
||||||
assertEquals(6003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
|
||||||
assertEquals(2001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
|
||||||
|
|
||||||
assertEquals(11, dataCountsReporter.getLogStatusCallCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testReportRecordsWritten_Given20000Records() {
|
|
||||||
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
|
|
||||||
dataCountsReporter.setAnalysedFieldsPerRecord(3);
|
|
||||||
|
|
||||||
for (int i = 1; i <= 20001; i++) {
|
|
||||||
dataCountsReporter.reportRecordWritten(5, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(20001, dataCountsReporter.incrementalStats().getInputRecordCount());
|
|
||||||
assertEquals(100005, dataCountsReporter.incrementalStats().getInputFieldCount());
|
|
||||||
assertEquals(20001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
|
|
||||||
assertEquals(60003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
|
||||||
assertEquals(20001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
|
||||||
|
|
||||||
assertEquals(29, dataCountsReporter.getLogStatusCallCount());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReportRecordsWritten_Given30000Records() {
|
public void testReportRecordsWritten_Given30000Records() {
|
||||||
|
@ -255,9 +203,61 @@ public class DataCountsReporterTests extends ESTestCase {
|
||||||
assertEquals(90003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
assertEquals(90003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
||||||
assertEquals(30001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
assertEquals(30001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
||||||
|
|
||||||
assertEquals(30, dataCountsReporter.getLogStatusCallCount());
|
assertEquals(3, dataCountsReporter.getLogStatusCallCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testReportRecordsWritten_Given100_000Records() {
|
||||||
|
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
|
||||||
|
dataCountsReporter.setAnalysedFieldsPerRecord(3);
|
||||||
|
|
||||||
|
for (int i = 1; i <= 100000; i++) {
|
||||||
|
dataCountsReporter.reportRecordWritten(5, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(100000, dataCountsReporter.incrementalStats().getInputRecordCount());
|
||||||
|
assertEquals(500000, dataCountsReporter.incrementalStats().getInputFieldCount());
|
||||||
|
assertEquals(100000, dataCountsReporter.incrementalStats().getProcessedRecordCount());
|
||||||
|
assertEquals(300000, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
||||||
|
assertEquals(100000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
||||||
|
|
||||||
|
assertEquals(10, dataCountsReporter.getLogStatusCallCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testReportRecordsWritten_Given1_000_000Records() {
|
||||||
|
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
|
||||||
|
dataCountsReporter.setAnalysedFieldsPerRecord(3);
|
||||||
|
|
||||||
|
for (int i = 1; i <= 1_000_000; i++) {
|
||||||
|
dataCountsReporter.reportRecordWritten(5, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(1_000_000, dataCountsReporter.incrementalStats().getInputRecordCount());
|
||||||
|
assertEquals(5_000_000, dataCountsReporter.incrementalStats().getInputFieldCount());
|
||||||
|
assertEquals(1_000_000, dataCountsReporter.incrementalStats().getProcessedRecordCount());
|
||||||
|
assertEquals(3_000_000, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
||||||
|
assertEquals(1_000_000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
||||||
|
|
||||||
|
assertEquals(19, dataCountsReporter.getLogStatusCallCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testReportRecordsWritten_Given2_000_000Records() {
|
||||||
|
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
|
||||||
|
dataCountsReporter.setAnalysedFieldsPerRecord(3);
|
||||||
|
|
||||||
|
for (int i = 1; i <= 2_000_000; i++) {
|
||||||
|
dataCountsReporter.reportRecordWritten(5, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(2000000, dataCountsReporter.incrementalStats().getInputRecordCount());
|
||||||
|
assertEquals(10000000, dataCountsReporter.incrementalStats().getInputFieldCount());
|
||||||
|
assertEquals(2000000, dataCountsReporter.incrementalStats().getProcessedRecordCount());
|
||||||
|
assertEquals(6000000, dataCountsReporter.incrementalStats().getProcessedFieldCount());
|
||||||
|
assertEquals(2000000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
|
||||||
|
|
||||||
|
assertEquals(20, dataCountsReporter.getLogStatusCallCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testFinishReporting() {
|
public void testFinishReporting() {
|
||||||
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
|
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
|
||||||
jobDataCountsPersister);
|
jobDataCountsPersister);
|
||||||
|
|
|
@ -37,11 +37,15 @@ class DummyDataCountsReporter extends DataCountsReporter {
|
||||||
* Override the method here an count the calls
|
* Override the method here an count the calls
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected void logStatus(long totalRecords) {
|
protected boolean logStatus(long totalRecords) {
|
||||||
super.logStatus(totalRecords);
|
boolean messageLogged = super.logStatus(totalRecords);
|
||||||
|
if (messageLogged) {
|
||||||
++logStatusCallCount;
|
++logStatusCallCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return messageLogged;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Then number of times {@link #logStatus(long)} was called.
|
* @return Then number of times {@link #logStatus(long)} was called.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue