Improved datafeed logging for stopping

Original commit: elastic/x-pack-elasticsearch@94bd5d6a00
This commit is contained in:
Martijn van Groningen 2017-01-24 14:22:37 +01:00
parent b636a4b829
commit e9f899e57a
2 changed files with 18 additions and 17 deletions

View File

@ -95,9 +95,10 @@ public class InternalStartDatafeedAction extends
/* public for testing */ /* public for testing */
public void stop() { public void stop() {
if (holder != null) { if (holder == null) {
holder.stop(null); throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder");
} }
holder.stop("cancel", null);
} }
} }

View File

@ -19,20 +19,20 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction; import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction; import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus; import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.time.Duration; import java.time.Duration;
@ -89,7 +89,7 @@ public class DatafeedJobRunner extends AbstractComponent {
private void innerRun(Holder holder, long startTime, Long endTime) { private void innerRun(Holder holder, long startTime, Long endTime) {
setJobDatafeedStatus(holder.datafeed.getId(), DatafeedStatus.STARTED, error -> { setJobDatafeedStatus(holder.datafeed.getId(), DatafeedStatus.STARTED, error -> {
if (error != null) { if (error != null) {
holder.stop(error); holder.stop("unable_to_set_datafeed_status", error);
return; return;
} }
@ -114,13 +114,13 @@ public class DatafeedJobRunner extends AbstractComponent {
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e); logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e);
holder.stop(e); holder.stop("general_lookback_failure", e);
return; return;
} }
if (next != null) { if (next != null) {
doDatafeedRealtime(next, holder.datafeed.getJobId(), holder); doDatafeedRealtime(next, holder.datafeed.getJobId(), holder);
} else { } else {
holder.stop(null); holder.stop("no_realtime", null);
holder.problemTracker.finishReport(); holder.problemTracker.finishReport();
} }
}); });
@ -145,12 +145,12 @@ public class DatafeedJobRunner extends AbstractComponent {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
if (holder.problemTracker.updateEmptyDataCount(true)) { if (holder.problemTracker.updateEmptyDataCount(true)) {
holder.problemTracker.finishReport(); holder.problemTracker.finishReport();
holder.stop(e); holder.stop("empty_data", e);
return; return;
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e); logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e);
holder.stop(e); holder.stop("general_realtime_error", e);
return; return;
} }
holder.problemTracker.finishReport(); holder.problemTracker.finishReport();
@ -279,14 +279,14 @@ public class DatafeedJobRunner extends AbstractComponent {
return datafeedJob.isRunning(); return datafeedJob.isRunning();
} }
public void stop(Exception e) { public void stop(String source, Exception e) {
logger.info("attempt to stop datafeed [{}] for job [{}]", datafeed.getId(), datafeed.getJobId()); logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId());
if (datafeedJob.stop()) { if (datafeedJob.stop()) {
FutureUtils.cancel(future); FutureUtils.cancel(future);
setJobDatafeedStatus(datafeed.getId(), DatafeedStatus.STOPPED, error -> handler.accept(e)); setJobDatafeedStatus(datafeed.getId(), DatafeedStatus.STOPPED, error -> handler.accept(e));
logger.info("datafeed [{}] for job [{}] has been stopped", datafeed.getId(), datafeed.getJobId()); logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId());
} else { } else {
logger.info("datafeed [{}] for job [{}] was already stopped", datafeed.getId(), datafeed.getJobId()); logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId());
} }
} }