diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index d03bbaac885..055f01d50b3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -12,12 +12,10 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; -import java.io.Closeable; import java.util.Date; import java.util.Locale; import java.util.function.Function; @@ -37,12 +35,8 @@ import java.util.function.Function; * function depending on which reporting stage is the current, the function * changes when each of the reporting stages are passed. If the * function returns {@code true} the usage is logged. - * - * DataCounts are persisted periodically in a datafeed task via - * {@linkplain JobDataCountsPersister}, {@link #close()} must be called to - * cancel the datafeed task. */ -public class DataCountsReporter extends AbstractComponent implements Closeable { +public class DataCountsReporter extends AbstractComponent { /** * The max percentage of date parse errors allowed before * an exception is thrown. @@ -76,13 +70,9 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { private Function reportingBoundaryFunction; - private volatile boolean persistDataCountsOnNextRecord; - private final ThreadPool.Cancellable persistDataCountsDatafeedAction; - private DataStreamDiagnostics diagnostics; - public DataCountsReporter(ThreadPool threadPool, Settings settings, Job job, DataCounts counts, - JobDataCountsPersister dataCountsPersister) { + public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister) { super(settings); @@ -97,9 +87,6 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings); reportingBoundaryFunction = this::reportEvery100Records; - - persistDataCountsDatafeedAction = threadPool.scheduleWithFixedDelay(() -> persistDataCountsOnNextRecord = true, - PERSIST_INTERVAL, ThreadPool.Names.GENERIC); } /** @@ -135,14 +122,6 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { logStatus(totalRecords); } - if (persistDataCountsOnNextRecord) { - retrieveDiagnosticsIntermediateResults(); - - DataCounts copy = new DataCounts(runningTotalStats()); - dataCountsPersister.persistDataCounts(job.getId(), copy, new LoggingActionListener()); - persistDataCountsOnNextRecord = false; - } - diagnostics.checkRecord(recordTimeMs); } @@ -374,11 +353,6 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { return totalRecordStats; } - @Override - public void close() { - persistDataCountsDatafeedAction.cancel(); - } - private void retrieveDiagnosticsIntermediateResults() { totalRecordStats.incrementBucketCount(diagnostics.getEmptyBucketCount()); totalRecordStats.incrementBucketCount(diagnostics.getBucketCount()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index ef461a15d7f..d00b7c8d621 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -106,7 +106,6 @@ public class AutodetectCommunicator implements Closeable { public void close(boolean restart, String reason) throws IOException { Future future = autodetectWorkerExecutor.submit(() -> { checkProcessIsAlive(); - dataCountsReporter.close(); autodetectProcess.close(); autoDetectResultProcessor.awaitCompletion(); handler.accept(restart ? new ElasticsearchException(reason) : null); @@ -123,7 +122,6 @@ public class AutodetectCommunicator implements Closeable { } } - public void writeUpdateProcessMessage(ModelPlotConfig config, List updates, BiConsumer handler) throws IOException { submitOperation(() -> { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 330526c4d69..3871408d55a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -270,37 +270,37 @@ public class AutodetectProcessManager extends AbstractComponent { Job job = jobManager.getJobOrThrowIfUnknown(jobId); // A TP with no queue, so that we fail immediately if there are no threads available ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, - autodetectParams.dataCounts(), jobDataCountsPersister)) { - ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), - normalizerFactory); - ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME); - Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, - renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); + DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(), + jobDataCountsPersister); + ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), + normalizerFactory); + ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME); + Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, + renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); - AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), - autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, - autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED)); - boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization(); - AutoDetectResultProcessor processor = new AutoDetectResultProcessor( - client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); - ExecutorService autodetectWorkerExecutor; + AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), + autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, + autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED)); + boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization(); + AutoDetectResultProcessor processor = new AutoDetectResultProcessor( + client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); + ExecutorService autodetectWorkerExecutor; + try { + autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); + autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization)); + } catch (EsRejectedExecutionException e) { + // If submitting the operation to read the results from the process fails we need to close + // the process too, so that other submitted operations to threadpool are stopped. try { - autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); - autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization)); - } catch (EsRejectedExecutionException e) { - // If submitting the operation to read the results from the process fails we need to close - // the process too, so that other submitted operations to threadpool are stopped. - try { - IOUtils.close(process); - } catch (IOException ioe) { - logger.error("Can't close autodetect", ioe); - } - throw e; + IOUtils.close(process); + } catch (IOException ioe) { + logger.error("Can't close autodetect", ioe); } - return new AutodetectCommunicator(job, process, dataCountsReporter, processor, - handler, xContentRegistry, autodetectWorkerExecutor); + throw e; } + return new AutodetectCommunicator(job, process, dataCountsReporter, processor, + handler, xContentRegistry, autodetectWorkerExecutor); + } /** diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index 859c4204048..76bee10cd6f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -9,14 +9,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.junit.Before; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.io.IOException; @@ -29,24 +27,22 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class DataCountsReporterTests extends ESTestCase { private static final int MAX_PERCENT_DATE_PARSE_ERRORS = 40; private static final int MAX_PERCENT_OUT_OF_ORDER_ERRORS = 30; - + private Job job; private JobDataCountsPersister jobDataCountsPersister; - private ThreadPool threadPool; private Settings settings; - + @Before public void setUpMocks() { settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS) .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS) .build(); - + AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); acBuilder.setBucketSpan(TimeValue.timeValueSeconds(300)); acBuilder.setLatency(TimeValue.ZERO); @@ -55,292 +51,235 @@ public class DataCountsReporterTests extends ESTestCase { Job.Builder builder = new Job.Builder("sr"); builder.setAnalysisConfig(acBuilder); job = builder.build(new Date()); - + jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class); - threadPool = Mockito.mock(ThreadPool.class); - - when(threadPool.scheduleWithFixedDelay(any(Runnable.class), any(), any())).thenReturn(new ThreadPool.Cancellable() { - @Override - public void cancel() { - } - - @Override - public boolean isCancelled() { - return false; - } - }); } public void testSettingAcceptablePercentages() throws IOException { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), - jobDataCountsPersister)) { - assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS); - assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS); - } + DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), + jobDataCountsPersister); + assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS); + assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS); } public void testSimpleConstructor() throws Exception { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), - jobDataCountsPersister)) { - DataCounts stats = dataCountsReporter.incrementalStats(); - assertNotNull(stats); - assertAllCountFieldsEqualZero(stats); - } + DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), + jobDataCountsPersister); + DataCounts stats = dataCountsReporter.incrementalStats(); + assertNotNull(stats); + assertAllCountFieldsEqualZero(stats); } public void testComplexConstructor() throws Exception { - DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, 6L, 7L, 8L, + DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, 6L, 7L, 8L, new Date(), new Date(), new Date(), new Date(), new Date()); - try (DataCountsReporter dataCountsReporter = - new DataCountsReporter(threadPool, settings, job, counts, jobDataCountsPersister)) { - DataCounts stats = dataCountsReporter.incrementalStats(); - assertNotNull(stats); - assertAllCountFieldsEqualZero(stats); + DataCountsReporter dataCountsReporter = + new DataCountsReporter(settings, job, counts, jobDataCountsPersister); + DataCounts stats = dataCountsReporter.incrementalStats(); + assertNotNull(stats); + assertAllCountFieldsEqualZero(stats); - assertEquals(1, dataCountsReporter.getProcessedRecordCount()); - assertEquals(2, dataCountsReporter.getBytesRead()); - assertEquals(3, dataCountsReporter.getDateParseErrorsCount()); - assertEquals(4, dataCountsReporter.getMissingFieldErrorCount()); - assertEquals(5, dataCountsReporter.getOutOfOrderRecordCount()); - assertEquals(6, dataCountsReporter.getEmptyBucketCount()); - assertEquals(7, dataCountsReporter.getSparseBucketCount()); - assertEquals(8, dataCountsReporter.getBucketCount()); - assertNull(stats.getEarliestRecordTimeStamp()); - } + assertEquals(1, dataCountsReporter.getProcessedRecordCount()); + assertEquals(2, dataCountsReporter.getBytesRead()); + assertEquals(3, dataCountsReporter.getDateParseErrorsCount()); + assertEquals(4, dataCountsReporter.getMissingFieldErrorCount()); + assertEquals(5, dataCountsReporter.getOutOfOrderRecordCount()); + assertEquals(6, dataCountsReporter.getEmptyBucketCount()); + assertEquals(7, dataCountsReporter.getSparseBucketCount()); + assertEquals(8, dataCountsReporter.getBucketCount()); + assertNull(stats.getEarliestRecordTimeStamp()); } public void testResetIncrementalCounts() throws Exception { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), - jobDataCountsPersister)) { - DataCounts stats = dataCountsReporter.incrementalStats(); - assertNotNull(stats); - assertAllCountFieldsEqualZero(stats); + DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), + jobDataCountsPersister); + DataCounts stats = dataCountsReporter.incrementalStats(); + assertNotNull(stats); + assertAllCountFieldsEqualZero(stats); - dataCountsReporter.setAnalysedFieldsPerRecord(3); + dataCountsReporter.setAnalysedFieldsPerRecord(3); - dataCountsReporter.reportRecordWritten(5, 1000); - dataCountsReporter.reportRecordWritten(5, 1000); - assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); - assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); - assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); - assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(1000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + dataCountsReporter.reportRecordWritten(5, 1000); + dataCountsReporter.reportRecordWritten(5, 1000); + assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(1000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats()); + assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats()); - dataCountsReporter.startNewIncrementalCount(); - stats = dataCountsReporter.incrementalStats(); - assertNotNull(stats); - assertAllCountFieldsEqualZero(stats); + dataCountsReporter.startNewIncrementalCount(); + stats = dataCountsReporter.incrementalStats(); + assertNotNull(stats); + assertAllCountFieldsEqualZero(stats); - // write some more data - dataCountsReporter.reportRecordWritten(5, 302000); - dataCountsReporter.reportRecordWritten(5, 302000); - assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); - assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); - assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); - assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(302000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + // write some more data + dataCountsReporter.reportRecordWritten(5, 302000); + dataCountsReporter.reportRecordWritten(5, 302000); + assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(302000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - // check total stats - assertEquals(4, dataCountsReporter.runningTotalStats().getInputRecordCount()); - assertEquals(20, dataCountsReporter.runningTotalStats().getInputFieldCount()); - assertEquals(4, dataCountsReporter.runningTotalStats().getProcessedRecordCount()); - assertEquals(12, dataCountsReporter.runningTotalStats().getProcessedFieldCount()); - assertEquals(302000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime()); + // check total stats + assertEquals(4, dataCountsReporter.runningTotalStats().getInputRecordCount()); + assertEquals(20, dataCountsReporter.runningTotalStats().getInputFieldCount()); + assertEquals(4, dataCountsReporter.runningTotalStats().getProcessedRecordCount()); + assertEquals(12, dataCountsReporter.runningTotalStats().getProcessedFieldCount()); + assertEquals(302000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime()); - // send 'flush' signal - dataCountsReporter.finishReporting(); - assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount()); - assertEquals(0, dataCountsReporter.runningTotalStats().getEmptyBucketCount()); - assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount()); - } + // send 'flush' signal + dataCountsReporter.finishReporting(); + assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount()); + assertEquals(0, dataCountsReporter.runningTotalStats().getEmptyBucketCount()); + assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount()); } public void testReportLatestTimeIncrementalStats() throws IOException { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), - jobDataCountsPersister)) { - dataCountsReporter.startNewIncrementalCount(); - dataCountsReporter.reportLatestTimeIncrementalStats(5001L); - assertEquals(5001L, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - } + DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), + jobDataCountsPersister); + dataCountsReporter.startNewIncrementalCount(); + dataCountsReporter.reportLatestTimeIncrementalStats(5001L); + assertEquals(5001L, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); } public void testReportRecordsWritten() { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), - jobDataCountsPersister)) { - dataCountsReporter.setAnalysedFieldsPerRecord(3); + DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), + jobDataCountsPersister); + dataCountsReporter.setAnalysedFieldsPerRecord(3); - dataCountsReporter.reportRecordWritten(5, 2000); - assertEquals(1, dataCountsReporter.incrementalStats().getInputRecordCount()); - assertEquals(5, dataCountsReporter.incrementalStats().getInputFieldCount()); - assertEquals(1, dataCountsReporter.incrementalStats().getProcessedRecordCount()); - assertEquals(3, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(2000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + dataCountsReporter.reportRecordWritten(5, 2000); + assertEquals(1, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(5, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(1, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(3, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(2000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - dataCountsReporter.reportRecordWritten(5, 3000); - dataCountsReporter.reportMissingField(); - assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); - assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); - assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); - assertEquals(5, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(3000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + dataCountsReporter.reportRecordWritten(5, 3000); + dataCountsReporter.reportMissingField(); + assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(5, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(3000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats()); + assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats()); - verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), any()); - } + verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), any()); } public void testReportRecordsWritten_Given100Records() { - try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) { - dataCountsReporter.setAnalysedFieldsPerRecord(3); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + dataCountsReporter.setAnalysedFieldsPerRecord(3); - for (int i = 1; i <= 101; i++) { - dataCountsReporter.reportRecordWritten(5, i); - } - - assertEquals(101, dataCountsReporter.incrementalStats().getInputRecordCount()); - assertEquals(505, dataCountsReporter.incrementalStats().getInputFieldCount()); - assertEquals(101, dataCountsReporter.incrementalStats().getProcessedRecordCount()); - assertEquals(303, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(101, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - - assertEquals(1, dataCountsReporter.getLogStatusCallCount()); + for (int i = 1; i <= 101; i++) { + dataCountsReporter.reportRecordWritten(5, i); } + + assertEquals(101, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(505, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(101, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(303, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(101, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + + assertEquals(1, dataCountsReporter.getLogStatusCallCount()); } public void testReportRecordsWritten_Given1000Records() { - try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) { + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - dataCountsReporter.setAnalysedFieldsPerRecord(3); + dataCountsReporter.setAnalysedFieldsPerRecord(3); - for (int i = 1; i <= 1001; i++) { - dataCountsReporter.reportRecordWritten(5, i); - } - - assertEquals(1001, dataCountsReporter.incrementalStats().getInputRecordCount()); - assertEquals(5005, dataCountsReporter.incrementalStats().getInputFieldCount()); - assertEquals(1001, dataCountsReporter.incrementalStats().getProcessedRecordCount()); - assertEquals(3003, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(1001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - - assertEquals(10, dataCountsReporter.getLogStatusCallCount()); + for (int i = 1; i <= 1001; i++) { + dataCountsReporter.reportRecordWritten(5, i); } + assertEquals(1001, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(5005, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(1001, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(3003, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(1001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + + assertEquals(10, dataCountsReporter.getLogStatusCallCount()); } public void testReportRecordsWritten_Given2000Records() { - try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) { - dataCountsReporter.setAnalysedFieldsPerRecord(3); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + dataCountsReporter.setAnalysedFieldsPerRecord(3); - for (int i = 1; i <= 2001; i++) { - dataCountsReporter.reportRecordWritten(5, i); - } - - assertEquals(2001, dataCountsReporter.incrementalStats().getInputRecordCount()); - assertEquals(10005, dataCountsReporter.incrementalStats().getInputFieldCount()); - assertEquals(2001, dataCountsReporter.incrementalStats().getProcessedRecordCount()); - assertEquals(6003, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(2001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - - assertEquals(11, dataCountsReporter.getLogStatusCallCount()); + for (int i = 1; i <= 2001; i++) { + dataCountsReporter.reportRecordWritten(5, i); } + + assertEquals(2001, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(10005, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(2001, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(6003, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(2001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + + assertEquals(11, dataCountsReporter.getLogStatusCallCount()); } public void testReportRecordsWritten_Given20000Records() { - try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) { - dataCountsReporter.setAnalysedFieldsPerRecord(3); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + dataCountsReporter.setAnalysedFieldsPerRecord(3); - for (int i = 1; i <= 20001; i++) { - dataCountsReporter.reportRecordWritten(5, i); - } - - assertEquals(20001, dataCountsReporter.incrementalStats().getInputRecordCount()); - assertEquals(100005, dataCountsReporter.incrementalStats().getInputFieldCount()); - assertEquals(20001, dataCountsReporter.incrementalStats().getProcessedRecordCount()); - assertEquals(60003, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(20001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - - assertEquals(29, dataCountsReporter.getLogStatusCallCount()); + for (int i = 1; i <= 20001; i++) { + dataCountsReporter.reportRecordWritten(5, i); } + + assertEquals(20001, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(100005, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(20001, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(60003, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(20001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + + assertEquals(29, dataCountsReporter.getLogStatusCallCount()); } public void testReportRecordsWritten_Given30000Records() { - try (DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter()) { - dataCountsReporter.setAnalysedFieldsPerRecord(3); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + dataCountsReporter.setAnalysedFieldsPerRecord(3); - for (int i = 1; i <= 30001; i++) { - dataCountsReporter.reportRecordWritten(5, i); - } - - assertEquals(30001, dataCountsReporter.incrementalStats().getInputRecordCount()); - assertEquals(150005, dataCountsReporter.incrementalStats().getInputFieldCount()); - assertEquals(30001, dataCountsReporter.incrementalStats().getProcessedRecordCount()); - assertEquals(90003, dataCountsReporter.incrementalStats().getProcessedFieldCount()); - assertEquals(30001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - - assertEquals(30, dataCountsReporter.getLogStatusCallCount()); + for (int i = 1; i <= 30001; i++) { + dataCountsReporter.reportRecordWritten(5, i); } + + assertEquals(30001, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(150005, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(30001, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(90003, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(30001, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + + assertEquals(30, dataCountsReporter.getLogStatusCallCount()); } public void testFinishReporting() { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), - jobDataCountsPersister)) { + DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), + jobDataCountsPersister); - dataCountsReporter.setAnalysedFieldsPerRecord(3); - Date now = new Date(); - DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 1L, new Date(2000), new Date(3000), - now, (Date) null, (Date) null); - dataCountsReporter.reportRecordWritten(5, 2000); - dataCountsReporter.reportRecordWritten(5, 3000); - dataCountsReporter.reportMissingField(); - dataCountsReporter.finishReporting(); + dataCountsReporter.setAnalysedFieldsPerRecord(3); + Date now = new Date(); + DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 1L, new Date(2000), new Date(3000), + now, (Date) null, (Date) null); + dataCountsReporter.reportRecordWritten(5, 2000); + dataCountsReporter.reportRecordWritten(5, 3000); + dataCountsReporter.reportMissingField(); + dataCountsReporter.finishReporting(); - long lastReportedTimeMs = dataCountsReporter.incrementalStats().getLastDataTimeStamp().getTime(); - // check last data time is equal to now give or take a second - assertTrue(lastReportedTimeMs >= now.getTime() - && lastReportedTimeMs <= now.getTime() + TimeUnit.SECONDS.toMillis(1)); - assertEquals(dataCountsReporter.incrementalStats().getLastDataTimeStamp(), - dataCountsReporter.runningTotalStats().getLastDataTimeStamp()); + long lastReportedTimeMs = dataCountsReporter.incrementalStats().getLastDataTimeStamp().getTime(); + // check last data time is equal to now give or take a second + assertTrue(lastReportedTimeMs >= now.getTime() + && lastReportedTimeMs <= now.getTime() + TimeUnit.SECONDS.toMillis(1)); + assertEquals(dataCountsReporter.incrementalStats().getLastDataTimeStamp(), + dataCountsReporter.runningTotalStats().getLastDataTimeStamp()); - dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp()); - Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), eq(dc), any()); - assertEquals(dc, dataCountsReporter.incrementalStats()); - } - } - - public void testPersistenceTimeOut() { - - ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class); - ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Runnable.class); - - when(mockThreadPool.scheduleWithFixedDelay(argumentCaptor.capture(), any(), any())).thenReturn(new ThreadPool.Cancellable() { - @Override - public void cancel() { - - } - - @Override - public boolean isCancelled() { - return false; - } - }); - - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(mockThreadPool, settings, job, new DataCounts(job.getId()), - jobDataCountsPersister)) { - - dataCountsReporter.setAnalysedFieldsPerRecord(3); - - dataCountsReporter.reportRecordWritten(5, 2000); - dataCountsReporter.reportRecordWritten(5, 3000); - - Mockito.verify(jobDataCountsPersister, Mockito.times(0)).persistDataCounts(eq("sr"), any(), any()); - argumentCaptor.getValue().run(); - dataCountsReporter.reportRecordWritten(5, 4000); - Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), any(), any()); - } + dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp()); + Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), eq(dc), any()); + assertEquals(dc, dataCountsReporter.incrementalStats()); } private void assertAllCountFieldsEqualZero(DataCounts stats) throws Exception { @@ -353,6 +292,6 @@ public class DataCountsReporterTests extends ESTestCase { assertEquals(0L, stats.getMissingFieldCount()); assertEquals(0L, stats.getOutOfOrderTimeStampCount()); assertEquals(0L, stats.getEmptyBucketCount()); - assertEquals(0L, stats.getSparseBucketCount());; + assertEquals(0L, stats.getSparseBucketCount()); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java index 476864ba494..fb4de2eeffb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java @@ -27,7 +27,7 @@ class DummyDataCountsReporter extends DataCountsReporter { int logStatusCallCount = 0; DummyDataCountsReporter() { - super(mock(ThreadPool.class), Settings.EMPTY, createJob(), new DataCounts("DummyJobId"), + super(Settings.EMPTY, createJob(), new DataCounts("DummyJobId"), mock(JobDataCountsPersister.class)); } @@ -42,7 +42,6 @@ class DummyDataCountsReporter extends DataCountsReporter { ++logStatusCallCount; } - /** * @return Then number of times {@link #logStatus(long)} was called. */ @@ -50,11 +49,6 @@ class DummyDataCountsReporter extends DataCountsReporter { return logStatusCallCount; } - @Override - public void close() { - // Do nothing - } - private static Job createJob() { AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder( Arrays.asList(new Detector.Builder("metric", "field").build()));