diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 6777011641d..f30e2f09168 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -274,7 +274,7 @@ class DatafeedJob { private long nextRealtimeTimestamp() { long epochMs = currentTimeSupplier.get() + frequencyMs; - return toIntervalStartEpochMs(epochMs) + NEXT_TASK_DELAY_MS; + return toIntervalStartEpochMs(epochMs) + queryDelayMs + NEXT_TASK_DELAY_MS; } private long toIntervalStartEpochMs(long epochMs) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 9974d1aa803..c5ef5bfda38 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -116,7 +116,7 @@ public class DatafeedJobTests extends ESTestCase { long queryDelayMs = 500; DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, -1, -1); long next = datafeedJob.runLookBack(0L, null); - assertEquals(2000 + frequencyMs + 100, next); + assertEquals(2000 + frequencyMs + queryDelayMs + 100, next); verify(dataExtractorFactory).newExtractor(0L, 1500L); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); @@ -138,7 +138,7 @@ public class DatafeedJobTests extends ESTestCase { long queryDelayMs = 500; DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs); long next = datafeedJob.runLookBack(0L, null); - assertEquals(10000 + frequencyMs + 100, next); + assertEquals(10000 + frequencyMs + queryDelayMs + 100, next); verify(dataExtractorFactory).newExtractor(5000 + 1L, currentTime - queryDelayMs); assertThat(flushJobRequests.getAllValues().size(), equalTo(1)); @@ -185,7 +185,7 @@ public class DatafeedJobTests extends ESTestCase { long queryDelayMs = 1000; DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1); long next = datafeedJob.runRealtime(); - assertEquals(currentTime + frequencyMs + 100, next); + assertEquals(currentTime + frequencyMs + queryDelayMs + 100, next); verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");