diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 3be0327db1f..97afe1b7cfc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.io.BufferedReader; @@ -42,21 +41,19 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction {}), new ActionListener() { @Override public void onResponse(DataExtractorFactory dataExtractorFactory) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 41e673807a8..5edcf2202a2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -55,7 +55,6 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.io.IOException; @@ -82,7 +81,6 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction {}), ActionListener.wrap( unused -> persistentTasksService.sendStartRequest( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 18d724313e3..8837d3f03f9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -107,6 +107,10 @@ class DatafeedJob { return jobId; } + public void finishReportingTimingStats() { + timingStatsReporter.finishReporting(); + } + Long runLookBack(long startTime, Long endTime) throws Exception { lookbackStartTimeMs = skipToStartTime(startTime); Optional endMs = Optional.ofNullable(endTime); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 778e2116402..a592f410226 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -101,8 +101,9 @@ public class DatafeedJobBuilder { ); // Create data extractor factory - Consumer datafeedTimingStatsHandler = timingStats -> { - context.timingStatsReporter = new DatafeedTimingStatsReporter(timingStats, jobResultsPersister); + Consumer datafeedTimingStatsHandler = initialTimingStats -> { + context.timingStatsReporter = + new DatafeedTimingStatsReporter(initialTimingStats, jobResultsPersister::persistDatafeedTimingStats); DataExtractorFactory.create( client, datafeedConfigHolder.get(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 53568c3705a..4425b624a06 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -347,6 +347,7 @@ public class DatafeedManager { } auditor.info(datafeedJob.getJobId(), Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED)); + datafeedJob.finishReportingTimingStats(); finishHandler.accept(e); logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java index 7df3919c459..283b667f7b8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import java.util.Objects; @@ -21,20 +20,28 @@ import java.util.Objects; */ public class DatafeedTimingStatsReporter { + /** Interface used for persisting current timing stats to the results index. */ + @FunctionalInterface + public interface DatafeedTimingStatsPersister { + /** Does nothing by default. This behavior is useful when creating fake {@link DatafeedTimingStatsReporter} objects. */ + void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy); + } + /** Persisted timing stats. May be stale. */ private DatafeedTimingStats persistedTimingStats; /** Current timing stats. */ private volatile DatafeedTimingStats currentTimingStats; /** Object used to persist current timing stats. */ - private final JobResultsPersister jobResultsPersister; + private final DatafeedTimingStatsPersister persister; - public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, JobResultsPersister jobResultsPersister) { + public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, DatafeedTimingStatsPersister persister) { Objects.requireNonNull(timingStats); this.persistedTimingStats = new DatafeedTimingStats(timingStats); this.currentTimingStats = new DatafeedTimingStats(timingStats); - this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister); + this.persister = Objects.requireNonNull(persister); } + /** Gets current timing stats. */ public DatafeedTimingStats getCurrentTimingStats() { return new DatafeedTimingStats(currentTimingStats); } @@ -64,16 +71,23 @@ public class DatafeedTimingStatsReporter { flushIfDifferSignificantly(); } - private void flushIfDifferSignificantly() { - if (differSignificantly(currentTimingStats, persistedTimingStats)) { - flush(); + /** Finishes reporting of timing stats. Makes timing stats persisted immediately. */ + public void finishReporting() { + // Don't flush if current timing stats are identical to the persisted ones + if (currentTimingStats.equals(persistedTimingStats) == false) { + flush(WriteRequest.RefreshPolicy.IMMEDIATE); } } - private void flush() { + private void flushIfDifferSignificantly() { + if (differSignificantly(currentTimingStats, persistedTimingStats)) { + flush(WriteRequest.RefreshPolicy.NONE); + } + } + + private void flush(WriteRequest.RefreshPolicy refreshPolicy) { persistedTimingStats = new DatafeedTimingStats(currentTimingStats); - // TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action - jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, WriteRequest.RefreshPolicy.IMMEDIATE); + persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java index 6daa0f5a0b8..fff735418be 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java @@ -11,10 +11,13 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.junit.Before; import org.mockito.InOrder; +import java.sql.Date; +import java.time.Instant; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.inOrder; @@ -26,93 +29,107 @@ import static org.mockito.Mockito.verifyZeroInteractions; public class DatafeedTimingStatsReporterTests extends ESTestCase { private static final String JOB_ID = "my-job-id"; + private static final Instant TIMESTAMP = Instant.ofEpochMilli(1000000000); private static final TimeValue ONE_SECOND = TimeValue.timeValueSeconds(1); - private JobResultsPersister jobResultsPersister; + private DatafeedTimingStatsPersister timingStatsPersister; @Before public void setUpTests() { - jobResultsPersister = mock(JobResultsPersister.class); + timingStatsPersister = mock(DatafeedTimingStatsPersister.class); } public void testReportSearchDuration_Null() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); - timingStatsReporter.reportSearchDuration(null); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + reporter.reportSearchDuration(null); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); - verifyZeroInteractions(jobResultsPersister); + verifyZeroInteractions(timingStatsPersister); } public void testReportSearchDuration_Zero() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0))); - timingStatsReporter.reportSearchDuration(TimeValue.ZERO); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0))); + reporter.reportSearchDuration(TimeValue.ZERO); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0))); - verify(jobResultsPersister).persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0), RefreshPolicy.IMMEDIATE); - verifyNoMoreInteractions(jobResultsPersister); + verify(timingStatsPersister).persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0), RefreshPolicy.NONE); + verifyNoMoreInteractions(timingStatsPersister); } public void testReportSearchDuration() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0))); - timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 14, 10, 11000.0, 11000.0))); + reporter.reportSearchDuration(ONE_SECOND); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 14, 10, 11000.0, 11000.0))); - timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0))); + reporter.reportSearchDuration(ONE_SECOND); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0))); - timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 16, 10, 13000.0, 13000.0))); + reporter.reportSearchDuration(ONE_SECOND); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 16, 10, 13000.0, 13000.0))); - timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0))); + reporter.reportSearchDuration(ONE_SECOND); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0))); - InOrder inOrder = inOrder(jobResultsPersister); - inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0), RefreshPolicy.IMMEDIATE); - inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0), RefreshPolicy.IMMEDIATE); - verifyNoMoreInteractions(jobResultsPersister); + InOrder inOrder = inOrder(timingStatsPersister); + inOrder.verify(timingStatsPersister).persistDatafeedTimingStats( + createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0), RefreshPolicy.NONE); + inOrder.verify(timingStatsPersister).persistDatafeedTimingStats( + createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0), RefreshPolicy.NONE); + verifyNoMoreInteractions(timingStatsPersister); } public void testReportDataCounts_Null() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); - timingStatsReporter.reportDataCounts(null); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + reporter.reportDataCounts(null); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); - verifyZeroInteractions(jobResultsPersister); + verifyZeroInteractions(timingStatsPersister); } public void testReportDataCounts() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0))); - timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 21, 10000.0))); + reporter.reportDataCounts(createDataCounts(1)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 21, 10000.0))); - timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 22, 10000.0))); + reporter.reportDataCounts(createDataCounts(1)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 22, 10000.0))); - timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0))); + reporter.reportDataCounts(createDataCounts(1)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0))); - InOrder inOrder = inOrder(jobResultsPersister); - inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.IMMEDIATE); - verifyNoMoreInteractions(jobResultsPersister); + InOrder inOrder = inOrder(timingStatsPersister); + inOrder.verify(timingStatsPersister).persistDatafeedTimingStats( + createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.NONE); + verifyNoMoreInteractions(timingStatsPersister); } - private static DataCounts createDataCountsWithBucketCount(long bucketCount) { - DataCounts dataCounts = new DataCounts(JOB_ID); - dataCounts.incrementBucketCount(bucketCount); - return dataCounts; + public void testFinishReporting_NoChange() { + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); + reporter.reportDataCounts(createDataCounts(0)); + reporter.finishReporting(); + + verifyZeroInteractions(timingStatsPersister); + } + + public void testFinishReporting_WithChange() { + DatafeedTimingStatsReporter reporter = createReporter(new DatafeedTimingStats(JOB_ID)); + reporter.reportDataCounts(createDataCounts(0, TIMESTAMP)); + reporter.finishReporting(); + + verify(timingStatsPersister).persistDatafeedTimingStats( + new DatafeedTimingStats(JOB_ID, 0, 0, 0.0, new ExponentialAverageCalculationContext(0.0, TIMESTAMP, null)), + RefreshPolicy.IMMEDIATE); + verifyNoMoreInteractions(timingStatsPersister); } public void testTimingStatsDifferSignificantly() { @@ -151,7 +168,7 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase { } private DatafeedTimingStatsReporter createReporter(DatafeedTimingStats timingStats) { - return new DatafeedTimingStatsReporter(timingStats, jobResultsPersister); + return new DatafeedTimingStatsReporter(timingStats, timingStatsPersister); } private static DatafeedTimingStats createDatafeedTimingStats( @@ -171,4 +188,16 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase { ExponentialAverageCalculationContext context = new ExponentialAverageCalculationContext(incrementalSearchTimeMs, null, null); return new DatafeedTimingStats(jobId, searchCount, bucketCount, totalSearchTimeMs, context); } + + private static DataCounts createDataCounts(long bucketCount, Instant latestRecordTimestamp) { + DataCounts dataCounts = createDataCounts(bucketCount); + dataCounts.setLatestRecordTimeStamp(Date.from(latestRecordTimestamp)); + return dataCounts; + } + + private static DataCounts createDataCounts(long bucketCount) { + DataCounts dataCounts = new DataCounts(JOB_ID); + dataCounts.incrementBucketCount(bucketCount); + return dataCounts; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java index 6d9db043755..537a6b44d0b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java @@ -20,8 +20,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.junit.Before; import java.io.BufferedReader; @@ -93,7 +93,7 @@ public class AggregationDataExtractorTests extends ESTestCase { .addAggregator(AggregationBuilders.histogram("time").field("time").interval(1000).subAggregation( AggregationBuilders.terms("airline").field("airline").subAggregation( AggregationBuilders.avg("responsetime").field("responsetime")))); - timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class)); + timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class)); } public void testExtraction() throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index 77ebc9651dd..e7ac3625439 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -26,8 +26,8 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.junit.Before; import java.io.IOException; @@ -93,7 +93,7 @@ public class ChunkedDataExtractorTests extends ESTestCase { scrollSize = 1000; chunkSpan = null; dataExtractorFactory = mock(DataExtractorFactory.class); - timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class)); + timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class)); } public void testExtractionGivenNoData() throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java index f4074b0f5b4..c383cf20b18 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java @@ -31,9 +31,9 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -146,7 +146,7 @@ public class ScrollDataExtractorTests extends ESTestCase { clearScrollFuture = mock(ActionFuture.class); capturedClearScrollRequests = ArgumentCaptor.forClass(ClearScrollRequest.class); when(client.execute(same(ClearScrollAction.INSTANCE), capturedClearScrollRequests.capture())).thenReturn(clearScrollFuture); - timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class)); + timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class)); } public void testSinglePageExtraction() throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java index 9e1e5646e11..d8a4d5aec24 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; public class TimingStatsReporterTests extends ESTestCase { @@ -61,7 +62,7 @@ public class TimingStatsReporterTests extends ESTestCase { InOrder inOrder = inOrder(bulkResultsPersister); inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0)); inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1, 30.0)); - inOrder.verifyNoMoreInteractions(); + verifyNoMoreInteractions(bulkResultsPersister); } public void testFinishReporting() { @@ -83,25 +84,23 @@ public class TimingStatsReporterTests extends ESTestCase { InOrder inOrder = inOrder(bulkResultsPersister); inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0)); inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0, 30.0)); - inOrder.verifyNoMoreInteractions(); + verifyNoMoreInteractions(bulkResultsPersister); } - public void testFinishReportingNoChange() { + public void testFinishReporting_NoChange() { TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); - reporter.finishReporting(); verifyZeroInteractions(bulkResultsPersister); } - public void testFinishReportingWithChange() { + public void testFinishReporting_WithChange() { TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); - reporter.reportBucket(createBucket(10)); - reporter.finishReporting(); verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0)); + verifyNoMoreInteractions(bulkResultsPersister); } public void testTimingStatsDifferSignificantly() {