Stop scheduled job only once.

If scheduled job concurrently gets stopped from within (e.g. lookback) and externally via the stop scheduler api then make sure to execute the stop logic only once.

Original commit: elastic/x-pack-elasticsearch@505c44f515
This commit is contained in:
Martijn van Groningen 2017-01-16 11:28:28 +01:00
parent e0b6a1e493
commit 1d891965c1
2 changed files with 24 additions and 9 deletions

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier; import java.util.function.Supplier;
class ScheduledJob { class ScheduledJob {
@ -41,7 +42,7 @@ class ScheduledJob {
private volatile long lookbackStartTimeMs; private volatile long lookbackStartTimeMs;
private volatile Long lastEndTimeMs; private volatile Long lastEndTimeMs;
private volatile boolean running = true; private AtomicBoolean running = new AtomicBoolean(true);
ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory, ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory,
Client client, Auditor auditor, Supplier<Long> currentTimeSupplier, Client client, Auditor auditor, Supplier<Long> currentTimeSupplier,
@ -103,13 +104,23 @@ class ScheduledJob {
return nextRealtimeTimestamp(); return nextRealtimeTimestamp();
} }
public void stop() { /**
running = false; * Stops the scheduled job
auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED)); *
* @return <code>true</code> when the scheduler was running and this method invocation stopped it,
* otherwise <code>false</code> is returned
*/
public boolean stop() {
if (running.compareAndSet(true, false)) {
auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED));
return true;
} else {
return false;
}
} }
public boolean isRunning() { public boolean isRunning() {
return running; return running.get();
} }
private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException { private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException {

View File

@ -269,10 +269,14 @@ public class ScheduledJobRunner extends AbstractComponent {
} }
public void stop(Exception e) { public void stop(Exception e) {
logger.info("Stopping scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId()); logger.info("attempt to stop scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId());
scheduledJob.stop(); if (scheduledJob.stop()) {
FutureUtils.cancel(future); FutureUtils.cancel(future);
setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(e)); setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(e));
logger.info("scheduler [{}] for job [{}] has been stopped", scheduler.getId(), scheduler.getJobId());
} else {
logger.info("scheduler [{}] for job [{}] was already stopped", scheduler.getId(), scheduler.getJobId());
}
} }
} }