diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 6dbc1f0c07c..9daf4659f64 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -50,7 +50,7 @@ class DatafeedJob { private final Supplier currentTimeSupplier; private volatile long lookbackStartTimeMs; - private volatile Long lastEndTimeMs; + private volatile long lastEndTimeMs; private AtomicBoolean running = new AtomicBoolean(true); private volatile boolean isIsolated; @@ -98,6 +98,8 @@ class DatafeedJob { DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackStartTimeMs), endTime == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd)); auditor.info(jobId, msg); + LOGGER.info("[" + jobId + "] " + msg); + FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setCalcInterim(true); @@ -120,23 +122,20 @@ class DatafeedJob { } private long skipToStartTime(long startTime) { - if (lastEndTimeMs != null) { - if (lastEndTimeMs + 1 > startTime) { - // start time is before last checkpoint, thus continue from checkpoint - return lastEndTimeMs + 1; - } - // start time is after last checkpoint, thus we need to skip time - FlushJobAction.Request request = new FlushJobAction.Request(jobId); - request.setSkipTime(String.valueOf(startTime)); - FlushJobAction.Response flushResponse = flushJob(request); - LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]"); - return flushResponse.getLastFinalizedBucketEnd().getTime(); + if (lastEndTimeMs + 1 > startTime) { + // start time is before last checkpoint, thus continue from checkpoint + return lastEndTimeMs + 1; } - return startTime; + // start time is after last checkpoint, thus we need to skip time + FlushJobAction.Request request = new FlushJobAction.Request(jobId); + request.setSkipTime(String.valueOf(startTime)); + FlushJobAction.Response flushResponse = flushJob(request); + LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]"); + return flushResponse.getLastFinalizedBucketEnd().getTime(); } long runRealtime() throws Exception { - long start = lastEndTimeMs == null ? lookbackStartTimeMs : Math.max(lookbackStartTimeMs, lastEndTimeMs + 1); + long start = Math.max(lookbackStartTimeMs, lastEndTimeMs + 1); long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs; long end = toIntervalStartEpochMs(nowMinusQueryDelay); FlushJobAction.Request request = new FlushJobAction.Request(jobId); @@ -239,7 +238,7 @@ class DatafeedJob { } } - lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1); + lastEndTimeMs = Math.max(lastEndTimeMs, end - 1); LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", jobId, error, recordCount, lastEndTimeMs, isRunning(), dataExtractor.isCancelled()); @@ -311,7 +310,7 @@ class DatafeedJob { /** * Visible for testing */ - Long lastEndTimeMs() { + long lastEndTimeMs() { return lastEndTimeMs; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 01e5a7d3425..a07c0dec464 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; @@ -51,8 +50,6 @@ import static org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForP public class DatafeedManager extends AbstractComponent { - private static final String INF_SYMBOL = "forever"; - private final Client client; private final ClusterService clusterService; private final PersistentTasksService persistentTasksService; @@ -159,9 +156,6 @@ public class DatafeedManager extends AbstractComponent { // otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel // the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running. private void innerRun(Holder holder, long startTime, Long endTime) { - logger.info("Starting datafeed [{}] for job [{}] in [{}, {})", holder.datafeed.getId(), holder.datafeed.getJobId(), - DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(startTime), - endTime == null ? INF_SYMBOL : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(endTime)); holder.future = threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() { @Override