diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
index 9daf4659f64..72e7c56016c 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
@@ -50,7 +50,7 @@ class DatafeedJob {
     private final Supplier<Long> currentTimeSupplier;
 
     private volatile long lookbackStartTimeMs;
-    private volatile long lastEndTimeMs;
+    private volatile Long lastEndTimeMs;
     private AtomicBoolean running = new AtomicBoolean(true);
     private volatile boolean isIsolated;
 
@@ -122,20 +122,23 @@ class DatafeedJob {
     }
 
     private long skipToStartTime(long startTime) {
-        if (lastEndTimeMs + 1 > startTime) {
-            // start time is before last checkpoint, thus continue from checkpoint
-            return lastEndTimeMs + 1;
+        if (lastEndTimeMs != null) {
+            if (lastEndTimeMs + 1 > startTime) {
+                // start time is before last checkpoint, thus continue from checkpoint
+                return lastEndTimeMs + 1;
+            }
+            // start time is after last checkpoint, thus we need to skip time
+            FlushJobAction.Request request = new FlushJobAction.Request(jobId);
+            request.setSkipTime(String.valueOf(startTime));
+            FlushJobAction.Response flushResponse = flushJob(request);
+            LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]");
+            return flushResponse.getLastFinalizedBucketEnd().getTime();
         }
-        // start time is after last checkpoint, thus we need to skip time
-        FlushJobAction.Request request = new FlushJobAction.Request(jobId);
-        request.setSkipTime(String.valueOf(startTime));
-        FlushJobAction.Response flushResponse = flushJob(request);
-        LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]");
-        return flushResponse.getLastFinalizedBucketEnd().getTime();
+        return startTime;
     }
 
     long runRealtime() throws Exception {
-        long start = Math.max(lookbackStartTimeMs, lastEndTimeMs + 1);
+        long start = lastEndTimeMs == null ? lookbackStartTimeMs : Math.max(lookbackStartTimeMs, lastEndTimeMs + 1);
         long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs;
         long end = toIntervalStartEpochMs(nowMinusQueryDelay);
         FlushJobAction.Request request = new FlushJobAction.Request(jobId);
@@ -238,7 +241,7 @@ class DatafeedJob {
             }
         }
 
-        lastEndTimeMs = Math.max(lastEndTimeMs, end - 1);
+        lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1);
         LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", jobId, error, recordCount,
                 lastEndTimeMs, isRunning(), dataExtractor.isCancelled());
 
@@ -310,7 +313,7 @@ class DatafeedJob {
 
     /**
      * Visible for testing
      */
-    long lastEndTimeMs() {
+    Long lastEndTimeMs() {
         return lastEndTimeMs;
     }
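
Note (not part of the patch): with lastEndTimeMs now a nullable Long, the realtime start-time
selection falls back to the lookback start when no checkpoint exists yet. A minimal standalone
sketch of that selection logic follows; CheckpointExample and chooseRealtimeStart are hypothetical
names used for illustration, not anything taken from DatafeedJob.

// Illustrative sketch only; mirrors the start computation in the patched runRealtime().
public class CheckpointExample {

    // lastEndTimeMs == null means "no data processed yet", so start from the lookback start.
    static long chooseRealtimeStart(long lookbackStartTimeMs, Long lastEndTimeMs) {
        return lastEndTimeMs == null ? lookbackStartTimeMs
                : Math.max(lookbackStartTimeMs, lastEndTimeMs + 1);
    }

    public static void main(String[] args) {
        // No checkpoint yet: realtime search starts at the lookback start time.
        System.out.println(chooseRealtimeStart(1_500_000_000_000L, null));
        // Checkpoint beyond the lookback start: resume one millisecond after it.
        System.out.println(chooseRealtimeStart(1_500_000_000_000L, 1_500_000_010_000L));
    }
}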