diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java index 43a1f1e9b15..743453560e7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java @@ -64,70 +64,72 @@ public class ScheduledJobRunner extends AbstractComponent { PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE); validate(schedulerId, prelertMetadata); - setJobSchedulerStatus(schedulerId, SchedulerStatus.STARTED, error -> { - if (error != null) { - handler.accept(error); - return; + Scheduler scheduler = prelertMetadata.getScheduler(schedulerId); + Job job = prelertMetadata.getJobs().get(scheduler.getJobId()); + BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder() + .sortField(Bucket.TIMESTAMP.getPreferredName()) + .sortDescending(true).size(1) + .includeInterim(false) + .build(); + jobProvider.buckets(job.getId(), latestBucketQuery, buckets -> { + long latestFinalBucketEndMs = -1L; + Duration bucketSpan = Duration.ofSeconds(job.getAnalysisConfig().getBucketSpan()); + if (buckets.results().size() == 1) { + latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.toMillis() - 1; + } + Holder holder = createJobScheduler(scheduler, job, latestFinalBucketEndMs, handler, task); + innerRun(holder, startTime, endTime); + }, e -> { + if (e instanceof ResourceNotFoundException) { + Holder holder = createJobScheduler(scheduler, job, -1L, handler, task); + innerRun(holder, startTime, endTime); + } else { + handler.accept(e); } - - Scheduler scheduler = prelertMetadata.getScheduler(schedulerId); - Job job = prelertMetadata.getJobs().get(scheduler.getJobId()); - BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder() - .sortField(Bucket.TIMESTAMP.getPreferredName()) - .sortDescending(true).size(1) - .includeInterim(false) - .build(); - jobProvider.buckets(job.getId(), latestBucketQuery, buckets -> { - long latestFinalBucketEndMs = -1L; - Duration bucketSpan = Duration.ofSeconds(job.getAnalysisConfig().getBucketSpan()); - if (buckets.results().size() == 1) { - latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.toMillis() - 1; - } - innerRun(scheduler, job, startTime, endTime, task, latestFinalBucketEndMs, handler); - }, e -> { - if (e instanceof ResourceNotFoundException) { - innerRun(scheduler, job, startTime, endTime, task, -1, handler); - } else { - handler.accept(e); - } - }); }); } - private void innerRun(Scheduler scheduler, Job job, long startTime, Long endTime, StartSchedulerAction.SchedulerTask task, - long latestFinalBucketEndMs, Consumer handler) { - logger.info("Starting scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId()); - Holder holder = createJobScheduler(scheduler, job, latestFinalBucketEndMs, handler); - task.setHolder(holder); - holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> { - Long next = null; - try { - next = holder.scheduledJob.runLookBack(startTime, endTime); - } catch (ScheduledJob.ExtractionProblemException e) { - if (endTime == null) { - next = e.nextDelayInMsSinceEpoch; - } - holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); - } catch (ScheduledJob.AnalysisProblemException e) { - if (endTime == null) { - next = e.nextDelayInMsSinceEpoch; - } - holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); - } catch (ScheduledJob.EmptyDataCountException e) { - if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) { - next = e.nextDelayInMsSinceEpoch; - } - } catch (Exception e) { - logger.error("Failed lookback import for job [" + job.getId() + "]", e); - holder.stop(e); + // Important: Holder must be created and assigned to SchedulerTask before setting status to started, + // otherwise if a stop scheduler call is made immediately after the start scheduler call we could cancel + // the SchedulerTask without stopping scheduler, which causes the scheduler to keep on running. + private void innerRun(Holder holder, long startTime, Long endTime) { + setJobSchedulerStatus(holder.scheduler.getId(), SchedulerStatus.STARTED, error -> { + if (error != null) { + holder.stop(error); return; } - if (next != null) { - doScheduleRealtime(next, job.getId(), holder); - } else { - holder.stop(null); - holder.problemTracker.finishReport(); - } + + logger.info("Starting scheduler [{}] for job [{}]", holder.scheduler.getId(), holder.scheduler.getJobId()); + holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> { + Long next = null; + try { + next = holder.scheduledJob.runLookBack(startTime, endTime); + } catch (ScheduledJob.ExtractionProblemException e) { + if (endTime == null) { + next = e.nextDelayInMsSinceEpoch; + } + holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); + } catch (ScheduledJob.AnalysisProblemException e) { + if (endTime == null) { + next = e.nextDelayInMsSinceEpoch; + } + holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); + } catch (ScheduledJob.EmptyDataCountException e) { + if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) { + next = e.nextDelayInMsSinceEpoch; + } + } catch (Exception e) { + logger.error("Failed lookback import for job [" + holder.scheduler.getJobId() + "]", e); + holder.stop(e); + return; + } + if (next != null) { + doScheduleRealtime(next, holder.scheduler.getJobId(), holder); + } else { + holder.stop(null); + holder.problemTracker.finishReport(); + } + }); }); } @@ -190,14 +192,17 @@ public class ScheduledJobRunner extends AbstractComponent { ScheduledJobValidator.validate(scheduler.getConfig(), job); } - private Holder createJobScheduler(Scheduler scheduler, Job job, long latestFinalBucketEndMs, Consumer handler) { + private Holder createJobScheduler(Scheduler scheduler, Job job, long latestFinalBucketEndMs, Consumer handler, + StartSchedulerAction.SchedulerTask task) { Auditor auditor = jobProvider.audit(job.getId()); Duration frequency = getFrequencyOrDefault(scheduler, job); Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay()); DataExtractor dataExtractor = dataExtractorFactory.newExtractor(scheduler.getConfig(), job); ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(), dataExtractor, client, auditor, currentTimeSupplier, latestFinalBucketEndMs, getLatestRecordTimestamp(job.getId())); - return new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler); + Holder holder = new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler); + task.setHolder(holder); + return holder; } private long getLatestRecordTimestamp(String jobId) {