From 1d891965c1d1f16ba6fccb1a19cb64d662018f32 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 16 Jan 2017 11:28:28 +0100 Subject: [PATCH] Stop scheduled job only once. If scheduled job concurrently gets stopped from within (e.g. lookback) and externally via the stop scheduler api then make sure to execute the stop logic only once. Original commit: elastic/x-pack-elasticsearch@505c44f5155b409e473f144d3985b5340d00a91b --- .../xpack/ml/scheduler/ScheduledJob.java | 21 ++++++++++++++----- .../ml/scheduler/ScheduledJobRunner.java | 12 +++++++---- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java index 7947de35c40..07bfcd61874 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; class ScheduledJob { @@ -41,7 +42,7 @@ class ScheduledJob { private volatile long lookbackStartTimeMs; private volatile Long lastEndTimeMs; - private volatile boolean running = true; + private AtomicBoolean running = new AtomicBoolean(true); ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier currentTimeSupplier, @@ -103,13 +104,23 @@ class ScheduledJob { return nextRealtimeTimestamp(); } - public void stop() { - running = false; - auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED)); + /** + * Stops the scheduled job + * + * @return true when the scheduler was running and this method invocation stopped it, + * otherwise false is returned + */ + public boolean stop() { + if (running.compareAndSet(true, false)) { + auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED)); + return true; + } else { + return false; + } } public boolean isRunning() { - return running; + return running.get(); } private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java index 1e6ab130abd..ef402b7abd7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java @@ -269,10 +269,14 @@ public class ScheduledJobRunner extends AbstractComponent { } public void stop(Exception e) { - logger.info("Stopping scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId()); - scheduledJob.stop(); - FutureUtils.cancel(future); - setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(e)); + logger.info("attempt to stop scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId()); + if (scheduledJob.stop()) { + FutureUtils.cancel(future); + setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(e)); + logger.info("scheduler [{}] for job [{}] has been stopped", scheduler.getId(), scheduler.getJobId()); + } else { + logger.info("scheduler [{}] for job [{}] was already stopped", scheduler.getId(), scheduler.getJobId()); + } } }