[ML] Respect datafeed frequency when less than or equal to query_delay (elastic/x-pack-elasticsearch#4168)

To handle the most anticipated scenario, where the datafeed frequency
is greater than the query_delay, we add the query_delay to the
frequency to determine when to trigger the next real-time run.
For example, if frequency is 10s and query_delay is 1s, we make sure
to trigger the real-time run at a 10s + 1s = 11s offset.

However, this is not correct when the frequency is less than or
equal to the query_delay. For example, if frequency is 1s and
query_delay is 10s, we would also end up triggering at an 11s offset.
The right behaviour would be to trigger every second while
ensuring we search only up to 10 seconds ago.

This commit fixes the issue by offsetting the next run by the
query_delay modulo the frequency instead of the full query_delay.
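
The difference between the two calculations can be illustrated with a
small standalone sketch. It is not the DatafeedJob code itself: the class
name, the static helpers and the explicit frequencyMs parameter are
hypothetical, NEXT_TASK_DELAY_MS = 100 is assumed from the extra 100 ms the
updated test expects, and toIntervalStartEpochMs is assumed to round down
to a multiple of the frequency.

```java
// Illustrative sketch only; names and constants are assumptions, not the real DatafeedJob API.
final class RealtimeScheduleSketch {

    // Assumed value, inferred from the "+ 100" in the updated test expectation.
    static final long NEXT_TASK_DELAY_MS = 100;

    // Assumption: rounds an epoch timestamp down to the start of its frequency interval.
    static long toIntervalStartEpochMs(long epochMs, long frequencyMs) {
        return (epochMs / frequencyMs) * frequencyMs;
    }

    // Behaviour before this commit: the full query delay is added to the interval start.
    static long nextBefore(long nowMs, long frequencyMs, long queryDelayMs) {
        long next = nowMs + frequencyMs;
        return toIntervalStartEpochMs(next, frequencyMs) + queryDelayMs + NEXT_TASK_DELAY_MS;
    }

    // Behaviour after this commit: only the query delay modulo the frequency is added.
    static long nextAfter(long nowMs, long frequencyMs, long queryDelayMs) {
        long next = nowMs + frequencyMs;
        return toIntervalStartEpochMs(next, frequencyMs) + queryDelayMs % frequencyMs + NEXT_TASK_DELAY_MS;
    }

    public static void main(String[] args) {
        long now = 60_000L;
        // frequency 1s, query_delay 10s
        System.out.println(nextBefore(now, 1_000L, 10_000L) - now); // prints 11100
        System.out.println(nextAfter(now, 1_000L, 10_000L) - now);  // prints 1100
        // frequency 10s, query_delay 1s: both variants agree
        System.out.println(nextBefore(now, 10_000L, 1_000L) - now); // prints 11100
        System.out.println(nextAfter(now, 10_000L, 1_000L) - now);  // prints 11100
    }
}
```

When the frequency is greater than the query_delay, the modulo leaves the
query_delay unchanged, so the anticipated scenario behaves as before.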

relates elastic/x-pack-elasticsearch#4167

Original commit: elastic/x-pack-elasticsearch@f605885167
Dimitris Athanasiou 2018-03-22 14:05:22 +00:00 committed by GitHub
parent 42eae8b3be
commit 25b1a444cf
2 changed files with 9 additions and 3 deletions

@@ -287,8 +287,14 @@ class DatafeedJob {
     }
     private long nextRealtimeTimestamp() {
-        long epochMs = currentTimeSupplier.get() + frequencyMs;
-        return toIntervalStartEpochMs(epochMs) + queryDelayMs + NEXT_TASK_DELAY_MS;
+        // We find the timestamp of the start of the next frequency interval.
+        // The goal is to minimize any lag. To do so,
+        // we offset the time by the query delay modulo frequency.
+        // For example, if frequency is 60s and query delay 90s,
+        // we run 30s past the minute. If frequency is 1s and query delay 10s,
+        // we don't add anything and we'll run every second.
+        long next = currentTimeSupplier.get() + frequencyMs;
+        return toIntervalStartEpochMs(next) + queryDelayMs % frequencyMs + NEXT_TASK_DELAY_MS;
     }
     private long toIntervalStartEpochMs(long epochMs) {

@@ -198,7 +198,7 @@ public class DatafeedJobTests extends ESTestCase {
         long queryDelayMs = 1000;
         DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1);
         long next = datafeedJob.runRealtime();
-        assertEquals(currentTime + frequencyMs + queryDelayMs + 100, next);
+        assertEquals(currentTime + frequencyMs + 100, next);
         verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs);
         FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);