[ML] Remove confusing datafeed log message (elastic/x-pack-elasticsearch#3202)

Original commit: elastic/x-pack-elasticsearch@b8ec3d06c9
This commit is contained in:
David Kyle 2017-12-04 13:08:49 +00:00 committed by GitHub
parent 2c978842da
commit cb9314ba78
2 changed files with 15 additions and 22 deletions

View File

@@ -50,7 +50,7 @@ class DatafeedJob {
private final Supplier<Long> currentTimeSupplier;
private volatile long lookbackStartTimeMs;
private volatile Long lastEndTimeMs;
private volatile long lastEndTimeMs;
private AtomicBoolean running = new AtomicBoolean(true);
private volatile boolean isIsolated;
@@ -98,6 +98,8 @@ class DatafeedJob {
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackStartTimeMs),
endTime == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd));
auditor.info(jobId, msg);
LOGGER.info("[" + jobId + "] " + msg);
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setCalcInterim(true);
@@ -120,23 +122,20 @@ class DatafeedJob {
}
private long skipToStartTime(long startTime) {
if (lastEndTimeMs != null) {
if (lastEndTimeMs + 1 > startTime) {
// start time is before last checkpoint, thus continue from checkpoint
return lastEndTimeMs + 1;
}
// start time is after last checkpoint, thus we need to skip time
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setSkipTime(String.valueOf(startTime));
FlushJobAction.Response flushResponse = flushJob(request);
LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]");
return flushResponse.getLastFinalizedBucketEnd().getTime();
if (lastEndTimeMs + 1 > startTime) {
// start time is before last checkpoint, thus continue from checkpoint
return lastEndTimeMs + 1;
}
return startTime;
// start time is after last checkpoint, thus we need to skip time
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setSkipTime(String.valueOf(startTime));
FlushJobAction.Response flushResponse = flushJob(request);
LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]");
return flushResponse.getLastFinalizedBucketEnd().getTime();
}
long runRealtime() throws Exception {
long start = lastEndTimeMs == null ? lookbackStartTimeMs : Math.max(lookbackStartTimeMs, lastEndTimeMs + 1);
long start = Math.max(lookbackStartTimeMs, lastEndTimeMs + 1);
long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs;
long end = toIntervalStartEpochMs(nowMinusQueryDelay);
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
@@ -239,7 +238,7 @@ class DatafeedJob {
}
}
lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1);
lastEndTimeMs = Math.max(lastEndTimeMs, end - 1);
LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", jobId, error, recordCount,
lastEndTimeMs, isRunning(), dataExtractor.isCancelled());
@@ -311,7 +310,7 @@ class DatafeedJob {
/**
* Visible for testing
*/
Long lastEndTimeMs() {
long lastEndTimeMs() {
return lastEndTimeMs;
}

View File

@@ -17,7 +17,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
@@ -51,8 +50,6 @@ import static org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForP
public class DatafeedManager extends AbstractComponent {
private static final String INF_SYMBOL = "forever";
private final Client client;
private final ClusterService clusterService;
private final PersistentTasksService persistentTasksService;
@@ -159,9 +156,6 @@ public class DatafeedManager extends AbstractComponent {
// otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel
// the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running.
private void innerRun(Holder holder, long startTime, Long endTime) {
logger.info("Starting datafeed [{}] for job [{}] in [{}, {})", holder.datafeed.getId(), holder.datafeed.getJobId(),
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(startTime),
endTime == null ? INF_SYMBOL : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(endTime));
holder.future = threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() {
@Override