diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java index 7947de35c40..07bfcd61874 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; class ScheduledJob { @@ -41,7 +42,7 @@ class ScheduledJob { private volatile long lookbackStartTimeMs; private volatile Long lastEndTimeMs; - private volatile boolean running = true; + private AtomicBoolean running = new AtomicBoolean(true); ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier currentTimeSupplier, @@ -103,13 +104,23 @@ class ScheduledJob { return nextRealtimeTimestamp(); } - public void stop() { - running = false; - auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED)); + /** + * Stops the scheduled job + * + * @return true when the scheduler was running and this method invocation stopped it, + * otherwise false is returned + */ + public boolean stop() { + if (running.compareAndSet(true, false)) { + auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED)); + return true; + } else { + return false; + } } public boolean isRunning() { - return running; + return running.get(); } private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java index 1e6ab130abd..ef402b7abd7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java @@ -269,10 +269,14 @@ public class ScheduledJobRunner extends AbstractComponent { } public void stop(Exception e) { - logger.info("Stopping scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId()); - scheduledJob.stop(); - FutureUtils.cancel(future); - setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(e)); + logger.info("attempt to stop scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId()); + if (scheduledJob.stop()) { + FutureUtils.cancel(future); + setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(e)); + logger.info("scheduler [{}] for job [{}] has been stopped", scheduler.getId(), scheduler.getJobId()); + } else { + logger.info("scheduler [{}] for job [{}] was already stopped", scheduler.getId(), scheduler.getJobId()); + } } }