[ML] Ensure datafeed runs on time (elastic/x-pack-elasticsearch#2471)

The datafeed runs on frequency-aligned intervals behind
query_delay. Currently, when a real-time run is triggered,
we subtract query_delay from now and then we take the aligned
interval. This results into running frequency + query_delay
behind now. The fix involves simply adding the query_delay
into the time real-time runs occur.

Relates elastic/x-pack-elasticsearch#2426

Original commit: elastic/x-pack-elasticsearch@61ceaaca8f
This commit is contained in:
Dimitris Athanasiou 2017-09-12 13:24:55 +01:00 committed by GitHub
parent 59d94eba40
commit e4882b36b7
2 changed files with 4 additions and 4 deletions

View File

@ -274,7 +274,7 @@ class DatafeedJob {
private long nextRealtimeTimestamp() { private long nextRealtimeTimestamp() {
long epochMs = currentTimeSupplier.get() + frequencyMs; long epochMs = currentTimeSupplier.get() + frequencyMs;
return toIntervalStartEpochMs(epochMs) + NEXT_TASK_DELAY_MS; return toIntervalStartEpochMs(epochMs) + queryDelayMs + NEXT_TASK_DELAY_MS;
} }
private long toIntervalStartEpochMs(long epochMs) { private long toIntervalStartEpochMs(long epochMs) {

View File

@ -116,7 +116,7 @@ public class DatafeedJobTests extends ESTestCase {
long queryDelayMs = 500; long queryDelayMs = 500;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, -1, -1); DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, -1, -1);
long next = datafeedJob.runLookBack(0L, null); long next = datafeedJob.runLookBack(0L, null);
assertEquals(2000 + frequencyMs + 100, next); assertEquals(2000 + frequencyMs + queryDelayMs + 100, next);
verify(dataExtractorFactory).newExtractor(0L, 1500L); verify(dataExtractorFactory).newExtractor(0L, 1500L);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
@ -138,7 +138,7 @@ public class DatafeedJobTests extends ESTestCase {
long queryDelayMs = 500; long queryDelayMs = 500;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs); DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs);
long next = datafeedJob.runLookBack(0L, null); long next = datafeedJob.runLookBack(0L, null);
assertEquals(10000 + frequencyMs + 100, next); assertEquals(10000 + frequencyMs + queryDelayMs + 100, next);
verify(dataExtractorFactory).newExtractor(5000 + 1L, currentTime - queryDelayMs); verify(dataExtractorFactory).newExtractor(5000 + 1L, currentTime - queryDelayMs);
assertThat(flushJobRequests.getAllValues().size(), equalTo(1)); assertThat(flushJobRequests.getAllValues().size(), equalTo(1));
@ -185,7 +185,7 @@ public class DatafeedJobTests extends ESTestCase {
long queryDelayMs = 1000; long queryDelayMs = 1000;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1); DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1);
long next = datafeedJob.runRealtime(); long next = datafeedJob.runRealtime();
assertEquals(currentTime + frequencyMs + 100, next); assertEquals(currentTime + frequencyMs + queryDelayMs + 100, next);
verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs); verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");