From 961656932c1fa2e10c22d41580228ca565a9a605 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 31 May 2016 16:15:54 -0400 Subject: [PATCH] extract schedule engine to x-pack package Original commit: elastic/x-pack-elasticsearch@f9688823066083425f10b4eb9e700d8483a791bf --- .../xpack/scheduler/SchedulerEngine.java | 249 ++++++++++++++++++ .../watcher/trigger/schedule/Schedule.java | 20 +- .../SchedulerScheduleTriggerEngine.java | 160 ++--------- 3 files changed, 269 insertions(+), 160 deletions(-) create mode 100644 elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/scheduler/SchedulerEngine.java diff --git a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/scheduler/SchedulerEngine.java b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/scheduler/SchedulerEngine.java new file mode 100644 index 00000000000..5d6a57b77a9 --- /dev/null +++ b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/scheduler/SchedulerEngine.java @@ -0,0 +1,249 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.scheduler; + +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.xpack.watcher.support.clock.Clock; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.util.Collections.unmodifiableMap; + +/** + * + */ +public class SchedulerEngine { + + public static class Job { + private final String id; + private final Schedule schedule; + + public Job(String id, Schedule schedule) { + this.id = id; + this.schedule = schedule; + } + + public String getId() { + return id; + } + + public Schedule getSchedule() { + return schedule; + } + } + + public static class Event { + private final String jobName; + private final long triggeredTime; + private final long scheduledTime; + + public Event(String jobName, long triggeredTime, long scheduledTime) { + this.jobName = jobName; + this.triggeredTime = triggeredTime; + this.scheduledTime = scheduledTime; + } + + public String getJobName() { + return jobName; + } + + public long getTriggeredTime() { + return triggeredTime; + } + + public long getScheduledTime() { + return scheduledTime; + } + } + + public interface Listener { + void triggered(Event event); + } + + public interface Schedule { + + /** + * Returns the next scheduled time after the given time, according to this schedule. If the given schedule + * cannot resolve the next scheduled time, then {@code -1} is returned. It really depends on the type of + * schedule to determine when {@code -1} is returned. Some schedules (e.g. IntervalSchedule) will never return + * {@code -1} as they can always compute the next scheduled time. {@code Cron} based schedules are good example + * of schedules that may return {@code -1}, for example, when the schedule only points to times that are all + * before the given time (in which case, there is no next scheduled time for the given time). + * + * Example: + * + * cron 0 0 0 * 1 ? 2013 (only points to days in January 2013) + * + * time 2015-01-01 12:00:00 (this time is in 2015) + * + */ + long nextScheduledTimeAfter(long startTime, long now); + } + + private volatile Schedules schedules; + private ScheduledExecutorService scheduler; + private final Clock clock; + private List listeners = new CopyOnWriteArrayList<>(); + + public SchedulerEngine(Clock clock) { + this.clock = clock; + } + + public void register(Listener listener) { + listeners.add(listener); + } + + public void start(Collection jobs) { + this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler")); + long starTime = clock.millis(); + List schedules = jobs.stream() + .map(job -> new ActiveSchedule(job.getId(), job.getSchedule(), starTime)) + .collect(Collectors.toList()); + this.schedules = new Schedules(schedules); + } + + public void stop() { + scheduler.shutdownNow(); + try { + scheduler.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public void add(Job job) { + ActiveSchedule schedule = new ActiveSchedule(job.getId(), job.getSchedule(), clock.millis()); + schedules = schedules.add(schedule); + } + + public boolean remove(String jobId) { + Schedules newSchedules = schedules.remove(jobId); + if (newSchedules == null) { + return false; + } + schedules = newSchedules; + return true; + } + + protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { + final Event event = new Event(name, triggeredTime, scheduledTime); + for (Listener listener : listeners) { + listener.triggered(event); + } + } + + class ActiveSchedule implements Runnable { + + private final String name; + private final Schedule schedule; + private final long startTime; + + private volatile ScheduledFuture future; + private volatile long scheduledTime; + + public ActiveSchedule(String name, Schedule schedule, long startTime) { + this.name = name; + this.schedule = schedule; + this.startTime = startTime; + this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime); + if (scheduledTime != -1) { + long delay = Math.max(0, scheduledTime - clock.millis()); + future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } + } + + @Override + public void run() { + long triggeredTime = clock.millis(); + notifyListeners(name, triggeredTime, scheduledTime); + scheduledTime = schedule.nextScheduledTimeAfter(startTime, triggeredTime); + if (scheduledTime != -1) { + long delay = Math.max(0, scheduledTime - triggeredTime); + future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } + } + + public void cancel() { + FutureUtils.cancel(future); + } + } + + static class Schedules { + + private final ActiveSchedule[] schedules; + private final Map scheduleByName; + + Schedules(Collection schedules) { + Map builder = new HashMap<>(); + this.schedules = new ActiveSchedule[schedules.size()]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + builder.put(schedule.name, schedule); + this.schedules[i++] = schedule; + } + this.scheduleByName = unmodifiableMap(builder); + } + + public Schedules(ActiveSchedule[] schedules, Map scheduleByName) { + this.schedules = schedules; + this.scheduleByName = scheduleByName; + } + + public Schedules add(ActiveSchedule schedule) { + boolean replacing = scheduleByName.containsKey(schedule.name); + if (!replacing) { + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length + 1]; + System.arraycopy(schedules, 0, newSchedules, 0, schedules.length); + newSchedules[schedules.length] = schedule; + Map newScheduleByName = new HashMap<>(scheduleByName); + newScheduleByName.put(schedule.name, schedule); + return new Schedules(newSchedules, unmodifiableMap(newScheduleByName)); + } + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length]; + Map builder = new HashMap<>(); + for (int i = 0; i < schedules.length; i++) { + final ActiveSchedule sched; + if (schedules[i].name.equals(schedule.name)) { + sched = schedule; + schedules[i].cancel(); + } else { + sched = schedules[i]; + } + newSchedules[i] = sched; + builder.put(sched.name, sched); + } + return new Schedules(newSchedules, unmodifiableMap(builder)); + } + + public Schedules remove(String name) { + if (!scheduleByName.containsKey(name)) { + return null; + } + Map builder = new HashMap<>(); + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length - 1]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + if (!schedule.name.equals(name)) { + newSchedules[i++] = schedule; + builder.put(schedule.name, schedule); + } else { + schedule.cancel(); + } + } + return new Schedules(newSchedules, unmodifiableMap(builder)); + } + } +} diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/Schedule.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/Schedule.java index ce2a552a03b..5125b93f59a 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/Schedule.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/Schedule.java @@ -7,33 +7,17 @@ package org.elasticsearch.xpack.watcher.trigger.schedule; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.scheduler.SchedulerEngine; import java.io.IOException; /** * */ -public interface Schedule extends ToXContent { +public interface Schedule extends SchedulerEngine.Schedule, ToXContent { String type(); - /** - * Returns the next scheduled time after the given time, according to this schedule. If the given schedule - * cannot resolve the next scheduled time, then {@code -1} is returned. It really depends on the type of - * schedule to determine when {@code -1} is returned. Some schedules (e.g. IntervalSchedule) will never return - * {@code -1} as they can always compute the next scheduled time. {@code Cron} based schedules are good example - * of schedules that may return {@code -1}, for example, when the schedule only points to times that are all - * before the given time (in which case, there is no next scheduled time for the given time). - * - * Example: - * - * cron 0 0 0 * 1 ? 2013 (only points to days in January 2013) - * - * time 2015-01-01 12:00:00 (this time is in 2015) - * - */ - long nextScheduledTimeAfter(long startTime, long time); - interface Parser { String type(); diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java index 392fc178d22..6a204d82063 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java @@ -7,11 +7,9 @@ package org.elasticsearch.xpack.watcher.trigger.schedule.engine; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.xpack.scheduler.SchedulerEngine; import org.elasticsearch.xpack.watcher.support.clock.Clock; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; -import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine; @@ -20,56 +18,43 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; +import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import static java.util.Collections.unmodifiableMap; /** * */ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { - private volatile Schedules schedules; - private ScheduledExecutorService scheduler; + private final SchedulerEngine schedulerEngine; @Inject public SchedulerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { super(settings, scheduleRegistry, clock); + this.schedulerEngine = new SchedulerEngine(clock); + this.schedulerEngine.register(event -> + notifyListeners(event.getJobName(), event.getTriggeredTime(), event.getScheduledTime())); } @Override public void start(Collection jobs) { logger.debug("starting schedule engine..."); - this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler")); - long starTime = clock.millis(); - List schedules = new ArrayList<>(); - for (Job job : jobs) { - if (job.trigger() instanceof ScheduleTrigger) { - ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - schedules.add(new ActiveSchedule(job.id(), trigger.getSchedule(), starTime)); - } - } - this.schedules = new Schedules(schedules); + final List schedulerJobs = new ArrayList<>(); + jobs.stream() + .filter(job -> job.trigger() instanceof ScheduleTrigger) + .forEach(job -> { + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedulerJobs.add(new SchedulerEngine.Job(job.id(), trigger.getSchedule())); + }); + schedulerEngine.start(schedulerJobs); logger.debug("schedule engine started at [{}]", clock.nowUTC()); } @Override public void stop() { logger.debug("stopping schedule engine..."); - scheduler.shutdownNow(); - try { - scheduler.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + schedulerEngine.stop(); logger.debug("schedule engine stopped"); } @@ -77,18 +62,12 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { public void add(Job job) { assert job.trigger() instanceof ScheduleTrigger; ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - ActiveSchedule schedule = new ActiveSchedule(job.id(), trigger.getSchedule(), clock.millis()); - schedules = schedules.add(schedule); + schedulerEngine.add(new SchedulerEngine.Job(job.id(), trigger.getSchedule())); } @Override public boolean remove(String jobId) { - Schedules newSchedules = schedules.remove(jobId); - if (newSchedules == null) { - return false; - } - schedules = newSchedules; - return true; + return schedulerEngine.remove(jobId); } protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { @@ -97,110 +76,7 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime, DateTimeZone.UTC), new DateTime(scheduledTime, DateTimeZone.UTC)); for (Listener listener : listeners) { - listener.triggered(Arrays.asList(event)); + listener.triggered(Collections.singletonList(event)); } } - - class ActiveSchedule implements Runnable { - - private final String name; - private final Schedule schedule; - private final long startTime; - - private volatile ScheduledFuture future; - private volatile long scheduledTime; - - public ActiveSchedule(String name, Schedule schedule, long startTime) { - this.name = name; - this.schedule = schedule; - this.startTime = startTime; - this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime); - if (scheduledTime != -1) { - long delay = Math.max(0, scheduledTime - clock.millis()); - future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); - } - } - - @Override - public void run() { - long triggeredTime = clock.millis(); - notifyListeners(name, triggeredTime, scheduledTime); - scheduledTime = schedule.nextScheduledTimeAfter(startTime, triggeredTime); - if (scheduledTime != -1) { - long delay = Math.max(0, scheduledTime - triggeredTime); - future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); - } - } - - public void cancel() { - FutureUtils.cancel(future); - } - } - - static class Schedules { - - private final ActiveSchedule[] schedules; - private final Map scheduleByName; - - Schedules(Collection schedules) { - Map builder = new HashMap<>(); - this.schedules = new ActiveSchedule[schedules.size()]; - int i = 0; - for (ActiveSchedule schedule : schedules) { - builder.put(schedule.name, schedule); - this.schedules[i++] = schedule; - } - this.scheduleByName = unmodifiableMap(builder); - } - - public Schedules(ActiveSchedule[] schedules, Map scheduleByName) { - this.schedules = schedules; - this.scheduleByName = scheduleByName; - } - - public Schedules add(ActiveSchedule schedule) { - boolean replacing = scheduleByName.containsKey(schedule.name); - if (!replacing) { - ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length + 1]; - System.arraycopy(schedules, 0, newSchedules, 0, schedules.length); - newSchedules[schedules.length] = schedule; - Map newScheduleByName = new HashMap<>(scheduleByName); - newScheduleByName.put(schedule.name, schedule); - return new Schedules(newSchedules, unmodifiableMap(newScheduleByName)); - } - ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length]; - Map builder = new HashMap<>(); - for (int i = 0; i < schedules.length; i++) { - final ActiveSchedule sched; - if (schedules[i].name.equals(schedule.name)) { - sched = schedule; - schedules[i].cancel(); - } else { - sched = schedules[i]; - } - newSchedules[i] = sched; - builder.put(sched.name, sched); - } - return new Schedules(newSchedules, unmodifiableMap(builder)); - } - - public Schedules remove(String name) { - if (!scheduleByName.containsKey(name)) { - return null; - } - Map builder = new HashMap<>(); - ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length - 1]; - int i = 0; - for (ActiveSchedule schedule : schedules) { - if (!schedule.name.equals(name)) { - newSchedules[i++] = schedule; - builder.put(schedule.name, schedule); - } else { - schedule.cancel(); - } - } - return new Schedules(newSchedules, unmodifiableMap(builder)); - } - } - }