Persist DatafeedTimingStats with RefreshPolicy.NONE by default (#44940) (#45079)

This commit is contained in:
Przemysław Witek 2019-08-01 14:36:59 +02:00 committed by GitHub
parent 2fc4b76ba8
commit 6c87845fc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 131 additions and 88 deletions

View File

@ -24,7 +24,6 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import java.io.BufferedReader;
@ -42,21 +41,19 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsProvider jobResultsProvider;
private final JobResultsPersister jobResultsPersister;
private final NamedXContentRegistry xContentRegistry;
@Inject
public TransportPreviewDatafeedAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider,
JobResultsPersister jobResultsPersister, NamedXContentRegistry xContentRegistry) {
NamedXContentRegistry xContentRegistry) {
super(PreviewDatafeedAction.NAME, transportService, actionFilters, PreviewDatafeedAction.Request::new);
this.threadPool = threadPool;
this.client = client;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.jobResultsProvider = jobResultsProvider;
this.jobResultsPersister = jobResultsPersister;
this.xContentRegistry = xContentRegistry;
}
@ -83,7 +80,8 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
previewDatafeed.build(),
jobBuilder.build(),
xContentRegistry,
new DatafeedTimingStatsReporter(timingStats, jobResultsPersister),
// Fake DatafeedTimingStatsReporter that does not have access to results index
new DatafeedTimingStatsReporter(timingStats, (ts, refreshPolicy) -> {}),
new ActionListener<DataExtractorFactory>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {

View File

@ -55,7 +55,6 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.io.IOException;
@ -82,7 +81,6 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private final PersistentTasksService persistentTasksService;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsPersister jobResultsPersister;
private final Auditor auditor;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
private final NamedXContentRegistry xContentRegistry;
@ -93,7 +91,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
PersistentTasksService persistentTasksService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client, JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
JobResultsPersister jobResultsPersister, Auditor auditor, NamedXContentRegistry xContentRegistry) {
Auditor auditor, NamedXContentRegistry xContentRegistry) {
super(StartDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, StartDatafeedAction.Request::new,
indexNameExpressionResolver);
this.licenseState = licenseState;
@ -101,7 +99,6 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
this.client = client;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.jobResultsPersister = jobResultsPersister;
this.auditor = auditor;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.xContentRegistry = xContentRegistry;
@ -250,8 +247,8 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
datafeed,
job,
xContentRegistry,
// Creating fake DatafeedTimingStatsReporter so that search API call is not needed.
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), jobResultsPersister),
// Fake DatafeedTimingStatsReporter that does not have access to results index
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), (ts, refreshPolicy) -> {}),
ActionListener.wrap(
unused ->
persistentTasksService.sendStartRequest(

View File

@ -107,6 +107,10 @@ class DatafeedJob {
return jobId;
}
public void finishReportingTimingStats() {
timingStatsReporter.finishReporting();
}
Long runLookBack(long startTime, Long endTime) throws Exception {
lookbackStartTimeMs = skipToStartTime(startTime);
Optional<Long> endMs = Optional.ofNullable(endTime);

View File

@ -101,8 +101,9 @@ public class DatafeedJobBuilder {
);
// Create data extractor factory
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = timingStats -> {
context.timingStatsReporter = new DatafeedTimingStatsReporter(timingStats, jobResultsPersister);
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = initialTimingStats -> {
context.timingStatsReporter =
new DatafeedTimingStatsReporter(initialTimingStats, jobResultsPersister::persistDatafeedTimingStats);
DataExtractorFactory.create(
client,
datafeedConfigHolder.get(),

View File

@ -347,6 +347,7 @@ public class DatafeedManager {
}
auditor.info(datafeedJob.getJobId(),
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED));
datafeedJob.finishReportingTimingStats();
finishHandler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");

View File

@ -9,7 +9,6 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import java.util.Objects;
@ -21,20 +20,28 @@ import java.util.Objects;
*/
public class DatafeedTimingStatsReporter {
/** Interface used for persisting current timing stats to the results index. */
@FunctionalInterface
public interface DatafeedTimingStatsPersister {
/** Does nothing by default. This behavior is useful when creating fake {@link DatafeedTimingStatsReporter} objects. */
void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy);
}
/** Persisted timing stats. May be stale. */
private DatafeedTimingStats persistedTimingStats;
/** Current timing stats. */
private volatile DatafeedTimingStats currentTimingStats;
/** Object used to persist current timing stats. */
private final JobResultsPersister jobResultsPersister;
private final DatafeedTimingStatsPersister persister;
public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, JobResultsPersister jobResultsPersister) {
public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, DatafeedTimingStatsPersister persister) {
Objects.requireNonNull(timingStats);
this.persistedTimingStats = new DatafeedTimingStats(timingStats);
this.currentTimingStats = new DatafeedTimingStats(timingStats);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.persister = Objects.requireNonNull(persister);
}
/** Gets current timing stats. */
public DatafeedTimingStats getCurrentTimingStats() {
return new DatafeedTimingStats(currentTimingStats);
}
@ -64,16 +71,23 @@ public class DatafeedTimingStatsReporter {
flushIfDifferSignificantly();
}
private void flushIfDifferSignificantly() {
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
flush();
/** Finishes reporting of timing stats. Makes timing stats persisted immediately. */
public void finishReporting() {
// Don't flush if current timing stats are identical to the persisted ones
if (currentTimingStats.equals(persistedTimingStats) == false) {
flush(WriteRequest.RefreshPolicy.IMMEDIATE);
}
}
private void flush() {
private void flushIfDifferSignificantly() {
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
flush(WriteRequest.RefreshPolicy.NONE);
}
}
private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
// TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action
jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, WriteRequest.RefreshPolicy.IMMEDIATE);
persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
}
/**

View File

@ -11,10 +11,13 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
import org.junit.Before;
import org.mockito.InOrder;
import java.sql.Date;
import java.time.Instant;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
@ -26,93 +29,107 @@ import static org.mockito.Mockito.verifyZeroInteractions;
public class DatafeedTimingStatsReporterTests extends ESTestCase {
private static final String JOB_ID = "my-job-id";
private static final Instant TIMESTAMP = Instant.ofEpochMilli(1000000000);
private static final TimeValue ONE_SECOND = TimeValue.timeValueSeconds(1);
private JobResultsPersister jobResultsPersister;
private DatafeedTimingStatsPersister timingStatsPersister;
@Before
public void setUpTests() {
jobResultsPersister = mock(JobResultsPersister.class);
timingStatsPersister = mock(DatafeedTimingStatsPersister.class);
}
public void testReportSearchDuration_Null() {
DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0));
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0));
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
timingStatsReporter.reportSearchDuration(null);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
reporter.reportSearchDuration(null);
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
verifyZeroInteractions(jobResultsPersister);
verifyZeroInteractions(timingStatsPersister);
}
public void testReportSearchDuration_Zero() {
DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0));
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0)));
DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0));
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0)));
timingStatsReporter.reportSearchDuration(TimeValue.ZERO);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0)));
reporter.reportSearchDuration(TimeValue.ZERO);
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0)));
verify(jobResultsPersister).persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0), RefreshPolicy.IMMEDIATE);
verifyNoMoreInteractions(jobResultsPersister);
verify(timingStatsPersister).persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0), RefreshPolicy.NONE);
verifyNoMoreInteractions(timingStatsPersister);
}
public void testReportSearchDuration() {
DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0));
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0)));
DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0));
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0)));
timingStatsReporter.reportSearchDuration(ONE_SECOND);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 14, 10, 11000.0, 11000.0)));
reporter.reportSearchDuration(ONE_SECOND);
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 14, 10, 11000.0, 11000.0)));
timingStatsReporter.reportSearchDuration(ONE_SECOND);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0)));
reporter.reportSearchDuration(ONE_SECOND);
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0)));
timingStatsReporter.reportSearchDuration(ONE_SECOND);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 16, 10, 13000.0, 13000.0)));
reporter.reportSearchDuration(ONE_SECOND);
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 16, 10, 13000.0, 13000.0)));
timingStatsReporter.reportSearchDuration(ONE_SECOND);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0)));
reporter.reportSearchDuration(ONE_SECOND);
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0)));
InOrder inOrder = inOrder(jobResultsPersister);
inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0), RefreshPolicy.IMMEDIATE);
inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0), RefreshPolicy.IMMEDIATE);
verifyNoMoreInteractions(jobResultsPersister);
InOrder inOrder = inOrder(timingStatsPersister);
inOrder.verify(timingStatsPersister).persistDatafeedTimingStats(
createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0), RefreshPolicy.NONE);
inOrder.verify(timingStatsPersister).persistDatafeedTimingStats(
createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0), RefreshPolicy.NONE);
verifyNoMoreInteractions(timingStatsPersister);
}
public void testReportDataCounts_Null() {
DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0));
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0));
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
timingStatsReporter.reportDataCounts(null);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
reporter.reportDataCounts(null);
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
verifyZeroInteractions(jobResultsPersister);
verifyZeroInteractions(timingStatsPersister);
}
public void testReportDataCounts() {
DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0));
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0)));
DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0));
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0)));
timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1));
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 21, 10000.0)));
reporter.reportDataCounts(createDataCounts(1));
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 21, 10000.0)));
timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1));
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 22, 10000.0)));
reporter.reportDataCounts(createDataCounts(1));
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 22, 10000.0)));
timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1));
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0)));
reporter.reportDataCounts(createDataCounts(1));
assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0)));
InOrder inOrder = inOrder(jobResultsPersister);
inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.IMMEDIATE);
verifyNoMoreInteractions(jobResultsPersister);
InOrder inOrder = inOrder(timingStatsPersister);
inOrder.verify(timingStatsPersister).persistDatafeedTimingStats(
createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.NONE);
verifyNoMoreInteractions(timingStatsPersister);
}
private static DataCounts createDataCountsWithBucketCount(long bucketCount) {
DataCounts dataCounts = new DataCounts(JOB_ID);
dataCounts.incrementBucketCount(bucketCount);
return dataCounts;
public void testFinishReporting_NoChange() {
DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0));
reporter.reportDataCounts(createDataCounts(0));
reporter.finishReporting();
verifyZeroInteractions(timingStatsPersister);
}
public void testFinishReporting_WithChange() {
DatafeedTimingStatsReporter reporter = createReporter(new DatafeedTimingStats(JOB_ID));
reporter.reportDataCounts(createDataCounts(0, TIMESTAMP));
reporter.finishReporting();
verify(timingStatsPersister).persistDatafeedTimingStats(
new DatafeedTimingStats(JOB_ID, 0, 0, 0.0, new ExponentialAverageCalculationContext(0.0, TIMESTAMP, null)),
RefreshPolicy.IMMEDIATE);
verifyNoMoreInteractions(timingStatsPersister);
}
public void testTimingStatsDifferSignificantly() {
@ -151,7 +168,7 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase {
}
private DatafeedTimingStatsReporter createReporter(DatafeedTimingStats timingStats) {
return new DatafeedTimingStatsReporter(timingStats, jobResultsPersister);
return new DatafeedTimingStatsReporter(timingStats, timingStatsPersister);
}
private static DatafeedTimingStats createDatafeedTimingStats(
@ -171,4 +188,16 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase {
ExponentialAverageCalculationContext context = new ExponentialAverageCalculationContext(incrementalSearchTimeMs, null, null);
return new DatafeedTimingStats(jobId, searchCount, bucketCount, totalSearchTimeMs, context);
}
private static DataCounts createDataCounts(long bucketCount, Instant latestRecordTimestamp) {
DataCounts dataCounts = createDataCounts(bucketCount);
dataCounts.setLatestRecordTimeStamp(Date.from(latestRecordTimestamp));
return dataCounts;
}
private static DataCounts createDataCounts(long bucketCount) {
DataCounts dataCounts = new DataCounts(JOB_ID);
dataCounts.incrementBucketCount(bucketCount);
return dataCounts;
}
}

View File

@ -20,8 +20,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.junit.Before;
import java.io.BufferedReader;
@ -93,7 +93,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
.addAggregator(AggregationBuilders.histogram("time").field("time").interval(1000).subAggregation(
AggregationBuilders.terms("airline").field("airline").subAggregation(
AggregationBuilders.avg("responsetime").field("responsetime"))));
timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class));
timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class));
}
public void testExtraction() throws IOException {

View File

@ -26,8 +26,8 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.junit.Before;
import java.io.IOException;
@ -93,7 +93,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
scrollSize = 1000;
chunkSpan = null;
dataExtractorFactory = mock(DataExtractorFactory.class);
timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class));
timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class));
}
public void testExtractionGivenNoData() throws IOException {

View File

@ -31,9 +31,9 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
@ -146,7 +146,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
clearScrollFuture = mock(ActionFuture.class);
capturedClearScrollRequests = ArgumentCaptor.forClass(ClearScrollRequest.class);
when(client.execute(same(ClearScrollAction.INSTANCE), capturedClearScrollRequests.capture())).thenReturn(clearScrollFuture);
timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class));
timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class));
}
public void testSinglePageExtraction() throws IOException {

View File

@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
public class TimingStatsReporterTests extends ESTestCase {
@ -61,7 +62,7 @@ public class TimingStatsReporterTests extends ESTestCase {
InOrder inOrder = inOrder(bulkResultsPersister);
inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0));
inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1, 30.0));
inOrder.verifyNoMoreInteractions();
verifyNoMoreInteractions(bulkResultsPersister);
}
public void testFinishReporting() {
@ -83,25 +84,23 @@ public class TimingStatsReporterTests extends ESTestCase {
InOrder inOrder = inOrder(bulkResultsPersister);
inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0));
inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0, 30.0));
inOrder.verifyNoMoreInteractions();
verifyNoMoreInteractions(bulkResultsPersister);
}
public void testFinishReportingNoChange() {
public void testFinishReporting_NoChange() {
TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID));
reporter.finishReporting();
verifyZeroInteractions(bulkResultsPersister);
}
public void testFinishReportingWithChange() {
public void testFinishReporting_WithChange() {
TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID));
reporter.reportBucket(createBucket(10));
reporter.finishReporting();
verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0));
verifyNoMoreInteractions(bulkResultsPersister);
}
public void testTimingStatsDifferSignificantly() {