Watcher: Make SchedulerEngine job handling threadsafe (elastic/elasticsearch#3955)

The old handling was not thread safe, as it used to replace volatile
objects in the code. This implementation uses a concurrent hashmap
to easily allow adding/removing schedules without having to replace
whole objects

Original commit: elastic/x-pack-elasticsearch@0aa618b372
This commit is contained in:
Alexander Reelsen 2016-11-02 14:50:44 +01:00 committed by GitHub
parent 176829c4cc
commit 87ee1f30d6
1 changed files with 20 additions and 78 deletions

View File

@ -5,12 +5,12 @@
*/ */
package org.elasticsearch.xpack.scheduler; package org.elasticsearch.xpack.scheduler;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.support.clock.Clock;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -18,9 +18,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableMap;
public class SchedulerEngine { public class SchedulerEngine {
@ -90,7 +87,7 @@ public class SchedulerEngine {
long nextScheduledTimeAfter(long startTime, long now); long nextScheduledTimeAfter(long startTime, long now);
} }
private volatile Schedules schedules; private final Map<String, ActiveSchedule> schedules = ConcurrentCollections.newConcurrentMap();
private ScheduledExecutorService scheduler; private ScheduledExecutorService scheduler;
private final Clock clock; private final Clock clock;
private List<Listener> listeners = new CopyOnWriteArrayList<>(); private List<Listener> listeners = new CopyOnWriteArrayList<>();
@ -105,11 +102,7 @@ public class SchedulerEngine {
public void start(Collection<Job> jobs) { public void start(Collection<Job> jobs) {
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler")); this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler"));
long starTime = clock.millis(); jobs.forEach(this::add);
List<ActiveSchedule> schedules = jobs.stream()
.map(job -> new ActiveSchedule(job.getId(), job.getSchedule(), starTime))
.collect(Collectors.toList());
this.schedules = new Schedules(schedules);
} }
public void stop() { public void stop() {
@ -123,16 +116,20 @@ public class SchedulerEngine {
public void add(Job job) { public void add(Job job) {
ActiveSchedule schedule = new ActiveSchedule(job.getId(), job.getSchedule(), clock.millis()); ActiveSchedule schedule = new ActiveSchedule(job.getId(), job.getSchedule(), clock.millis());
schedules = schedules.add(schedule); schedules.compute(schedule.name, (name, previousSchedule) -> {
if (previousSchedule != null) {
previousSchedule.cancel();
}
return schedule;
});
} }
public boolean remove(String jobId) { public boolean remove(String jobId) {
Schedules newSchedules = schedules.remove(jobId); ActiveSchedule removedSchedule = schedules.remove(jobId);
if (newSchedules == null) { if (removedSchedule != null) {
return false; removedSchedule.cancel();
} }
schedules = newSchedules; return removedSchedule != null;
return true;
} }
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
@ -155,20 +152,20 @@ public class SchedulerEngine {
this.name = name; this.name = name;
this.schedule = schedule; this.schedule = schedule;
this.startTime = startTime; this.startTime = startTime;
this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime); this.scheduleNextRun(startTime);
if (scheduledTime != -1) {
long delay = Math.max(0, scheduledTime - clock.millis());
future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
}
} }
@Override @Override
public void run() { public void run() {
long triggeredTime = clock.millis(); long triggeredTime = clock.millis();
notifyListeners(name, triggeredTime, scheduledTime); notifyListeners(name, triggeredTime, scheduledTime);
scheduledTime = schedule.nextScheduledTimeAfter(startTime, triggeredTime); scheduleNextRun(triggeredTime);
}
private void scheduleNextRun(long currentTime) {
this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, currentTime);
if (scheduledTime != -1) { if (scheduledTime != -1) {
long delay = Math.max(0, scheduledTime - triggeredTime); long delay = Math.max(0, scheduledTime - currentTime);
future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
} }
} }
@ -177,59 +174,4 @@ public class SchedulerEngine {
FutureUtils.cancel(future); FutureUtils.cancel(future);
} }
} }
static class Schedules {
private final Map<String, ActiveSchedule> scheduleByName;
Schedules(Collection<ActiveSchedule> schedules) {
Map<String, ActiveSchedule> builder = new HashMap<>();
for (ActiveSchedule schedule : schedules) {
builder.put(schedule.name, schedule);
}
this.scheduleByName = unmodifiableMap(builder);
}
public Schedules(Map<String, ActiveSchedule> scheduleByName) {
this.scheduleByName = scheduleByName;
}
public Schedules add(ActiveSchedule schedule) {
boolean replacing = scheduleByName.containsKey(schedule.name);
if (!replacing) {
Map<String, ActiveSchedule> newScheduleByName = new HashMap<>(scheduleByName);
newScheduleByName.put(schedule.name, schedule);
return new Schedules(unmodifiableMap(newScheduleByName));
}
Map<String, ActiveSchedule> builder = new HashMap<>(scheduleByName.size());
for (Map.Entry<String, ActiveSchedule> scheduleEntry : scheduleByName.entrySet()) {
final String existingScheduleName = scheduleEntry.getKey();
final ActiveSchedule existingSchedule = scheduleEntry.getValue();
if (existingScheduleName.equals(schedule.name)) {
existingSchedule.cancel();
builder.put(schedule.name, schedule);
} else {
builder.put(existingScheduleName, existingSchedule);
}
}
return new Schedules(unmodifiableMap(builder));
}
public Schedules remove(String name) {
if (!scheduleByName.containsKey(name)) {
return null;
}
Map<String, ActiveSchedule> builder = new HashMap<>(scheduleByName.size() - 1);
for (Map.Entry<String, ActiveSchedule> scheduleEntry : scheduleByName.entrySet()) {
final String existingScheduleName = scheduleEntry.getKey();
final ActiveSchedule existingSchedule = scheduleEntry.getValue();
if (existingScheduleName.equals(name)) {
existingSchedule.cancel();
} else {
builder.put(existingScheduleName, existingSchedule);
}
}
return new Schedules(unmodifiableMap(builder));
}
}
} }