From aff9a4a2baa6c1a28b7c528a2993fb9e24ea51fb Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 15 Jan 2018 18:07:48 +0000 Subject: [PATCH] [ML] Autodetect should receive events from the earliest valid timestamp (elastic/x-pack-elasticsearch#3570) When scheduled events are searched to be passed to the autodetect process, the search start time is currently the latest record timestamp when a job opens, and `now` when the process is updated. This commit changes both to be consistent and based on the earliest valid timestamp for the job. The earliest valid timestamp is the latest record timestamp minus the job latency. Relates elastic/x-pack-elasticsearch#3016 Original commit: elastic/x-pack-elasticsearch@7f882ea0531ccb2a965c9d5e98cfab6cd199c56f --- .../xpack/ml/job/config/Job.java | 18 +++++++++++++ .../xpack/ml/job/persistence/JobProvider.java | 5 +--- .../ScheduledEventsQueryBuilder.java | 5 ++++ .../TransportPostCalendarEventsAction.java | 3 +-- .../ml/action/TransportPutCalendarAction.java | 6 +---- .../autodetect/AutodetectProcessManager.java | 4 +-- .../xpack/ml/job/config/JobTests.java | 25 +++++++++++++++++++ 7 files changed, 53 insertions(+), 13 deletions(-) diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java index 2e90be13d10..5ee53a0cef7 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java @@ -26,6 +26,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.xpack.ml.MlParserType; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndexFields; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.MlStrings; import 
org.elasticsearch.xpack.ml.utils.time.TimeUtils; @@ -450,6 +451,23 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO + PROCESS_MEMORY_OVERHEAD.getBytes(); } + /** + * Returns the timestamp before which data is not accepted by the job. + * This is the latest record timestamp minus the job latency. + * @param dataCounts the job data counts + * @return the timestamp before which data is not accepted by the job + */ + public long earliestValidTimestamp(DataCounts dataCounts) { + long currentTime = 0; + Date latestRecordTimestamp = dataCounts.getLatestRecordTimeStamp(); + if (latestRecordTimestamp != null) { + TimeValue latency = analysisConfig.getLatency(); + long latencyMillis = latency == null ? 0 : latency.millis(); + currentTime = latestRecordTimestamp.getTime() - latencyMillis; + } + return currentTime; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index f81e37c99e2..d9617038ef0 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -369,10 +369,7 @@ public class JobProvider { ActionListener getScheduledEventsListener = ActionListener.wrap( paramsBuilder -> { ScheduledEventsQueryBuilder scheduledEventsQueryBuilder = new ScheduledEventsQueryBuilder(); - Date lastestRecordTime = paramsBuilder.getDataCounts().getLatestRecordTimeStamp(); - if (lastestRecordTime != null) { - scheduledEventsQueryBuilder.start(Long.toString(lastestRecordTime.getTime())); - } + scheduledEventsQueryBuilder.start(job.earliestValidTimestamp(paramsBuilder.getDataCounts())); scheduledEventsForJob(jobId, job.getGroups(), scheduledEventsQueryBuilder, ActionListener.wrap( events -> { 
paramsBuilder.setScheduledEvents(events.results()); diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ScheduledEventsQueryBuilder.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ScheduledEventsQueryBuilder.java index 954c52f0156..5fbc9433941 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ScheduledEventsQueryBuilder.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ScheduledEventsQueryBuilder.java @@ -41,6 +41,11 @@ public class ScheduledEventsQueryBuilder { return this; } + public ScheduledEventsQueryBuilder start(long start) { + this.start = Long.toString(start); + return this; + } + public ScheduledEventsQueryBuilder end(String end) { this.end = end; return this; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index 6f119639802..8239db20422 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -88,8 +88,7 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction() { @Override public void onResponse(IndexResponse indexResponse) { - jobManager.updateProcessOnCalendarChanged(calendar.getJobIds()); listener.onResponse(new PutCalendarAction.Response(calendar)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 9e946763a18..fecc3448715 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ 
b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -60,7 +60,6 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; -import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -280,7 +279,8 @@ public class AutodetectProcessManager extends AbstractComponent { if (updateParams.isUpdateScheduledEvents()) { Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId()); - ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(Long.toString(new Date().getTime())); + DataCounts dataCounts = getStatistics(jobTask).get().v1(); + ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts)); jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); } else { eventsListener.onResponse(null); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java index 240de975ad8..4db9192e53e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.MachineLearningClientActionPlugin; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndexFields; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import java.io.IOException; import java.util.ArrayList; @@ -551,6 +552,30 @@ public class JobTests extends AbstractSerializingTestCase { builder.build().estimateMemoryFootprint()); } + public void testEarliestValidTimestamp_GivenEmptyDataCounts() { + assertThat(createRandomizedJob().earliestValidTimestamp(new DataCounts("foo")), 
equalTo(0L)); + } + + public void testEarliestValidTimestamp_GivenDataCountsAndZeroLatency() { + Job.Builder builder = buildJobBuilder("foo"); + DataCounts dataCounts = new DataCounts(builder.getId()); + dataCounts.setLatestRecordTimeStamp(new Date(123456789L)); + + assertThat(builder.build().earliestValidTimestamp(dataCounts), equalTo(123456789L)); + } + + public void testEarliestValidTimestamp_GivenDataCountsAndLatency() { + Job.Builder builder = buildJobBuilder("foo"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(builder.build().getAnalysisConfig()); + analysisConfig.setLatency(TimeValue.timeValueMillis(1000L)); + builder.setAnalysisConfig(analysisConfig); + + DataCounts dataCounts = new DataCounts(builder.getId()); + dataCounts.setLatestRecordTimeStamp(new Date(123456789L)); + + assertThat(builder.build().earliestValidTimestamp(dataCounts), equalTo(123455789L)); + } + public static Job.Builder buildJobBuilder(String id, Date date) { Job.Builder builder = new Job.Builder(id); builder.setCreateTime(date);