diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 883a22124e3..225ad4c5cd4 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -287,8 +287,14 @@ class DatafeedJob { } private long nextRealtimeTimestamp() { - long epochMs = currentTimeSupplier.get() + frequencyMs; - return toIntervalStartEpochMs(epochMs) + queryDelayMs + NEXT_TASK_DELAY_MS; + // We find the timestamp of the start of the next frequency interval. + // The goal is to minimize any lag. To do so, + // we offset the time by the query delay modulo frequency. + // For example, if frequency is 60s and query delay 90s, + // we run 30s past the minute. If frequency is 1s and query delay 10s, + // we don't add anything and we'll run every second. + long next = currentTimeSupplier.get() + frequencyMs; + return toIntervalStartEpochMs(next) + queryDelayMs % frequencyMs + NEXT_TASK_DELAY_MS; } private long toIntervalStartEpochMs(long epochMs) { diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 75106b8ee0c..268a351cd24 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -198,7 +198,7 @@ public class DatafeedJobTests extends ESTestCase { long queryDelayMs = 1000; DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1); long next = datafeedJob.runRealtime(); - assertEquals(currentTime + frequencyMs + queryDelayMs + 100, next); + assertEquals(currentTime + frequencyMs + 100, next); verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs); FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);