Original commit: elastic/x-pack-elasticsearch@c83c3ebcc1
This commit is contained in:
David Kyle 2017-12-04 13:48:24 +00:00
parent cb9314ba78
commit d39c8b76db
1 changed files with 16 additions and 13 deletions

View File

@ -50,7 +50,7 @@ class DatafeedJob {
private final Supplier<Long> currentTimeSupplier; private final Supplier<Long> currentTimeSupplier;
private volatile long lookbackStartTimeMs; private volatile long lookbackStartTimeMs;
private volatile long lastEndTimeMs; private volatile Long lastEndTimeMs;
private AtomicBoolean running = new AtomicBoolean(true); private AtomicBoolean running = new AtomicBoolean(true);
private volatile boolean isIsolated; private volatile boolean isIsolated;
@ -122,20 +122,23 @@ class DatafeedJob {
} }
private long skipToStartTime(long startTime) { private long skipToStartTime(long startTime) {
if (lastEndTimeMs + 1 > startTime) { if (lastEndTimeMs != null) {
// start time is before last checkpoint, thus continue from checkpoint if (lastEndTimeMs + 1 > startTime) {
return lastEndTimeMs + 1; // start time is before last checkpoint, thus continue from checkpoint
return lastEndTimeMs + 1;
}
// start time is after last checkpoint, thus we need to skip time
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setSkipTime(String.valueOf(startTime));
FlushJobAction.Response flushResponse = flushJob(request);
LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]");
return flushResponse.getLastFinalizedBucketEnd().getTime();
} }
// start time is after last checkpoint, thus we need to skip time return startTime;
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setSkipTime(String.valueOf(startTime));
FlushJobAction.Response flushResponse = flushJob(request);
LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]");
return flushResponse.getLastFinalizedBucketEnd().getTime();
} }
long runRealtime() throws Exception { long runRealtime() throws Exception {
long start = Math.max(lookbackStartTimeMs, lastEndTimeMs + 1); long start = lastEndTimeMs == null ? lookbackStartTimeMs : Math.max(lookbackStartTimeMs, lastEndTimeMs + 1);
long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs; long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs;
long end = toIntervalStartEpochMs(nowMinusQueryDelay); long end = toIntervalStartEpochMs(nowMinusQueryDelay);
FlushJobAction.Request request = new FlushJobAction.Request(jobId); FlushJobAction.Request request = new FlushJobAction.Request(jobId);
@ -238,7 +241,7 @@ class DatafeedJob {
} }
} }
lastEndTimeMs = Math.max(lastEndTimeMs, end - 1); lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1);
LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", jobId, error, recordCount, LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", jobId, error, recordCount,
lastEndTimeMs, isRunning(), dataExtractor.isCancelled()); lastEndTimeMs, isRunning(), dataExtractor.isCancelled());
@ -310,7 +313,7 @@ class DatafeedJob {
/** /**
* Visible for testing * Visible for testing
*/ */
long lastEndTimeMs() { Long lastEndTimeMs() {
return lastEndTimeMs; return lastEndTimeMs;
} }