[ML] Uses AbstractRunnable for running datafeed, so that we handle rejected execution exceptions.

Original commit: elastic/x-pack-elasticsearch@d04d3fa401
This commit is contained in:
Martijn van Groningen 2017-03-21 14:17:59 +01:00
parent dd91ac0cd3
commit 6fae1867ec
1 changed files with 71 additions and 50 deletions

View File

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus;
@ -128,40 +129,50 @@ public class DatafeedJobRunner extends AbstractComponent {
logger.info("Starting datafeed [{}] for job [{}] in [{}, {})", holder.datafeed.getId(), holder.datafeed.getJobId(),
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(startTime),
endTime == null ? INF_SYMBOL : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(endTime));
holder.future = threadPool.executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(() -> {
Long next = null;
try {
next = holder.executeLoopBack(startTime, endTime);
} catch (DatafeedJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (DatafeedJob.AnalysisProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (DatafeedJob.EmptyDataCountException e) {
if (endTime == null) {
holder.problemTracker.reportEmptyDataCount();
next = e.nextDelayInMsSinceEpoch;
} else {
// Notify that a lookback-only run found no data
String lookbackNoDataMsg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA);
logger.warn("[{}] {}", holder.datafeed.getJobId(), lookbackNoDataMsg);
auditor.warning(holder.datafeed.getJobId(), lookbackNoDataMsg);
}
} catch (Exception e) {
holder.future = threadPool.executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e);
holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e);
return;
}
if (next != null) {
doDatafeedRealtime(next, holder.datafeed.getJobId(), holder);
} else {
holder.stop("no_realtime", TimeValue.timeValueSeconds(20), null);
holder.problemTracker.finishReport();
@Override
protected void doRun() throws Exception {
Long next = null;
try {
next = holder.executeLoopBack(startTime, endTime);
} catch (DatafeedJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (DatafeedJob.AnalysisProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (DatafeedJob.EmptyDataCountException e) {
if (endTime == null) {
holder.problemTracker.reportEmptyDataCount();
next = e.nextDelayInMsSinceEpoch;
} else {
// Notify that a lookback-only run found no data
String lookbackNoDataMsg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA);
logger.warn("[{}] {}", holder.datafeed.getJobId(), lookbackNoDataMsg);
auditor.warning(holder.datafeed.getJobId(), lookbackNoDataMsg);
}
} catch (Exception e) {
logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e);
holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e);
return;
}
if (next != null) {
doDatafeedRealtime(next, holder.datafeed.getJobId(), holder);
} else {
holder.stop("no_realtime", TimeValue.timeValueSeconds(20), null);
holder.problemTracker.finishReport();
}
}
});
}
@ -170,28 +181,38 @@ public class DatafeedJobRunner extends AbstractComponent {
if (holder.isRunning()) {
TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId);
holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME, () -> {
long nextDelayInMsSinceEpoch;
try {
nextDelayInMsSinceEpoch = holder.executeRealTime();
holder.problemTracker.reportNoneEmptyCount();
} catch (DatafeedJob.ExtractionProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (DatafeedJob.AnalysisProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (DatafeedJob.EmptyDataCountException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportEmptyDataCount();
} catch (Exception e) {
holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e);
holder.stop("general_realtime_error", TimeValue.timeValueSeconds(20), e);
return;
}
holder.problemTracker.finishReport();
if (nextDelayInMsSinceEpoch >= 0) {
doDatafeedRealtime(nextDelayInMsSinceEpoch, jobId, holder);
@Override
protected void doRun() throws Exception {
long nextDelayInMsSinceEpoch;
try {
nextDelayInMsSinceEpoch = holder.executeRealTime();
holder.problemTracker.reportNoneEmptyCount();
} catch (DatafeedJob.ExtractionProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (DatafeedJob.AnalysisProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (DatafeedJob.EmptyDataCountException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportEmptyDataCount();
} catch (Exception e) {
logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e);
holder.stop("general_realtime_error", TimeValue.timeValueSeconds(20), e);
return;
}
holder.problemTracker.finishReport();
if (nextDelayInMsSinceEpoch >= 0) {
doDatafeedRealtime(nextDelayInMsSinceEpoch, jobId, holder);
}
}
});
}