diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartDatafeedAction.java index c4eda16d352..9e664dcf500 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartDatafeedAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartDatafeedAction.java @@ -95,9 +95,10 @@ public class InternalStartDatafeedAction extends /* public for testing */ public void stop() { - if (holder != null) { - holder.stop(null); + if (holder == null) { + throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder"); } + holder.stop("cancel", null); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 935d18a79f7..b18d55bdc44 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -19,20 +19,20 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction; import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.action.util.QueryPage; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.DefaultFrequency; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobStatus; -import org.elasticsearch.xpack.ml.notifications.Auditor; -import org.elasticsearch.xpack.ml.job.config.DefaultFrequency; import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; -import org.elasticsearch.xpack.ml.action.util.QueryPage; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.results.Bucket; -import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.time.Duration; @@ -89,7 +89,7 @@ public class DatafeedJobRunner extends AbstractComponent { private void innerRun(Holder holder, long startTime, Long endTime) { setJobDatafeedStatus(holder.datafeed.getId(), DatafeedStatus.STARTED, error -> { if (error != null) { - holder.stop(error); + holder.stop("unable_to_set_datafeed_status", error); return; } @@ -114,13 +114,13 @@ public class DatafeedJobRunner extends AbstractComponent { } } catch (Exception e) { logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e); - holder.stop(e); + holder.stop("general_lookback_failure", e); return; } if (next != null) { doDatafeedRealtime(next, holder.datafeed.getJobId(), holder); } else { - holder.stop(null); + holder.stop("no_realtime", null); holder.problemTracker.finishReport(); } }); @@ -145,12 +145,12 @@ public class DatafeedJobRunner extends AbstractComponent { nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; if (holder.problemTracker.updateEmptyDataCount(true)) { holder.problemTracker.finishReport(); - holder.stop(e); + holder.stop("empty_data", e); return; } } catch (Exception e) { logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e); - holder.stop(e); + holder.stop("general_realtime_error", e); return; } holder.problemTracker.finishReport(); @@ -279,14 +279,14 @@ public class DatafeedJobRunner extends AbstractComponent { return datafeedJob.isRunning(); } - public void stop(Exception e) { - logger.info("attempt to stop datafeed [{}] for job [{}]", datafeed.getId(), datafeed.getJobId()); + public void stop(String source, Exception e) { + logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId()); if (datafeedJob.stop()) { FutureUtils.cancel(future); setJobDatafeedStatus(datafeed.getId(), DatafeedStatus.STOPPED, error -> handler.accept(e)); - logger.info("datafeed [{}] for job [{}] has been stopped", datafeed.getId(), datafeed.getJobId()); + logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); } else { - logger.info("datafeed [{}] for job [{}] was already stopped", datafeed.getId(), datafeed.getJobId()); + logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); } }