[ML] Remove periodic persistence of datacounts (elastic/x-pack-elasticsearch#944)

Original commit: elastic/x-pack-elasticsearch@756f06d316
This commit is contained in:
David Kyle 2017-04-04 14:45:15 +01:00 committed by GitHub
parent eb79be392c
commit 778e960d3e
5 changed files with 197 additions and 292 deletions

View File

@ -12,12 +12,10 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import java.io.Closeable;
import java.util.Date;
import java.util.Locale;
import java.util.function.Function;
@ -37,12 +35,8 @@ import java.util.function.Function;
* function depending on which reporting stage is current; the function
* changes as each reporting stage is passed. If the
* function returns {@code true} the usage is logged.
*
* DataCounts are persisted periodically in a datafeed task via
* {@linkplain JobDataCountsPersister}, {@link #close()} must be called to
* cancel the datafeed task.
*/
public class DataCountsReporter extends AbstractComponent implements Closeable {
public class DataCountsReporter extends AbstractComponent {
/**
* The max percentage of date parse errors allowed before
* an exception is thrown.
@ -76,13 +70,9 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
private Function<Long, Boolean> reportingBoundaryFunction;
private volatile boolean persistDataCountsOnNextRecord;
private final ThreadPool.Cancellable persistDataCountsDatafeedAction;
private DataStreamDiagnostics diagnostics;
public DataCountsReporter(ThreadPool threadPool, Settings settings, Job job, DataCounts counts,
JobDataCountsPersister dataCountsPersister) {
public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister) {
super(settings);
@ -97,9 +87,6 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
reportingBoundaryFunction = this::reportEvery100Records;
persistDataCountsDatafeedAction = threadPool.scheduleWithFixedDelay(() -> persistDataCountsOnNextRecord = true,
PERSIST_INTERVAL, ThreadPool.Names.GENERIC);
}
/**
@ -135,14 +122,6 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
logStatus(totalRecords);
}
if (persistDataCountsOnNextRecord) {
retrieveDiagnosticsIntermediateResults();
DataCounts copy = new DataCounts(runningTotalStats());
dataCountsPersister.persistDataCounts(job.getId(), copy, new LoggingActionListener());
persistDataCountsOnNextRecord = false;
}
diagnostics.checkRecord(recordTimeMs);
}
@ -374,11 +353,6 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
return totalRecordStats;
}
@Override
public void close() {
persistDataCountsDatafeedAction.cancel();
}
private void retrieveDiagnosticsIntermediateResults() {
totalRecordStats.incrementBucketCount(diagnostics.getEmptyBucketCount());
totalRecordStats.incrementBucketCount(diagnostics.getBucketCount());

View File

@ -106,7 +106,6 @@ public class AutodetectCommunicator implements Closeable {
public void close(boolean restart, String reason) throws IOException {
Future<?> future = autodetectWorkerExecutor.submit(() -> {
checkProcessIsAlive();
dataCountsReporter.close();
autodetectProcess.close();
autoDetectResultProcessor.awaitCompletion();
handler.accept(restart ? new ElasticsearchException(reason) : null);
@ -123,7 +122,6 @@ public class AutodetectCommunicator implements Closeable {
}
}
public void writeUpdateProcessMessage(ModelPlotConfig config, List<JobUpdate.DetectorUpdate> updates,
BiConsumer<Void, Exception> handler) throws IOException {
submitOperation(() -> {

View File

@ -270,37 +270,37 @@ public class AutodetectProcessManager extends AbstractComponent {
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
// A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job,
autodetectParams.dataCounts(), jobDataCountsPersister)) {
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(),
jobDataCountsPersister);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime,
autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED));
boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization();
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
ExecutorService autodetectWorkerExecutor;
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime,
autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED));
boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization();
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
ExecutorService autodetectWorkerExecutor;
try {
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization));
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.
try {
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization));
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.
try {
IOUtils.close(process);
} catch (IOException ioe) {
logger.error("Can't close autodetect", ioe);
}
throw e;
IOUtils.close(process);
} catch (IOException ioe) {
logger.error("Can't close autodetect", ioe);
}
return new AutodetectCommunicator(job, process, dataCountsReporter, processor,
handler, xContentRegistry, autodetectWorkerExecutor);
throw e;
}
return new AutodetectCommunicator(job, process, dataCountsReporter, processor,
handler, xContentRegistry, autodetectWorkerExecutor);
}
/**

View File

@ -9,14 +9,12 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
@ -29,24 +27,22 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DataCountsReporterTests extends ESTestCase {
private static final int MAX_PERCENT_DATE_PARSE_ERRORS = 40;
private static final int MAX_PERCENT_OUT_OF_ORDER_ERRORS = 30;
private Job job;
private JobDataCountsPersister jobDataCountsPersister;
private ThreadPool threadPool;
private Settings settings;
@Before
public void setUpMocks() {
settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS)
.put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS)
.build();
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(TimeValue.timeValueSeconds(300));
acBuilder.setLatency(TimeValue.ZERO);
@ -55,292 +51,235 @@ public class DataCountsReporterTests extends ESTestCase {
Job.Builder builder = new Job.Builder("sr");
builder.setAnalysisConfig(acBuilder);
job = builder.build(new Date());
jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class);
threadPool = Mockito.mock(ThreadPool.class);
when(threadPool.scheduleWithFixedDelay(any(Runnable.class), any(), any())).thenReturn(new ThreadPool.Cancellable() {
@Override
public void cancel() {
}
@Override
public boolean isCancelled() {
return false;
}
});
}
public void testSettingAcceptablePercentages() throws IOException {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) {
assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS);
assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS);
}
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
jobDataCountsPersister);
assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS);
assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS);
}
public void testSimpleConstructor() throws Exception {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) {
DataCounts stats = dataCountsReporter.incrementalStats();
assertNotNull(stats);
assertAllCountFieldsEqualZero(stats);
}
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
jobDataCountsPersister);
DataCounts stats = dataCountsReporter.incrementalStats();
assertNotNull(stats);
assertAllCountFieldsEqualZero(stats);
}
public void testComplexConstructor() throws Exception {
DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, 6L, 7L, 8L,
DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, 6L, 7L, 8L,
new Date(), new Date(), new Date(), new Date(), new Date());
try (DataCountsReporter dataCountsReporter =
new DataCountsReporter(threadPool, settings, job, counts, jobDataCountsPersister)) {
DataCounts stats = dataCountsReporter.incrementalStats();
assertNotNull(stats);
assertAllCountFieldsEqualZero(stats);
DataCountsReporter dataCountsReporter =
new DataCountsReporter(settings, job, counts, jobDataCountsPersister);
DataCounts stats = dataCountsReporter.incrementalStats();
assertNotNull(stats);
assertAllCountFieldsEqualZero(stats);
assertEquals(1, dataCountsReporter.getProcessedRecordCount());
assertEquals(2, dataCountsReporter.getBytesRead());
assertEquals(3, dataCountsReporter.getDateParseErrorsCount());
assertEquals(4, dataCountsReporter.getMissingFieldErrorCount());
assertEquals(5, dataCountsReporter.getOutOfOrderRecordCount());
assertEquals(6, dataCountsReporter.getEmptyBucketCount());
assertEquals(7, dataCountsReporter.getSparseBucketCount());
assertEquals(8, dataCountsReporter.getBucketCount());
assertNull(stats.getEarliestRecordTimeStamp());
}
assertEquals(1, dataCountsReporter.getProcessedRecordCount());
assertEquals(2, dataCountsReporter.getBytesRead());
assertEquals(3, dataCountsReporter.getDateParseErrorsCount());
assertEquals(4, dataCountsReporter.getMissingFieldErrorCount());
assertEquals(5, dataCountsReporter.getOutOfOrderRecordCount());
assertEquals(6, dataCountsReporter.getEmptyBucketCount());
assertEquals(7, dataCountsReporter.getSparseBucketCount());
assertEquals(8, dataCountsReporter.getBucketCount());
assertNull(stats.getEarliestRecordTimeStamp());
}
public void testResetIncrementalCounts() throws Exception {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) {
DataCounts stats = dataCountsReporter.incrementalStats();
assertNotNull(stats);
assertAllCountFieldsEqualZero(stats);
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
jobDataCountsPersister);
DataCounts stats = dataCountsReporter.incrementalStats();
assertNotNull(stats);
assertAllCountFieldsEqualZero(stats);
dataCountsReporter.setAnalysedFieldsPerRecord(3);
dataCountsReporter.setAnalysedFieldsPerRecord(3);
dataCountsReporter.reportRecordWritten(5, 1000);
dataCountsReporter.reportRecordWritten(5, 1000);
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(1000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
dataCountsReporter.reportRecordWritten(5, 1000);
dataCountsReporter.reportRecordWritten(5, 1000);
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(1000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats());
assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats());
dataCountsReporter.startNewIncrementalCount();
stats = dataCountsReporter.incrementalStats();
assertNotNull(stats);
assertAllCountFieldsEqualZero(stats);
dataCountsReporter.startNewIncrementalCount();
stats = dataCountsReporter.incrementalStats();
assertNotNull(stats);
assertAllCountFieldsEqualZero(stats);
// write some more data
dataCountsReporter.reportRecordWritten(5, 302000);
dataCountsReporter.reportRecordWritten(5, 302000);
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(302000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
// write some more data
dataCountsReporter.reportRecordWritten(5, 302000);
dataCountsReporter.reportRecordWritten(5, 302000);
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(302000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
// check total stats
assertEquals(4, dataCountsReporter.runningTotalStats().getInputRecordCount());
assertEquals(20, dataCountsReporter.runningTotalStats().getInputFieldCount());
assertEquals(4, dataCountsReporter.runningTotalStats().getProcessedRecordCount());
assertEquals(12, dataCountsReporter.runningTotalStats().getProcessedFieldCount());
assertEquals(302000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime());
// check total stats
assertEquals(4, dataCountsReporter.runningTotalStats().getInputRecordCount());
assertEquals(20, dataCountsReporter.runningTotalStats().getInputFieldCount());
assertEquals(4, dataCountsReporter.runningTotalStats().getProcessedRecordCount());
assertEquals(12, dataCountsReporter.runningTotalStats().getProcessedFieldCount());
assertEquals(302000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime());
// send 'flush' signal
dataCountsReporter.finishReporting();
assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getEmptyBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount());
}
// send 'flush' signal
dataCountsReporter.finishReporting();
assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getEmptyBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount());
}
public void testReportLatestTimeIncrementalStats() throws IOException {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) {
dataCountsReporter.startNewIncrementalCount();
dataCountsReporter.reportLatestTimeIncrementalStats(5001L);
assertEquals(5001L, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
}
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
jobDataCountsPersister);
dataCountsReporter.startNewIncrementalCount();
dataCountsReporter.reportLatestTimeIncrementalStats(5001L);
assertEquals(5001L, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
}
public void testReportRecordsWritten() {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) {
dataCountsReporter.setAnalysedFieldsPerRecord(3);
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
jobDataCountsPersister);
dataCountsReporter.setAnalysedFieldsPerRecord(3);
dataCountsReporter.reportRecordWritten(5, 2000);
assertEquals(1, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(5, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(1, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(3, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(2000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
dataCountsReporter.reportRecordWritten(5, 2000);
assertEquals(1, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(5, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(1, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(3, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(2000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
dataCountsReporter.reportRecordWritten(5, 3000);
dataCountsReporter.reportMissingField();
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(5, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(3000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
dataCountsReporter.reportRecordWritten(5, 3000);
dataCountsReporter.reportMissingField();
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(5, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(3000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats());
assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats());
verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), any());
}
verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), any());
}
public void testReportRecordsWritten_Given100Records() {
try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) {
dataCountsReporter.setAnalysedFieldsPerRecord(3);
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
dataCountsReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 101; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(101, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(505, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(101, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(303, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(101, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(1, dataCountsReporter.getLogStatusCallCount());
for (int i = 1; i <= 101; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(101, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(505, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(101, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(303, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(101, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(1, dataCountsReporter.getLogStatusCallCount());
}
public void testReportRecordsWritten_Given1000Records() {
try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) {
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
dataCountsReporter.setAnalysedFieldsPerRecord(3);
dataCountsReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 1001; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(1001, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(5005, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(1001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(3003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(1001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(10, dataCountsReporter.getLogStatusCallCount());
for (int i = 1; i <= 1001; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(1001, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(5005, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(1001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(3003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(1001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(10, dataCountsReporter.getLogStatusCallCount());
}
public void testReportRecordsWritten_Given2000Records() {
try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) {
dataCountsReporter.setAnalysedFieldsPerRecord(3);
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
dataCountsReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 2001; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(2001, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10005, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(6003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(2001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(11, dataCountsReporter.getLogStatusCallCount());
for (int i = 1; i <= 2001; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(2001, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10005, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(6003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(2001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(11, dataCountsReporter.getLogStatusCallCount());
}
public void testReportRecordsWritten_Given20000Records() {
try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) {
dataCountsReporter.setAnalysedFieldsPerRecord(3);
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
dataCountsReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 20001; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(20001, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(100005, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(20001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(60003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(20001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(29, dataCountsReporter.getLogStatusCallCount());
for (int i = 1; i <= 20001; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(20001, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(100005, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(20001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(60003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(20001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(29, dataCountsReporter.getLogStatusCallCount());
}
public void testReportRecordsWritten_Given30000Records() {
try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) {
dataCountsReporter.setAnalysedFieldsPerRecord(3);
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
dataCountsReporter.setAnalysedFieldsPerRecord(3);
for (int i = 1; i <= 30001; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(30001, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(150005, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(30001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(90003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(30001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(30, dataCountsReporter.getLogStatusCallCount());
for (int i = 1; i <= 30001; i++) {
dataCountsReporter.reportRecordWritten(5, i);
}
assertEquals(30001, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(150005, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(30001, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(90003, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(30001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
assertEquals(30, dataCountsReporter.getLogStatusCallCount());
}
public void testFinishReporting() {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) {
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
jobDataCountsPersister);
dataCountsReporter.setAnalysedFieldsPerRecord(3);
Date now = new Date();
DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 1L, new Date(2000), new Date(3000),
now, (Date) null, (Date) null);
dataCountsReporter.reportRecordWritten(5, 2000);
dataCountsReporter.reportRecordWritten(5, 3000);
dataCountsReporter.reportMissingField();
dataCountsReporter.finishReporting();
dataCountsReporter.setAnalysedFieldsPerRecord(3);
Date now = new Date();
DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 1L, new Date(2000), new Date(3000),
now, (Date) null, (Date) null);
dataCountsReporter.reportRecordWritten(5, 2000);
dataCountsReporter.reportRecordWritten(5, 3000);
dataCountsReporter.reportMissingField();
dataCountsReporter.finishReporting();
long lastReportedTimeMs = dataCountsReporter.incrementalStats().getLastDataTimeStamp().getTime();
// check last data time is equal to now give or take a second
assertTrue(lastReportedTimeMs >= now.getTime()
&& lastReportedTimeMs <= now.getTime() + TimeUnit.SECONDS.toMillis(1));
assertEquals(dataCountsReporter.incrementalStats().getLastDataTimeStamp(),
dataCountsReporter.runningTotalStats().getLastDataTimeStamp());
long lastReportedTimeMs = dataCountsReporter.incrementalStats().getLastDataTimeStamp().getTime();
// check last data time is equal to now give or take a second
assertTrue(lastReportedTimeMs >= now.getTime()
&& lastReportedTimeMs <= now.getTime() + TimeUnit.SECONDS.toMillis(1));
assertEquals(dataCountsReporter.incrementalStats().getLastDataTimeStamp(),
dataCountsReporter.runningTotalStats().getLastDataTimeStamp());
dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp());
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), eq(dc), any());
assertEquals(dc, dataCountsReporter.incrementalStats());
}
}
public void testPersistenceTimeOut() {
ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class);
ArgumentCaptor<Runnable> argumentCaptor = ArgumentCaptor.forClass(Runnable.class);
when(mockThreadPool.scheduleWithFixedDelay(argumentCaptor.capture(), any(), any())).thenReturn(new ThreadPool.Cancellable() {
@Override
public void cancel() {
}
@Override
public boolean isCancelled() {
return false;
}
});
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(mockThreadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) {
dataCountsReporter.setAnalysedFieldsPerRecord(3);
dataCountsReporter.reportRecordWritten(5, 2000);
dataCountsReporter.reportRecordWritten(5, 3000);
Mockito.verify(jobDataCountsPersister, Mockito.times(0)).persistDataCounts(eq("sr"), any(), any());
argumentCaptor.getValue().run();
dataCountsReporter.reportRecordWritten(5, 4000);
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), any(), any());
}
dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp());
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), eq(dc), any());
assertEquals(dc, dataCountsReporter.incrementalStats());
}
private void assertAllCountFieldsEqualZero(DataCounts stats) throws Exception {
@ -353,6 +292,6 @@ public class DataCountsReporterTests extends ESTestCase {
assertEquals(0L, stats.getMissingFieldCount());
assertEquals(0L, stats.getOutOfOrderTimeStampCount());
assertEquals(0L, stats.getEmptyBucketCount());
assertEquals(0L, stats.getSparseBucketCount());;
assertEquals(0L, stats.getSparseBucketCount());
}
}

View File

@ -27,7 +27,7 @@ class DummyDataCountsReporter extends DataCountsReporter {
int logStatusCallCount = 0;
DummyDataCountsReporter() {
super(mock(ThreadPool.class), Settings.EMPTY, createJob(), new DataCounts("DummyJobId"),
super(Settings.EMPTY, createJob(), new DataCounts("DummyJobId"),
mock(JobDataCountsPersister.class));
}
@ -42,7 +42,6 @@ class DummyDataCountsReporter extends DataCountsReporter {
++logStatusCallCount;
}
/**
* @return The number of times {@link #logStatus(long)} was called.
*/
@ -50,11 +49,6 @@ class DummyDataCountsReporter extends DataCountsReporter {
return logStatusCallCount;
}
@Override
public void close() {
// Do nothing
}
private static Job createJob() {
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(
Arrays.asList(new Detector.Builder("metric", "field").build()));