scheduler: prevent stopping scheduler before the scheduler was actually started

The start scheduler API waits until the scheduler state has been set to started before returning.
Before this change, the scheduler was linked to the task created by the start scheduler API only after the scheduler state had been set to started.
If the stop scheduler API was called immediately after the start scheduler API, this could lead to the stop scheduler API cancelling the task without
stopping the scheduler, because the scheduler might not yet have been linked to the task.

Now the scheduler gets linked to the task before the scheduler state is set to started, fixing the problematic situation described above.

Original commit: elastic/x-pack-elasticsearch@8334ae1967
This commit is contained in:
Martijn van Groningen 2017-01-05 20:56:49 +01:00
parent b9205142a4
commit 66a2f999e5
1 changed files with 65 additions and 60 deletions

View File

@ -64,70 +64,72 @@ public class ScheduledJobRunner extends AbstractComponent {
PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE);
validate(schedulerId, prelertMetadata);
setJobSchedulerStatus(schedulerId, SchedulerStatus.STARTED, error -> {
if (error != null) {
handler.accept(error);
return;
Scheduler scheduler = prelertMetadata.getScheduler(schedulerId);
Job job = prelertMetadata.getJobs().get(scheduler.getJobId());
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Bucket.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.includeInterim(false)
.build();
jobProvider.buckets(job.getId(), latestBucketQuery, buckets -> {
long latestFinalBucketEndMs = -1L;
Duration bucketSpan = Duration.ofSeconds(job.getAnalysisConfig().getBucketSpan());
if (buckets.results().size() == 1) {
latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.toMillis() - 1;
}
Holder holder = createJobScheduler(scheduler, job, latestFinalBucketEndMs, handler, task);
innerRun(holder, startTime, endTime);
}, e -> {
if (e instanceof ResourceNotFoundException) {
Holder holder = createJobScheduler(scheduler, job, -1L, handler, task);
innerRun(holder, startTime, endTime);
} else {
handler.accept(e);
}
Scheduler scheduler = prelertMetadata.getScheduler(schedulerId);
Job job = prelertMetadata.getJobs().get(scheduler.getJobId());
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Bucket.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.includeInterim(false)
.build();
jobProvider.buckets(job.getId(), latestBucketQuery, buckets -> {
long latestFinalBucketEndMs = -1L;
Duration bucketSpan = Duration.ofSeconds(job.getAnalysisConfig().getBucketSpan());
if (buckets.results().size() == 1) {
latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.toMillis() - 1;
}
innerRun(scheduler, job, startTime, endTime, task, latestFinalBucketEndMs, handler);
}, e -> {
if (e instanceof ResourceNotFoundException) {
innerRun(scheduler, job, startTime, endTime, task, -1, handler);
} else {
handler.accept(e);
}
});
});
}
private void innerRun(Scheduler scheduler, Job job, long startTime, Long endTime, StartSchedulerAction.SchedulerTask task,
long latestFinalBucketEndMs, Consumer<Exception> handler) {
logger.info("Starting scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId());
Holder holder = createJobScheduler(scheduler, job, latestFinalBucketEndMs, handler);
task.setHolder(holder);
holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> {
Long next = null;
try {
next = holder.scheduledJob.runLookBack(startTime, endTime);
} catch (ScheduledJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (ScheduledJob.AnalysisProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (ScheduledJob.EmptyDataCountException e) {
if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) {
next = e.nextDelayInMsSinceEpoch;
}
} catch (Exception e) {
logger.error("Failed lookback import for job [" + job.getId() + "]", e);
holder.stop(e);
// Important: Holder must be created and assigned to SchedulerTask before setting status to started,
// otherwise if a stop scheduler call is made immediately after the start scheduler call we could cancel
// the SchedulerTask without stopping scheduler, which causes the scheduler to keep on running.
private void innerRun(Holder holder, long startTime, Long endTime) {
setJobSchedulerStatus(holder.scheduler.getId(), SchedulerStatus.STARTED, error -> {
if (error != null) {
holder.stop(error);
return;
}
if (next != null) {
doScheduleRealtime(next, job.getId(), holder);
} else {
holder.stop(null);
holder.problemTracker.finishReport();
}
logger.info("Starting scheduler [{}] for job [{}]", holder.scheduler.getId(), holder.scheduler.getJobId());
holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> {
Long next = null;
try {
next = holder.scheduledJob.runLookBack(startTime, endTime);
} catch (ScheduledJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (ScheduledJob.AnalysisProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (ScheduledJob.EmptyDataCountException e) {
if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) {
next = e.nextDelayInMsSinceEpoch;
}
} catch (Exception e) {
logger.error("Failed lookback import for job [" + holder.scheduler.getJobId() + "]", e);
holder.stop(e);
return;
}
if (next != null) {
doScheduleRealtime(next, holder.scheduler.getJobId(), holder);
} else {
holder.stop(null);
holder.problemTracker.finishReport();
}
});
});
}
@ -190,14 +192,17 @@ public class ScheduledJobRunner extends AbstractComponent {
ScheduledJobValidator.validate(scheduler.getConfig(), job);
}
private Holder createJobScheduler(Scheduler scheduler, Job job, long latestFinalBucketEndMs, Consumer<Exception> handler) {
private Holder createJobScheduler(Scheduler scheduler, Job job, long latestFinalBucketEndMs, Consumer<Exception> handler,
StartSchedulerAction.SchedulerTask task) {
Auditor auditor = jobProvider.audit(job.getId());
Duration frequency = getFrequencyOrDefault(scheduler, job);
Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay());
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(scheduler.getConfig(), job);
ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(),
dataExtractor, client, auditor, currentTimeSupplier, latestFinalBucketEndMs, getLatestRecordTimestamp(job.getId()));
return new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler);
Holder holder = new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler);
task.setHolder(holder);
return holder;
}
private long getLatestRecordTimestamp(String jobId) {