Persist DataCounts every 10 seconds from a thread pool scheduled task. (elastic/elasticsearch#388)

* Persist DataCounts every 10 seconds from a thread pool scheduled task.

Also rework the isReportingBoundary function changing the function as the boundary changes

* Remove overloaded constructors from StatusReporter and DummyStatusReporter

* Persist dataCounts in a background thread from the status reporter

* Use generic threadpool

Original commit: elastic/x-pack-elasticsearch@f00c1067aa
This commit is contained in:
David Kyle 2016-11-29 16:37:47 +00:00 committed by GitHub
parent 688b5cc202
commit f88216eaa5
6 changed files with 310 additions and 163 deletions

View File

@ -132,8 +132,8 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
ExecutorService executorService = threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME); ExecutorService executorService = threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME);
UsageReporter usageReporter = new UsageReporter(settings, job.getJobId(), usagePersister); UsageReporter usageReporter = new UsageReporter(settings, job.getJobId(), usagePersister);
StatusReporter statusReporter = StatusReporter statusReporter = new StatusReporter(threadPool, settings, job.getJobId(),
new StatusReporter(settings, job.getJobId(), jobProvider.dataCounts(jobId), usageReporter, jobDataCountsPersister); jobProvider.dataCounts(jobId), usageReporter, jobDataCountsPersister);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), jobResultsPersister, parser); AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), jobResultsPersister, parser);
AutodetectProcess process = null; AutodetectProcess process = null;

View File

@ -87,6 +87,7 @@ public class AutodetectCommunicator implements Closeable {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, jobId), () -> { checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, jobId), () -> {
statusReporter.close();
autodetectProcess.close(); autodetectProcess.close();
autoDetectResultProcessor.awaitCompletion(); autoDetectResultProcessor.awaitCompletion();
return null; return null;

View File

@ -9,22 +9,39 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter; import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
import java.io.Closeable;
import java.util.Date; import java.util.Date;
import java.util.Locale; import java.util.Locale;
import java.util.function.Function;
/** /**
* Status reporter for tracking all the good/bad * Status reporter for tracking counts of the good/bad records written to the API.
* records written to the API. Call one of the reportXXX() methods * Call one of the reportXXX() methods to update the records counts.
* to update the records counts if {@linkplain #isReportingBoundary(long)} *
* returns true then the count will be logged and the counts persisted * Stats are logged at specific stages
* via the {@linkplain JobDataCountsPersister}. * <ol>
* <li>Every 100 records for the first 1000 records</li>
* <li>Every 1000 records for the first 20000 records</li>
* <li>Every 10000 records after 20000 records</li>
* </ol>
* The {@link #reportingBoundaryFunction} member points to a different
* function depending on which reporting stage is the current, the function
* changes when each of the reporting stages are passed. If the
* function returns {@code true} the usage is logged.
*
* DataCounts are persisted periodically in a scheduled task via
* {@linkplain JobDataCountsPersister}, {@link #close()} must be called to
* cancel the scheduled task.
*/ */
public class StatusReporter extends AbstractComponent { public class StatusReporter extends AbstractComponent implements Closeable {
/** /**
* The max percentage of date parse errors allowed before * The max percentage of date parse errors allowed before
* an exception is thrown. * an exception is thrown.
@ -39,6 +56,8 @@ public class StatusReporter extends AbstractComponent {
public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING = Setting public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING = Setting
.intSetting("max.percent.outoforder.errors", 25, Property.NodeScope); .intSetting("max.percent.outoforder.errors", 25, Property.NodeScope);
private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L);
private final String jobId; private final String jobId;
private final UsageReporter usageReporter; private final UsageReporter usageReporter;
private final JobDataCountsPersister dataCountsPersister; private final JobDataCountsPersister dataCountsPersister;
@ -48,7 +67,6 @@ public class StatusReporter extends AbstractComponent {
private long analyzedFieldsPerRecord = 1; private long analyzedFieldsPerRecord = 1;
private long recordCountDivisor = 100;
private long lastRecordCountQuotient = 0; private long lastRecordCountQuotient = 0;
private long logEvery = 1; private long logEvery = 1;
private long logCount = 0; private long logCount = 0;
@ -56,28 +74,32 @@ public class StatusReporter extends AbstractComponent {
private final int acceptablePercentDateParseErrors; private final int acceptablePercentDateParseErrors;
private final int acceptablePercentOutOfOrderErrors; private final int acceptablePercentOutOfOrderErrors;
public StatusReporter(Settings settings, String jobId, UsageReporter usageReporter, private Function<Long, Boolean> reportingBoundaryFunction;
JobDataCountsPersister dataCountsPersister) {
this(settings, jobId, usageReporter, dataCountsPersister, new DataCounts(jobId));
}
public StatusReporter(Settings settings, String jobId, DataCounts counts, UsageReporter usageReporter, private volatile boolean persistDataCountsOnNextRecord;
JobDataCountsPersister dataCountsPersister) { private final ThreadPool.Cancellable persistDataCountsScheduledAction;
this(settings, jobId, usageReporter, dataCountsPersister, new DataCounts(counts)); private final ThreadPool threadPool;
}
public StatusReporter(ThreadPool threadPool, Settings settings, String jobId, DataCounts counts, UsageReporter usageReporter,
JobDataCountsPersister dataCountsPersister) {
private StatusReporter(Settings settings, String jobId, UsageReporter usageReporter, JobDataCountsPersister dataCountsPersister,
DataCounts totalCounts) {
super(settings); super(settings);
this.jobId = jobId; this.jobId = jobId;
this.usageReporter = usageReporter; this.usageReporter = usageReporter;
this.dataCountsPersister = dataCountsPersister; this.dataCountsPersister = dataCountsPersister;
totalRecordStats = totalCounts; totalRecordStats = counts;
incrementalRecordStats = new DataCounts(jobId); incrementalRecordStats = new DataCounts(jobId);
acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings); acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings); acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
reportingBoundaryFunction = this::reportEvery100Records;
this.threadPool = threadPool;
persistDataCountsScheduledAction = threadPool.scheduleWithFixedDelay(() -> persistDataCountsOnNextRecord = true,
PERSIST_INTERVAL, ThreadPool.Names.GENERIC);
} }
/** /**
@ -103,17 +125,22 @@ public class StatusReporter extends AbstractComponent {
incrementalRecordStats.incrementProcessedRecordCount(1); incrementalRecordStats.incrementProcessedRecordCount(1);
incrementalRecordStats.setLatestRecordTimeStamp(recordDate); incrementalRecordStats.setLatestRecordTimeStamp(recordDate);
if (totalRecordStats.getEarliestRecordTimeStamp() == null) { boolean isFirstReport = totalRecordStats.getEarliestRecordTimeStamp() == null;
if (isFirstReport) {
totalRecordStats.setEarliestRecordTimeStamp(recordDate); totalRecordStats.setEarliestRecordTimeStamp(recordDate);
incrementalRecordStats.setEarliestRecordTimeStamp(recordDate); incrementalRecordStats.setEarliestRecordTimeStamp(recordDate);
} }
// report at various boundaries // report at various boundaries
long totalRecords = getInputRecordCount(); long totalRecords = getInputRecordCount();
if (isReportingBoundary(totalRecords)) { if (reportingBoundaryFunction.apply(totalRecords)) {
logStatus(totalRecords); logStatus(totalRecords);
}
dataCountsPersister.persistDataCounts(jobId, runningTotalStats()); if (persistDataCountsOnNextRecord) {
DataCounts copy = new DataCounts(runningTotalStats());
threadPool.generic().submit(() -> dataCountsPersister.persistDataCounts(jobId, copy));
persistDataCountsOnNextRecord = false;
} }
} }
@ -249,7 +276,7 @@ public class StatusReporter extends AbstractComponent {
* processes more data. Logging every 10000 records when the data rate is * processes more data. Logging every 10000 records when the data rate is
* 40000 per second quickly rolls the logs. * 40000 per second quickly rolls the logs.
*/ */
private void logStatus(long totalRecords) { protected void logStatus(long totalRecords) {
if (++logCount % logEvery != 0) { if (++logCount % logEvery != 0) {
return; return;
} }
@ -268,36 +295,41 @@ public class StatusReporter extends AbstractComponent {
} }
} }
/** private boolean reportEvery100Records(long totalRecords) {
* Don't update status for every update instead update on these if (totalRecords > 1000) {
* boundaries lastRecordCountQuotient = totalRecords / 1000;
* <ol> reportingBoundaryFunction = this::reportEvery1000Records;
* <li>For the first 1000 records update every 100</li> return false;
* <li>After 1000 records update every 1000</li>
* <li>After 20000 records update every 10000</li>
* </ol>
*/
private boolean isReportingBoundary(long totalRecords) {
// after 20,000 records update every 10,000
int divisor = 10000;
if (totalRecords <= 1000) {
// for the first 1000 records update every 100
divisor = 100;
} else if (totalRecords <= 20000) {
// before 20,000 records update every 1000
divisor = 1000;
} }
if (divisor != recordCountDivisor) { long quotient = totalRecords / 100;
// have crossed one of the reporting bands if (quotient > lastRecordCountQuotient) {
recordCountDivisor = divisor; lastRecordCountQuotient = quotient;
lastRecordCountQuotient = totalRecords / divisor; return true;
}
return false; return false;
} }
long quotient = totalRecords / divisor; private boolean reportEvery1000Records(long totalRecords) {
if (totalRecords > 20000) {
lastRecordCountQuotient = totalRecords / 10000;
reportingBoundaryFunction = this::reportEvery10000Records;
return false;
}
long quotient = totalRecords / 1000;
if (quotient > lastRecordCountQuotient) {
lastRecordCountQuotient = quotient;
return true;
}
return false;
}
private boolean reportEvery10000Records(long totalRecords) {
long quotient = totalRecords / 10000;
if (quotient > lastRecordCountQuotient) { if (quotient > lastRecordCountQuotient) {
lastRecordCountQuotient = quotient; lastRecordCountQuotient = quotient;
return true; return true;
@ -319,4 +351,9 @@ public class StatusReporter extends AbstractComponent {
totalRecordStats.calcProcessedFieldCount(getAnalysedFieldsPerRecord()); totalRecordStats.calcProcessedFieldCount(getAnalysedFieldsPerRecord());
return totalRecordStats; return totalRecordStats;
} }
@Override
public void close() {
persistDataCountsScheduledAction.cancel();
}
} }

View File

@ -20,6 +20,7 @@ import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
public class CountingInputStreamTests extends ESTestCase { public class CountingInputStreamTests extends ESTestCase {
public void testRead_OneByteAtATime() throws IOException { public void testRead_OneByteAtATime() throws IOException {
UsageReporter usageReporter = new UsageReporter(Settings.EMPTY, "foo", Mockito.mock(UsagePersister.class)); UsageReporter usageReporter = new UsageReporter(Settings.EMPTY, "foo", Mockito.mock(UsagePersister.class));
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter); DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
@ -32,14 +33,12 @@ public class CountingInputStreamTests extends ESTestCase {
// value of the read() method // value of the read() method
Assert.assertEquals(TEXT.length() + 1, usageReporter.getBytesReadSinceLastReport()); Assert.assertEquals(TEXT.length() + 1, usageReporter.getBytesReadSinceLastReport());
Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), statusReporter.getBytesRead());
statusReporter.getBytesRead());
} }
} }
public void testRead_WithBuffer() throws IOException { public void testRead_WithBuffer() throws IOException {
final String TEXT = "To the man who only has a hammer," final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";
+ " everything he encounters begins to look like a nail.";
UsageReporter usageReporter = new UsageReporter(Settings.EMPTY, "foo", Mockito.mock(UsagePersister.class)); UsageReporter usageReporter = new UsageReporter(Settings.EMPTY, "foo", Mockito.mock(UsagePersister.class));
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter); DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
@ -53,14 +52,12 @@ public class CountingInputStreamTests extends ESTestCase {
// the return value of the read() method // the return value of the read() method
Assert.assertEquals(TEXT.length() - 1, usageReporter.getBytesReadSinceLastReport()); Assert.assertEquals(TEXT.length() - 1, usageReporter.getBytesReadSinceLastReport());
Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), statusReporter.getBytesRead());
statusReporter.getBytesRead());
} }
} }
public void testRead_WithTinyBuffer() throws IOException { public void testRead_WithTinyBuffer() throws IOException {
final String TEXT = "To the man who only has a hammer," final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";
+ " everything he encounters begins to look like a nail.";
UsageReporter usageReporter = new UsageReporter(Settings.EMPTY, "foo", Mockito.mock(UsagePersister.class)); UsageReporter usageReporter = new UsageReporter(Settings.EMPTY, "foo", Mockito.mock(UsagePersister.class));
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter); DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
@ -74,10 +71,8 @@ public class CountingInputStreamTests extends ESTestCase {
// value of the read() method // value of the read() method
Assert.assertEquals(TEXT.length() - 1, usageReporter.getBytesReadSinceLastReport()); Assert.assertEquals(TEXT.length() - 1, usageReporter.getBytesReadSinceLastReport());
Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), statusReporter.getBytesRead());
statusReporter.getBytesRead());
} }
} }
} }

View File

@ -6,18 +6,41 @@
package org.elasticsearch.xpack.prelert.job.status; package org.elasticsearch.xpack.prelert.job.status;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter; import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
/** /**
* Dummy StatusReporter for testing abstract class * Dummy StatusReporter for testing
*/ */
class DummyStatusReporter extends StatusReporter { class DummyStatusReporter extends StatusReporter {
DummyStatusReporter(UsageReporter usageReporter) { int logStatusCallCount = 0;
super(Settings.EMPTY, "DummyJobId", usageReporter, mock(JobDataCountsPersister.class));
public DummyStatusReporter(UsageReporter usageReporter) {
super(mock(ThreadPool.class), Settings.EMPTY, "DummyJobId", new DataCounts("DummyJobId"),
usageReporter, mock(JobDataCountsPersister.class));
} }
/**
* It's difficult to use mocking to get the number of calls to {@link #logStatus(long)}
* and Mockito.spy() doesn't work due to the lambdas used in {@link StatusReporter}.
* Override the method here an count the calls
*/
@Override
protected void logStatus(long totalRecords) {
super.logStatus(totalRecords);
++logStatusCallCount;
}
/**
* @return Then number of times {@link #logStatus(long)} was called.
*/
public int getLogStatusCallCount() {
return logStatusCallCount;
}
} }

View File

@ -8,19 +8,29 @@ package org.elasticsearch.xpack.prelert.job.status;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter; import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class StatusReporterTests extends ESTestCase { public class StatusReporterTests extends ESTestCase {
private static final String JOB_ID = "SR"; private static final String JOB_ID = "SR";
@ -29,8 +39,8 @@ public class StatusReporterTests extends ESTestCase {
private UsageReporter usageReporter; private UsageReporter usageReporter;
private JobDataCountsPersister jobDataCountsPersister; private JobDataCountsPersister jobDataCountsPersister;
private StatusReporter statusReporter; private StatusReporter statusReporter;
private ThreadPool threadPool;
private Settings settings; private Settings settings;
@Before @Before
@ -40,27 +50,43 @@ public class StatusReporterTests extends ESTestCase {
.put(StatusReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS).build(); .put(StatusReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS).build();
usageReporter = Mockito.mock(UsageReporter.class); usageReporter = Mockito.mock(UsageReporter.class);
jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class); jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class);
statusReporter = new StatusReporter(settings, JOB_ID, usageReporter, jobDataCountsPersister); threadPool = Mockito.mock(ThreadPool.class);
when(threadPool.scheduleWithFixedDelay(any(Runnable.class), any(), any())).thenReturn(new ThreadPool.Cancellable() {
@Override
public void cancel() {
} }
public void testSettingAcceptablePercentages() { @Override
public boolean isCancelled() {
return false;
}
});
}
public void testSettingAcceptablePercentages() throws IOException {
StatusReporter statusReporter =
new StatusReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter, jobDataCountsPersister);
assertEquals(statusReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS); assertEquals(statusReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS);
assertEquals(statusReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS); assertEquals(statusReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS);
} }
public void testSimpleConstructor() throws Exception { public void testSimpleConstructor() throws Exception {
try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter,
jobDataCountsPersister)) {
DataCounts stats = statusReporter.incrementalStats(); DataCounts stats = statusReporter.incrementalStats();
assertNotNull(stats); assertNotNull(stats);
assertAllCountFieldsEqualZero(stats); assertAllCountFieldsEqualZero(stats);
} }
}
public void testComplexConstructor() throws Exception { public void testComplexConstructor() throws Exception {
Environment env = new Environment( Environment env = new Environment(
Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build()); Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build());
DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date()); DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date());
statusReporter = new StatusReporter(settings, JOB_ID, counts, usageReporter, jobDataCountsPersister); try (StatusReporter statusReporter =
new StatusReporter(threadPool, settings, JOB_ID, counts, usageReporter, jobDataCountsPersister)) {
DataCounts stats = statusReporter.incrementalStats(); DataCounts stats = statusReporter.incrementalStats();
assertNotNull(stats); assertNotNull(stats);
assertAllCountFieldsEqualZero(stats); assertAllCountFieldsEqualZero(stats);
@ -72,8 +98,11 @@ public class StatusReporterTests extends ESTestCase {
assertEquals(5, statusReporter.getOutOfOrderRecordCount()); assertEquals(5, statusReporter.getOutOfOrderRecordCount());
assertNull(stats.getEarliestRecordTimeStamp()); assertNull(stats.getEarliestRecordTimeStamp());
} }
}
public void testResetIncrementalCounts() throws Exception { public void testResetIncrementalCounts() throws Exception {
try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter,
jobDataCountsPersister)) {
DataCounts stats = statusReporter.incrementalStats(); DataCounts stats = statusReporter.incrementalStats();
assertNotNull(stats); assertNotNull(stats);
assertAllCountFieldsEqualZero(stats); assertAllCountFieldsEqualZero(stats);
@ -95,14 +124,20 @@ public class StatusReporterTests extends ESTestCase {
assertNotNull(stats); assertNotNull(stats);
assertAllCountFieldsEqualZero(stats); assertAllCountFieldsEqualZero(stats);
} }
}
public void testReportLatestTimeIncrementalStats() { public void testReportLatestTimeIncrementalStats() throws IOException {
try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter,
jobDataCountsPersister)) {
statusReporter.startNewIncrementalCount(); statusReporter.startNewIncrementalCount();
statusReporter.reportLatestTimeIncrementalStats(5001L); statusReporter.reportLatestTimeIncrementalStats(5001L);
assertEquals(5001L, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); assertEquals(5001L, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
} }
}
public void testReportRecordsWritten() { public void testReportRecordsWritten() {
try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter,
jobDataCountsPersister)) {
statusReporter.setAnalysedFieldsPerRecord(3); statusReporter.setAnalysedFieldsPerRecord(3);
statusReporter.reportRecordWritten(5, 2000); statusReporter.reportRecordWritten(5, 2000);
@ -124,88 +159,100 @@ public class StatusReporterTests extends ESTestCase {
verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class)); verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class));
} }
}
public void testReportRecordsWritten_Given100Records() { public void testReportRecordsWritten_Given100Records() {
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
statusReporter.setAnalysedFieldsPerRecord(3); statusReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 100; i++) { for (int i = 1; i <= 101; i++) {
statusReporter.reportRecordWritten(5, i); statusReporter.reportRecordWritten(5, i);
} }
assertEquals(100, statusReporter.incrementalStats().getInputRecordCount()); assertEquals(101, statusReporter.incrementalStats().getInputRecordCount());
assertEquals(500, statusReporter.incrementalStats().getInputFieldCount()); assertEquals(505, statusReporter.incrementalStats().getInputFieldCount());
assertEquals(100, statusReporter.incrementalStats().getProcessedRecordCount()); assertEquals(101, statusReporter.incrementalStats().getProcessedRecordCount());
assertEquals(300, statusReporter.incrementalStats().getProcessedFieldCount()); assertEquals(303, statusReporter.incrementalStats().getProcessedFieldCount());
assertEquals(100, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); assertEquals(101, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
verify(jobDataCountsPersister, times(1)).persistDataCounts(anyString(), any(DataCounts.class));
assertEquals(1, statusReporter.getLogStatusCallCount());
} }
public void testReportRecordsWritten_Given1000Records() { public void testReportRecordsWritten_Given1000Records() {
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
statusReporter.setAnalysedFieldsPerRecord(3); statusReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 1000; i++) { for (int i = 1; i <= 1001; i++) {
statusReporter.reportRecordWritten(5, i); statusReporter.reportRecordWritten(5, i);
} }
assertEquals(1000, statusReporter.incrementalStats().getInputRecordCount()); assertEquals(1001, statusReporter.incrementalStats().getInputRecordCount());
assertEquals(5000, statusReporter.incrementalStats().getInputFieldCount()); assertEquals(5005, statusReporter.incrementalStats().getInputFieldCount());
assertEquals(1000, statusReporter.incrementalStats().getProcessedRecordCount()); assertEquals(1001, statusReporter.incrementalStats().getProcessedRecordCount());
assertEquals(3000, statusReporter.incrementalStats().getProcessedFieldCount()); assertEquals(3003, statusReporter.incrementalStats().getProcessedFieldCount());
assertEquals(1000, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); assertEquals(1001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(10, statusReporter.getLogStatusCallCount());
verify(jobDataCountsPersister, times(10)).persistDataCounts(anyString(), any(DataCounts.class));
} }
public void testReportRecordsWritten_Given2000Records() { public void testReportRecordsWritten_Given2000Records() {
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
statusReporter.setAnalysedFieldsPerRecord(3); statusReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 2000; i++) { for (int i = 1; i <= 2001; i++) {
statusReporter.reportRecordWritten(5, i); statusReporter.reportRecordWritten(5, i);
} }
assertEquals(2000, statusReporter.incrementalStats().getInputRecordCount()); assertEquals(2001, statusReporter.incrementalStats().getInputRecordCount());
assertEquals(10000, statusReporter.incrementalStats().getInputFieldCount()); assertEquals(10005, statusReporter.incrementalStats().getInputFieldCount());
assertEquals(2000, statusReporter.incrementalStats().getProcessedRecordCount()); assertEquals(2001, statusReporter.incrementalStats().getProcessedRecordCount());
assertEquals(6000, statusReporter.incrementalStats().getProcessedFieldCount()); assertEquals(6003, statusReporter.incrementalStats().getProcessedFieldCount());
assertEquals(2000, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); assertEquals(2001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
verify(jobDataCountsPersister, times(11)).persistDataCounts(anyString(), any(DataCounts.class)); assertEquals(11, statusReporter.getLogStatusCallCount());
} }
public void testReportRecordsWritten_Given20000Records() { public void testReportRecordsWritten_Given20000Records() {
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
statusReporter.setAnalysedFieldsPerRecord(3); statusReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 20000; i++) { for (int i = 1; i <= 20001; i++) {
statusReporter.reportRecordWritten(5, i); statusReporter.reportRecordWritten(5, i);
} }
assertEquals(20000, statusReporter.incrementalStats().getInputRecordCount()); assertEquals(20001, statusReporter.incrementalStats().getInputRecordCount());
assertEquals(100000, statusReporter.incrementalStats().getInputFieldCount()); assertEquals(100005, statusReporter.incrementalStats().getInputFieldCount());
assertEquals(20000, statusReporter.incrementalStats().getProcessedRecordCount()); assertEquals(20001, statusReporter.incrementalStats().getProcessedRecordCount());
assertEquals(60000, statusReporter.incrementalStats().getProcessedFieldCount()); assertEquals(60003, statusReporter.incrementalStats().getProcessedFieldCount());
assertEquals(20000, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); assertEquals(20001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
verify(jobDataCountsPersister, times(29)).persistDataCounts(anyString(), any(DataCounts.class)); assertEquals(29, statusReporter.getLogStatusCallCount());
} }
public void testReportRecordsWritten_Given30000Records() { public void testReportRecordsWritten_Given30000Records() {
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
statusReporter.setAnalysedFieldsPerRecord(3); statusReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 30000; i++) { for (int i = 1; i <= 30001; i++) {
statusReporter.reportRecordWritten(5, i); statusReporter.reportRecordWritten(5, i);
} }
assertEquals(30000, statusReporter.incrementalStats().getInputRecordCount()); assertEquals(30001, statusReporter.incrementalStats().getInputRecordCount());
assertEquals(150000, statusReporter.incrementalStats().getInputFieldCount()); assertEquals(150005, statusReporter.incrementalStats().getInputFieldCount());
assertEquals(30000, statusReporter.incrementalStats().getProcessedRecordCount()); assertEquals(30001, statusReporter.incrementalStats().getProcessedRecordCount());
assertEquals(90000, statusReporter.incrementalStats().getProcessedFieldCount()); assertEquals(90003, statusReporter.incrementalStats().getProcessedFieldCount());
assertEquals(30000, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); assertEquals(30001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
verify(jobDataCountsPersister, times(30)).persistDataCounts(anyString(), any(DataCounts.class)); assertEquals(30, statusReporter.getLogStatusCallCount());
} }
public void testFinishReporting() { public void testFinishReporting() {
try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter,
jobDataCountsPersister)) {
statusReporter.setAnalysedFieldsPerRecord(3); statusReporter.setAnalysedFieldsPerRecord(3);
DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, new Date(2000), new Date(3000)); DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, new Date(2000), new Date(3000));
@ -216,9 +263,53 @@ public class StatusReporterTests extends ESTestCase {
Mockito.verify(usageReporter, Mockito.times(1)).reportUsage(); Mockito.verify(usageReporter, Mockito.times(1)).reportUsage();
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), eq(dc)); Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), eq(dc));
assertEquals(dc, statusReporter.incrementalStats()); assertEquals(dc, statusReporter.incrementalStats());
} }
}
public void testPersistenceTimeOut() {
ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class);
ArgumentCaptor<Runnable> argumentCaptor = ArgumentCaptor.forClass(Runnable.class);
when(mockThreadPool.scheduleWithFixedDelay(argumentCaptor.capture(), any(), any())).thenReturn(new ThreadPool.Cancellable() {
@Override
public void cancel() {
}
@Override
public boolean isCancelled() {
return false;
}
});
ExecutorService executorService = mock(ExecutorService.class);
ArgumentCaptor<Runnable> persistTaskCapture = ArgumentCaptor.forClass(Runnable.class);
when(executorService.submit(persistTaskCapture.capture())).thenReturn(null);
when(mockThreadPool.generic()).thenReturn(executorService);
try (StatusReporter statusReporter = new StatusReporter(mockThreadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter,
jobDataCountsPersister)) {
statusReporter.setAnalysedFieldsPerRecord(3);
statusReporter.reportRecordWritten(5, 2000);
statusReporter.reportRecordWritten(5, 3000);
Mockito.verify(jobDataCountsPersister, Mockito.times(0)).persistDataCounts(eq("SR"), any());
argumentCaptor.getValue().run();
statusReporter.reportRecordWritten(5, 4000);
DataCounts dc = new DataCounts(JOB_ID, 2L, 6L, 0L, 10L, 0L, 0L, 0L, new Date(2000), new Date(4000));
// verify threadpool executor service to do the persistence is launched
Mockito.verify(mockThreadPool, Mockito.times(1)).generic();
// run the captured persist task
persistTaskCapture.getValue().run();
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), any());
}
}
private void assertAllCountFieldsEqualZero(DataCounts stats) throws Exception { private void assertAllCountFieldsEqualZero(DataCounts stats) throws Exception {
assertEquals(0L, stats.getProcessedRecordCount()); assertEquals(0L, stats.getProcessedRecordCount());