From 3f26a1b2e05afb2e404deddf4015f6b141ceafed Mon Sep 17 00:00:00 2001 From: uboness Date: Wed, 8 Apr 2015 15:31:00 +0200 Subject: [PATCH] Adds initial schedule engine implementation - `TimerScheduleTriggerEngine` - a single threaded Java `Timer`based scheduler. "Ticks" every second and checks all the registered schedules. - `SimpleTickerScheduleTriggerEngine` - a single threaded scheduler. "Ticks" every second and checks all the registered schedules - `SchedulerScheduleTriggerEngine` - a single threaded engine based on Java's schedule executor service. Here, every job is added as a scheduled task to the executor and each job is managing its own execution times. - `HashWheelScheduleTriggerEngine` - a single threaded engine based on Netty's `HashWheelTimer`. Like with the `scheduler` above, every job is added as a scheduled task to the executor and each job is managing its own execution times. Also: - Added an undocumented feature to configure the schedule engine in the settings using `watcher.trigger.schedule.engine` (optional values right now are `quartz`, `simple`, `timer`, `hashwheel` and `scheduler`) - `Cron` is a fork/copy of quartz `CronExpression`.. a bit cleaned up though. - `Schedule` now exposes `nextScheduledTimeAfter` to return the next scheduled time after the given one. - `CronnableSchedule` is now based on `Cron` (this exposed bugs in the schedule tests where we generated invalid cron expression. Now, since `Cronnable` creates the actual cron, validation is in place to make sure only valid expressions are created) - While at it... refactored how the thread pool settings are set. Removed it from the plugin class, now each module is responsible for the settings of its own TPs. Also, if the thread pools are already configured in node settings we don't configure our default ones. This will enable users to configure the TPs in `elasticsearch.yml` - Also updated `CronEvalTool` to work with `DateTime` construct (instead of java's `Date`) Original commit: elastic/x-pack-elasticsearch@40d107c66ed45d1a00748eda0ebf3043fa1ba620 --- .../elasticsearch/watcher/WatcherModule.java | 4 +- .../elasticsearch/watcher/WatcherPlugin.java | 25 +- .../execution/InternalWatchExecutor.java | 24 +- .../watcher/history/HistoryModule.java | 6 + .../support/ThreadPoolSettingsBuilder.java | 63 + .../watcher/trigger/TriggerEngine.java | 6 + .../watcher/trigger/TriggerModule.java | 7 +- .../watcher/trigger/TriggerService.java | 6 + .../watcher/trigger/schedule/Cron.java | 1519 +++++++++++++++++ .../trigger/schedule/CronSchedule.java | 14 +- .../trigger/schedule/CronnableSchedule.java | 43 +- .../trigger/schedule/IntervalSchedule.java | 13 + .../watcher/trigger/schedule/Schedule.java | 19 +- .../trigger/schedule/ScheduleModule.java | 96 +- .../trigger/schedule/ScheduleTrigger.java | 1 - .../schedule/ScheduleTriggerEngine.java | 19 +- .../HashWheelScheduleTriggerEngine.java | 259 +++ .../QuartzScheduleTriggerEngine.java | 80 +- .../SchedulerScheduleTriggerEngine.java | 250 +++ .../SimpleTickerScheduleTriggerEngine.java | 198 +++ .../TimerTickerScheduleTriggerEngine.java | 168 ++ .../{quartz => engine}/WatcherQuartzJob.java | 2 +- .../trigger/schedule/support/YearTimes.java | 3 +- .../trigger/schedule/tool/CronEvalTool.java | 30 +- .../test/AbstractWatcherIntegrationTests.java | 21 +- .../watcher/test/TimeWarpedWatcherPlugin.java | 6 +- .../test/bench/ScheduleEngineBenchmark.java | 81 + .../watcher/test/bench/WatcherBenchmark.java | 6 +- .../test/integration/BasicWatcherTests.java | 20 +- .../trigger/ScheduleTriggerEngineMock.java | 4 +- .../trigger/schedule/CronScheduleTests.java | 11 +- .../trigger/schedule/DailyScheduleTests.java | 8 +- .../trigger/schedule/HourlyScheduleTests.java | 8 +- .../schedule/MonthlyScheduleTests.java | 8 +- .../trigger/schedule/ScheduleTestCase.java | 17 +- .../trigger/schedule/WeeklyScheduleTests.java | 8 +- .../trigger/schedule/YearlyScheduleTests.java | 14 +- .../quartz/QuartzScheduleEngineTests.java | 1 + .../watcher/watch/WatchTests.java | 18 +- 39 files changed, 2941 insertions(+), 145 deletions(-) create mode 100644 src/main/java/org/elasticsearch/watcher/support/ThreadPoolSettingsBuilder.java create mode 100644 src/main/java/org/elasticsearch/watcher/trigger/schedule/Cron.java create mode 100644 src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/HashWheelScheduleTriggerEngine.java rename src/main/java/org/elasticsearch/watcher/trigger/schedule/{quartz => engine}/QuartzScheduleTriggerEngine.java (74%) create mode 100644 src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java create mode 100644 src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java create mode 100644 src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java rename src/main/java/org/elasticsearch/watcher/trigger/schedule/{quartz => engine}/WatcherQuartzJob.java (95%) create mode 100644 src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineBenchmark.java diff --git a/src/main/java/org/elasticsearch/watcher/WatcherModule.java b/src/main/java/org/elasticsearch/watcher/WatcherModule.java index e24c0888886..81c402de03e 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherModule.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherModule.java @@ -33,7 +33,7 @@ import org.elasticsearch.watcher.watch.WatchModule; public class WatcherModule extends AbstractModule implements SpawnModules { - private final Settings settings; + protected final Settings settings; public WatcherModule(Settings settings) { this.settings = settings; @@ -51,7 +51,7 @@ public class WatcherModule extends AbstractModule implements SpawnModules { new WatcherClientModule(), new TransformModule(), new WatcherRestModule(), - new TriggerModule(), + new TriggerModule(settings), new WatcherTransportModule(), new ConditionModule(), new InputModule(), diff --git a/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java b/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java index b7af6dafcfe..56c3f63e563 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java @@ -6,16 +6,17 @@ package org.elasticsearch.watcher; import org.elasticsearch.client.Client; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.watcher.actions.email.service.InternalEmailService; -import org.elasticsearch.watcher.license.LicenseService; -import org.elasticsearch.watcher.support.init.InitializingService; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.plugins.AbstractPlugin; +import org.elasticsearch.watcher.actions.email.service.InternalEmailService; +import org.elasticsearch.watcher.history.HistoryModule; +import org.elasticsearch.watcher.license.LicenseService; +import org.elasticsearch.watcher.support.init.InitializingService; +import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import java.util.Collection; @@ -24,7 +25,6 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde public class WatcherPlugin extends AbstractPlugin { public static final String NAME = "watcher"; - public static final String SCHEDULER_THREAD_POOL_NAME = "watcher_scheduler"; private final Settings settings; private final boolean transportClient; @@ -68,15 +68,12 @@ public class WatcherPlugin extends AbstractPlugin { if (transportClient) { return ImmutableSettings.EMPTY; } - int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - return settingsBuilder() - .put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".type", "fixed") - .put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".size", availableProcessors * 2) - .put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".queue_size", 1000) - .put("threadpool." + NAME + ".type", "fixed") - .put("threadpool." + NAME + ".size", availableProcessors * 5) - .put("threadpool." + NAME + ".queue_size", 1000) + Settings additionalSettings = settingsBuilder() + .put(ScheduleModule.additionalSettings(settings)) + .put(HistoryModule.additionalSettings(settings)) .build(); + + return additionalSettings; } } diff --git a/src/main/java/org/elasticsearch/watcher/execution/InternalWatchExecutor.java b/src/main/java/org/elasticsearch/watcher/execution/InternalWatchExecutor.java index 93729be32c5..7b017f17f38 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/InternalWatchExecutor.java +++ b/src/main/java/org/elasticsearch/watcher/execution/InternalWatchExecutor.java @@ -5,10 +5,14 @@ */ package org.elasticsearch.watcher.execution; -import org.elasticsearch.watcher.WatcherPlugin; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.WatcherPlugin; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; import java.util.concurrent.BlockingQueue; @@ -17,6 +21,22 @@ import java.util.concurrent.BlockingQueue; */ public class InternalWatchExecutor implements WatchExecutor { + public static final String THREAD_POOL_NAME = WatcherPlugin.NAME; + + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // the TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(nodeSettings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(5 * availableProcessors) + .queueSize(1000) + .build(); + } + private final ThreadPool threadPool; @Inject @@ -40,6 +60,6 @@ public class InternalWatchExecutor implements WatchExecutor { } private EsThreadPoolExecutor executor() { - return (EsThreadPoolExecutor) threadPool.executor(WatcherPlugin.NAME); + return (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java b/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java index bfdbe7714bb..31093c029d9 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java @@ -6,6 +6,8 @@ package org.elasticsearch.watcher.history; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.watcher.execution.InternalWatchExecutor; /** */ @@ -21,4 +23,8 @@ public class HistoryModule extends AbstractModule { bind(WatchRecord.Parser.class).asEagerSingleton(); bind(HistoryStore.class).asEagerSingleton(); } + + public static Settings additionalSettings(Settings nodeSettings) { + return InternalWatchExecutor.additionalSettings(nodeSettings); + } } diff --git a/src/main/java/org/elasticsearch/watcher/support/ThreadPoolSettingsBuilder.java b/src/main/java/org/elasticsearch/watcher/support/ThreadPoolSettingsBuilder.java new file mode 100644 index 00000000000..a21c5b3dd79 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/ThreadPoolSettingsBuilder.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; + +/** + * + */ +public abstract class ThreadPoolSettingsBuilder { + + public static Same same(String name) { + return new Same(name); + } + + protected final String name; + private final ImmutableSettings.Builder builder = ImmutableSettings.builder(); + + protected ThreadPoolSettingsBuilder(String name, String type) { + this.name = name; + put("type", type); + } + + public Settings build() { + return builder.build(); + } + + protected B put(String setting, Object value) { + builder.put("threadpool." + name + "." + setting, value); + return (B) this; + } + + protected B put(String setting, int value) { + builder.put("threadpool." + name + "." + setting, value); + return (B) this; + } + + public static class Same extends ThreadPoolSettingsBuilder { + public Same(String name) { + super(name, "same"); + } + } + + public static class Fixed extends ThreadPoolSettingsBuilder { + + public Fixed(String name) { + super(name, "fixed"); + } + + public Fixed size(int size) { + return put("size", size); + } + + public Fixed queueSize(int queueSize) { + return put("queue_size", queueSize); + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/TriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/TriggerEngine.java index 4fb6d9233f9..9bf4171976f 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/TriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/TriggerEngine.java @@ -29,6 +29,12 @@ public interface TriggerEngine { void add(Job job); + /** + * Removes the job associated with the given name from this trigger engine. + * + * @param jobName The name of the job to remove + * @return {@code true} if the job existed and removed, {@code false} otherwise. + */ boolean remove(String jobName); T parseTrigger(String context, XContentParser parser) throws IOException; diff --git a/src/main/java/org/elasticsearch/watcher/trigger/TriggerModule.java b/src/main/java/org/elasticsearch/watcher/trigger/TriggerModule.java index 38d3189d1a2..5a46dfeb41b 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/TriggerModule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/TriggerModule.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.common.inject.multibindings.Multibinder; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.watcher.trigger.manual.ManualTriggerEngine; import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; @@ -21,9 +22,11 @@ import java.util.Set; */ public class TriggerModule extends AbstractModule implements SpawnModules { + private final Settings settings; private final Set> engines = new HashSet<>(); - public TriggerModule() { + public TriggerModule(Settings settings) { + this.settings = settings; registerStandardEngines(); } @@ -32,7 +35,7 @@ public class TriggerModule extends AbstractModule implements SpawnModules { } protected void registerStandardEngines() { - registerEngine(ScheduleModule.triggerEngineType()); + registerEngine(ScheduleModule.triggerEngineType(settings)); registerEngine(ManualTriggerEngine.class); } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java b/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java index 7aaa147bed1..c9476b47d99 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java @@ -54,6 +54,12 @@ public class TriggerService extends AbstractComponent { engines.get(job.trigger().type()).add(job); } + /** + * Removes the job associated with the given name from this trigger service. + * + * @param jobName The name of the job to remove + * @return {@code true} if the job existed and removed, {@code false} otherwise. + */ public boolean remove(String jobName) { for (TriggerEngine engine : engines.values()) { if (engine.remove(jobName)) { diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/Cron.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/Cron.java new file mode 100644 index 00000000000..a505eaa871a --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/Cron.java @@ -0,0 +1,1519 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.trigger.schedule; + +import org.elasticsearch.common.joda.time.DateTimeZone; +import org.elasticsearch.common.joda.time.format.DateTimeFormat; +import org.elasticsearch.common.joda.time.format.DateTimeFormatter; +import org.elasticsearch.watcher.trigger.TriggerException; + +import java.util.*; + + +/** + * + * THIS CLASS IS A COPY OF {@code CronExpression} + * FROM THE QUARTZ PROJECT + * + * + * Provides a parser and evaluator for unix-like cron expressions. Cron + * expressions provide the ability to specify complex time combinations such as + * "At 8:00am every Monday through Friday" or "At 1:30am every + * last Friday of the month". + *

+ * Cron expressions are comprised of 6 required fields and one optional field + * separated by white space. The fields respectively are described as follows: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Field Name Allowed Values Allowed Special Characters
Seconds  + * 0-59  + * , - * /
Minutes  + * 0-59  + * , - * /
Hours  + * 0-23  + * , - * /
Day-of-month  + * 1-31  + * , - * ? / L W
Month  + * 0-11 or JAN-DEC  + * , - * /
Day-of-Week  + * 1-7 or SUN-SAT  + * , - * ? / L #
Year (Optional)  + * empty, 1970-2199  + * , - * /
+ *

+ * The '*' character is used to specify all values. For example, "*" + * in the minute field means "every minute". + *

+ * The '?' character is allowed for the day-of-month and day-of-week fields. It + * is used to specify 'no specific value'. This is useful when you need to + * specify something in one of the two fields, but not the other. + *

+ * The '-' character is used to specify ranges For example "10-12" in + * the hour field means "the hours 10, 11 and 12". + *

+ * The ',' character is used to specify additional values. For example + * "MON,WED,FRI" in the day-of-week field means "the days Monday, + * Wednesday, and Friday". + *

+ * The '/' character is used to specify increments. For example "0/15" + * in the seconds field means "the seconds 0, 15, 30, and 45". And + * "5/15" in the seconds field means "the seconds 5, 20, 35, and + * 50". Specifying '*' before the '/' is equivalent to specifying 0 is + * the value to start with. Essentially, for each field in the expression, there + * is a set of numbers that can be turned on or off. For seconds and minutes, + * the numbers range from 0 to 59. For hours 0 to 23, for days of the month 0 to + * 31, and for months 0 to 11 (JAN to DEC). The "/" character simply helps you turn + * on every "nth" value in the given set. Thus "7/6" in the + * month field only turns on month "7", it does NOT mean every 6th + * month, please note that subtlety. + *

+ * The 'L' character is allowed for the day-of-month and day-of-week fields. + * This character is short-hand for "last", but it has different + * meaning in each of the two fields. For example, the value "L" in + * the day-of-month field means "the last day of the month" - day 31 + * for January, day 28 for February on non-leap years. If used in the + * day-of-week field by itself, it simply means "7" or + * "SAT". But if used in the day-of-week field after another value, it + * means "the last xxx day of the month" - for example "6L" + * means "the last friday of the month". You can also specify an offset + * from the last day of the month, such as "L-3" which would mean the third-to-last + * day of the calendar month. When using the 'L' option, it is important not to + * specify lists, or ranges of values, as you'll get confusing/unexpected results. + *

+ * The 'W' character is allowed for the day-of-month field. This character + * is used to specify the weekday (Monday-Friday) nearest the given day. As an + * example, if you were to specify "15W" as the value for the + * day-of-month field, the meaning is: "the nearest weekday to the 15th of + * the month". So if the 15th is a Saturday, the trigger will fire on + * Friday the 14th. If the 15th is a Sunday, the trigger will fire on Monday the + * 16th. If the 15th is a Tuesday, then it will fire on Tuesday the 15th. + * However if you specify "1W" as the value for day-of-month, and the + * 1st is a Saturday, the trigger will fire on Monday the 3rd, as it will not + * 'jump' over the boundary of a month's days. The 'W' character can only be + * specified when the day-of-month is a single day, not a range or list of days. + *

+ * The 'L' and 'W' characters can also be combined for the day-of-month + * expression to yield 'LW', which translates to "last weekday of the + * month". + *

+ * The '#' character is allowed for the day-of-week field. This character is + * used to specify "the nth" XXX day of the month. For example, the + * value of "6#3" in the day-of-week field means the third Friday of + * the month (day 6 = Friday and "#3" = the 3rd one in the month). + * Other examples: "2#1" = the first Monday of the month and + * "4#5" = the fifth Wednesday of the month. Note that if you specify + * "#5" and there is not 5 of the given day-of-week in the month, then + * no firing will occur that month. If the '#' character is used, there can + * only be one expression in the day-of-week field ("3#1,6#3" is + * not valid, since there are two expressions). + *

+ * + *

+ * The legal characters and the names of months and days of the week are not + * case sensitive. + * + *

+ * NOTES: + *

    + *
  • Support for specifying both a day-of-week and a day-of-month value is + * not complete (you'll need to use the '?' character in one of these fields). + *
  • + *
  • Overflowing ranges is supported - that is, having a larger number on + * the left hand side than the right. You might do 22-2 to catch 10 o'clock + * at night until 2 o'clock in the morning, or you might have NOV-FEB. It is + * very important to note that overuse of overflowing ranges creates ranges + * that don't make sense and no effort has been made to determine which + * interpretation CronExpression chooses. An example would be + * "0 0 14-6 ? * FRI-MON".
  • + *
+ *

+ * + * + * @author Sharada Jambula, James House + * @author Contributions from Mads Henderson + * @author Refactoring from CronTrigger to CronExpression by Aaron Craven + */ +public class Cron { + + private static final long serialVersionUID = 12423409423L; + + protected static final TimeZone UTC = DateTimeZone.UTC.toTimeZone(); + protected static final DateTimeFormatter formatter = DateTimeFormat.forPattern("YYYY-MM-dd'T'HH:mm:ss"); + + private static final int SECOND = 0; + private static final int MINUTE = 1; + private static final int HOUR = 2; + private static final int DAY_OF_MONTH = 3; + private static final int MONTH = 4; + private static final int DAY_OF_WEEK = 5; + private static final int YEAR = 6; + private static final int ALL_SPEC_INT = 99; // '*' + private static final int NO_SPEC_INT = 98; // '?' + private static final Integer ALL_SPEC = ALL_SPEC_INT; + private static final Integer NO_SPEC = NO_SPEC_INT; + + private static final Map monthMap = new HashMap<>(20); + private static final Map dayMap = new HashMap<>(60); + static { + monthMap.put("JAN", 0); + monthMap.put("FEB", 1); + monthMap.put("MAR", 2); + monthMap.put("APR", 3); + monthMap.put("MAY", 4); + monthMap.put("JUN", 5); + monthMap.put("JUL", 6); + monthMap.put("AUG", 7); + monthMap.put("SEP", 8); + monthMap.put("OCT", 9); + monthMap.put("NOV", 10); + monthMap.put("DEC", 11); + + dayMap.put("SUN", 1); + dayMap.put("MON", 2); + dayMap.put("TUE", 3); + dayMap.put("WED", 4); + dayMap.put("THU", 5); + dayMap.put("FRI", 6); + dayMap.put("SAT", 7); + } + + private final String expression; + + private transient TreeSet seconds; + private transient TreeSet minutes; + private transient TreeSet hours; + private transient TreeSet daysOfMonth; + private transient TreeSet months; + private transient TreeSet daysOfWeek; + private transient TreeSet years; + + private transient boolean lastdayOfWeek = false; + private transient int nthdayOfWeek = 0; + private transient boolean lastdayOfMonth = false; + private transient boolean nearestWeekday = false; + private transient int lastdayOffset = 0; + private transient boolean expressionParsed = false; + + public static final int MAX_YEAR = Calendar.getInstance(UTC, Locale.ROOT).get(Calendar.YEAR) + 100; + + /** + * Constructs a new CronExpression based on the specified + * parameter. + * + * @param expression String representation of the cron expression the + * new object should represent + * @throws java.text.ParseException + * if the string expression cannot be parsed into a valid + * CronExpression + */ + public Cron(String expression) { + assert expression != null : "cron expression cannot be null"; + this.expression = expression.toUpperCase(Locale.ROOT); + buildExpression(this.expression); + } + + /** + * Constructs a new {@code CronExpression} as a copy of an existing + * instance. + * + * @param cron The existing cron expression to be copied + */ + public Cron(Cron cron) { + this(cron.expression); + } + + /** + * Returns the next date/time after the given date/time which + * satisfies the cron expression. + * + * @param time the time since the epoch, or -1 if next time is unsupported (e.g. the cron expression points to + * a time that is previous to the given time) + * @return the next valid time (since the epoch) + */ + public long getNextValidTimeAfter(final long time) { + + // Computation is based on Gregorian year only. + Calendar cl = new java.util.GregorianCalendar(UTC, Locale.ROOT); + + // move ahead one second, since we're computing the time *after* the + // given time + final long afterTime = time + 1000; + // CronTrigger does not deal with milliseconds + cl.setTimeInMillis(afterTime); + cl.set(Calendar.MILLISECOND, 0); + + boolean gotOne = false; + // loop until we've computed the next time, or we've past the endTime + while (!gotOne) { + + if(cl.get(Calendar.YEAR) > 2999) { // prevent endless loop... + return -1; + } + + SortedSet st = null; + int t = 0; + + int sec = cl.get(Calendar.SECOND); + int min = cl.get(Calendar.MINUTE); + + // get second................................................. + st = seconds.tailSet(sec); + if (st != null && st.size() != 0) { + sec = st.first(); + } else { + sec = seconds.first(); + min++; + cl.set(Calendar.MINUTE, min); + } + cl.set(Calendar.SECOND, sec); + + min = cl.get(Calendar.MINUTE); + int hr = cl.get(Calendar.HOUR_OF_DAY); + t = -1; + + // get minute................................................. + st = minutes.tailSet(min); + if (st != null && st.size() != 0) { + t = min; + min = st.first(); + } else { + min = minutes.first(); + hr++; + } + if (min != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, min); + setCalendarHour(cl, hr); + continue; + } + cl.set(Calendar.MINUTE, min); + + hr = cl.get(Calendar.HOUR_OF_DAY); + int day = cl.get(Calendar.DAY_OF_MONTH); + t = -1; + + // get hour................................................... + st = hours.tailSet(hr); + if (st != null && st.size() != 0) { + t = hr; + hr = st.first(); + } else { + hr = hours.first(); + day++; + } + if (hr != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + setCalendarHour(cl, hr); + continue; + } + cl.set(Calendar.HOUR_OF_DAY, hr); + + day = cl.get(Calendar.DAY_OF_MONTH); + int mon = cl.get(Calendar.MONTH) + 1; + // '+ 1' because calendar is 0-based for this field, and we are + // 1-based + t = -1; + int tmon = mon; + + // get day................................................... + boolean dayOfMSpec = !daysOfMonth.contains(NO_SPEC); + boolean dayOfWSpec = !daysOfWeek.contains(NO_SPEC); + if (dayOfMSpec && !dayOfWSpec) { // get day by day of month rule + st = daysOfMonth.tailSet(day); + if (lastdayOfMonth) { + if(!nearestWeekday) { + t = day; + day = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + day -= lastdayOffset; + if(t > day) { + mon++; + if(mon > 12) { + mon = 1; + tmon = 3333; // ensure test of mon != tmon further below fails + cl.add(Calendar.YEAR, 1); + } + day = 1; + } + } else { + t = day; + day = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + day -= lastdayOffset; + + Calendar tcal = Calendar.getInstance(UTC, Locale.ROOT); + tcal.set(Calendar.SECOND, 0); + tcal.set(Calendar.MINUTE, 0); + tcal.set(Calendar.HOUR_OF_DAY, 0); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + tcal.set(Calendar.YEAR, cl.get(Calendar.YEAR)); + + int ldom = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + int dow = tcal.get(Calendar.DAY_OF_WEEK); + + if(dow == Calendar.SATURDAY && day == 1) { + day += 2; + } else if(dow == Calendar.SATURDAY) { + day -= 1; + } else if(dow == Calendar.SUNDAY && day == ldom) { + day -= 2; + } else if(dow == Calendar.SUNDAY) { + day += 1; + } + + tcal.set(Calendar.SECOND, sec); + tcal.set(Calendar.MINUTE, min); + tcal.set(Calendar.HOUR_OF_DAY, hr); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + long nTime = tcal.getTimeInMillis(); + if(nTime < afterTime) { + day = 1; + mon++; + } + } + } else if(nearestWeekday) { + t = day; + day = daysOfMonth.first(); + + Calendar tcal = Calendar.getInstance(UTC, Locale.ROOT); + tcal.set(Calendar.SECOND, 0); + tcal.set(Calendar.MINUTE, 0); + tcal.set(Calendar.HOUR_OF_DAY, 0); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + tcal.set(Calendar.YEAR, cl.get(Calendar.YEAR)); + + int ldom = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + int dow = tcal.get(Calendar.DAY_OF_WEEK); + + if(dow == Calendar.SATURDAY && day == 1) { + day += 2; + } else if(dow == Calendar.SATURDAY) { + day -= 1; + } else if(dow == Calendar.SUNDAY && day == ldom) { + day -= 2; + } else if(dow == Calendar.SUNDAY) { + day += 1; + } + + + tcal.set(Calendar.SECOND, sec); + tcal.set(Calendar.MINUTE, min); + tcal.set(Calendar.HOUR_OF_DAY, hr); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + long nTime = tcal.getTimeInMillis(); + if(nTime < afterTime) { + day = daysOfMonth.first(); + mon++; + } + } else if (st != null && st.size() != 0) { + t = day; + day = st.first(); + // make sure we don't over-run a short month, such as february + int lastDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + if (day > lastDay) { + day = daysOfMonth.first(); + mon++; + } + } else { + day = daysOfMonth.first(); + mon++; + } + + if (day != t || mon != tmon) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, and we + // are 1-based + continue; + } + } else if (dayOfWSpec && !dayOfMSpec) { // get day by day of week rule + if (lastdayOfWeek) { // are we looking for the last XXX day of + // the month? + int dow = daysOfWeek.first(); // desired + // d-o-w + int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w + int daysToAdd = 0; + if (cDow < dow) { + daysToAdd = dow - cDow; + } + if (cDow > dow) { + daysToAdd = dow + (7 - cDow); + } + + int lDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + + if (day + daysToAdd > lDay) { // did we already miss the + // last one? + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon); + // no '- 1' here because we are promoting the month + continue; + } + + // find date of last occurrence of this day in this month... + while ((day + daysToAdd + 7) <= lDay) { + daysToAdd += 7; + } + + day += daysToAdd; + + if (daysToAdd > 0) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' here because we are not promoting the month + continue; + } + + } else if (nthdayOfWeek != 0) { + // are we looking for the Nth XXX day in the month? + int dow = daysOfWeek.first(); // desired + // d-o-w + int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w + int daysToAdd = 0; + if (cDow < dow) { + daysToAdd = dow - cDow; + } else if (cDow > dow) { + daysToAdd = dow + (7 - cDow); + } + + boolean dayShifted = false; + if (daysToAdd > 0) { + dayShifted = true; + } + + day += daysToAdd; + int weekOfMonth = day / 7; + if (day % 7 > 0) { + weekOfMonth++; + } + + daysToAdd = (nthdayOfWeek - weekOfMonth) * 7; + day += daysToAdd; + if (daysToAdd < 0 + || day > getLastDayOfMonth(mon, cl + .get(Calendar.YEAR))) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon); + // no '- 1' here because we are promoting the month + continue; + } else if (daysToAdd > 0 || dayShifted) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' here because we are NOT promoting the month + continue; + } + } else { + int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w + int dow = daysOfWeek.first(); // desired + // d-o-w + st = daysOfWeek.tailSet(cDow); + if (st != null && st.size() > 0) { + dow = st.first(); + } + + int daysToAdd = 0; + if (cDow < dow) { + daysToAdd = dow - cDow; + } + if (cDow > dow) { + daysToAdd = dow + (7 - cDow); + } + + int lDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + + if (day + daysToAdd > lDay) { // will we pass the end of + // the month? + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon); + // no '- 1' here because we are promoting the month + continue; + } else if (daysToAdd > 0) { // are we swithing days? + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day + daysToAdd); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, + // and we are 1-based + continue; + } + } + } else { // dayOfWSpec && !dayOfMSpec + return -1; +// throw new UnsupportedOperationException( +// "Support for specifying both a day-of-week AND a day-of-month parameter is not implemented."); + } + cl.set(Calendar.DAY_OF_MONTH, day); + + mon = cl.get(Calendar.MONTH) + 1; + // '+ 1' because calendar is 0-based for this field, and we are + // 1-based + int year = cl.get(Calendar.YEAR); + t = -1; + + // test for expressions that never generate a valid fire date, + // but keep looping... + if (year > MAX_YEAR) { + return -1; +// throw new ElasticsearchIllegalArgumentException("given time is not supported by cron [" + formatter.print(time) + "]"); + } + + // get month................................................... + st = months.tailSet(mon); + if (st != null && st.size() != 0) { + t = mon; + mon = st.first(); + } else { + mon = months.first(); + year++; + } + if (mon != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, and we are + // 1-based + cl.set(Calendar.YEAR, year); + continue; + } + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, and we are + // 1-based + + year = cl.get(Calendar.YEAR); + t = -1; + + // get year................................................... + st = years.tailSet(year); + if (st != null && st.size() != 0) { + t = year; + year = st.first(); + } else { + return -1; +// throw new ElasticsearchIllegalArgumentException("given time is not supported by cron [" + formatter.print(time) + "]"); + } + + if (year != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, 0); + // '- 1' because calendar is 0-based for this field, and we are + // 1-based + cl.set(Calendar.YEAR, year); + continue; + } + cl.set(Calendar.YEAR, year); + + gotOne = true; + } // while( !done ) + + return cl.getTimeInMillis(); + } + + public String expression() { + return expression; + } + + public String getExpressionSummary() { + StringBuilder buf = new StringBuilder(); + + buf.append("seconds: "); + buf.append(expressionSetSummary(seconds)); + buf.append("\n"); + buf.append("minutes: "); + buf.append(expressionSetSummary(minutes)); + buf.append("\n"); + buf.append("hours: "); + buf.append(expressionSetSummary(hours)); + buf.append("\n"); + buf.append("daysOfMonth: "); + buf.append(expressionSetSummary(daysOfMonth)); + buf.append("\n"); + buf.append("months: "); + buf.append(expressionSetSummary(months)); + buf.append("\n"); + buf.append("daysOfWeek: "); + buf.append(expressionSetSummary(daysOfWeek)); + buf.append("\n"); + buf.append("lastdayOfWeek: "); + buf.append(lastdayOfWeek); + buf.append("\n"); + buf.append("nearestWeekday: "); + buf.append(nearestWeekday); + buf.append("\n"); + buf.append("NthDayOfWeek: "); + buf.append(nthdayOfWeek); + buf.append("\n"); + buf.append("lastdayOfMonth: "); + buf.append(lastdayOfMonth); + buf.append("\n"); + buf.append("years: "); + buf.append(expressionSetSummary(years)); + buf.append("\n"); + + return buf.toString(); + } + + @Override + public int hashCode() { + return Objects.hash(expression); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final Cron other = (Cron) obj; + return Objects.equals(this.expression, other.expression); + } + + /** + * Returns the string representation of the CronExpression + * + * @return a string representation of the CronExpression + */ + @Override + public String toString() { + return expression; + } + + /** + * Indicates whether the specified cron expression can be parsed into a + * valid cron expression + * + * @param expression the expression to evaluate + * @return a boolean indicating whether the given expression is a valid cron + * expression + */ + public static boolean isValid(String expression) { + try { + validate(expression); + } catch (ParseException pe) { + return false; + } + return true; + } + + public static void validate(String expression) throws ParseException { + new Cron(expression); + } + + + //////////////////////////////////////////////////////////////////////////// + // + // Expression Parsing Functions + // + //////////////////////////////////////////////////////////////////////////// + + private void buildExpression(String expression) { + expressionParsed = true; + + try { + + if (seconds == null) { + seconds = new TreeSet(); + } + if (minutes == null) { + minutes = new TreeSet(); + } + if (hours == null) { + hours = new TreeSet(); + } + if (daysOfMonth == null) { + daysOfMonth = new TreeSet(); + } + if (months == null) { + months = new TreeSet(); + } + if (daysOfWeek == null) { + daysOfWeek = new TreeSet(); + } + if (years == null) { + years = new TreeSet(); + } + + int exprOn = SECOND; + + StringTokenizer exprsTok = new StringTokenizer(expression, " \t", + false); + + while (exprsTok.hasMoreTokens() && exprOn <= YEAR) { + String expr = exprsTok.nextToken().trim(); + + // throw an exception if L is used with other days of the month + if(exprOn == DAY_OF_MONTH && expr.indexOf('L') != -1 && expr.length() > 1 && expr.contains(",")) { + throw new ParseException("Support for specifying 'L' and 'LW' with other days of the month is not implemented", -1); + } + // throw an exception if L is used with other days of the week + if(exprOn == DAY_OF_WEEK && expr.indexOf('L') != -1 && expr.length() > 1 && expr.contains(",")) { + throw new ParseException("Support for specifying 'L' with other days of the week is not implemented", -1); + } + if(exprOn == DAY_OF_WEEK && expr.indexOf('#') != -1 && expr.indexOf('#', expr.indexOf('#') +1) != -1) { + throw new ParseException("Support for specifying multiple \"nth\" days is not implemented.", -1); + } + + StringTokenizer vTok = new StringTokenizer(expr, ","); + while (vTok.hasMoreTokens()) { + String v = vTok.nextToken(); + storeExpressionVals(0, v, exprOn); + } + + exprOn++; + } + + if (exprOn <= DAY_OF_WEEK) { + throw new ParseException("Unexpected end of expression.", expression.length()); + } + + if (exprOn <= YEAR) { + storeExpressionVals(0, "*", YEAR); + } + + TreeSet dow = getSet(DAY_OF_WEEK); + TreeSet dom = getSet(DAY_OF_MONTH); + + // Copying the logic from the UnsupportedOperationException below + boolean dayOfMSpec = !dom.contains(NO_SPEC); + boolean dayOfWSpec = !dow.contains(NO_SPEC); + + if (!dayOfMSpec || dayOfWSpec) { + if (!dayOfWSpec || dayOfMSpec) { + throw new ParseException( + "Support for specifying both a day-of-week AND a day-of-month parameter is not implemented.", 0); + } + } + } catch (Exception e) { + throw new ParseException("Illegal cron expression format (" + e.toString() + ")", 0); + } + } + + private int storeExpressionVals(int pos, String s, int type) throws ParseException { + + int incr = 0; + int i = skipWhiteSpace(pos, s); + if (i >= s.length()) { + return i; + } + char c = s.charAt(i); + if ((c >= 'A') && (c <= 'Z') && (!s.equals("L")) && (!s.equals("LW")) && (!s.matches("^L-[0-9]*[W]?"))) { + String sub = s.substring(i, i + 3); + int sval = -1; + int eval = -1; + if (type == MONTH) { + sval = getMonthNumber(sub) + 1; + if (sval <= 0) { + throw new ParseException("Invalid Month value: '" + sub + "'", i); + } + if (s.length() > i + 3) { + c = s.charAt(i + 3); + if (c == '-') { + i += 4; + sub = s.substring(i, i + 3); + eval = getMonthNumber(sub) + 1; + if (eval <= 0) { + throw new ParseException("Invalid Month value: '" + sub + "'", i); + } + } + } + } else if (type == DAY_OF_WEEK) { + sval = getDayOfWeekNumber(sub); + if (sval < 0) { + throw new ParseException("Invalid Day-of-Week value: '" + + sub + "'", i); + } + if (s.length() > i + 3) { + c = s.charAt(i + 3); + if (c == '-') { + i += 4; + sub = s.substring(i, i + 3); + eval = getDayOfWeekNumber(sub); + if (eval < 0) { + throw new ParseException( + "Invalid Day-of-Week value: '" + sub + + "'", i); + } + } else if (c == '#') { + try { + i += 4; + nthdayOfWeek = Integer.parseInt(s.substring(i)); + if (nthdayOfWeek < 1 || nthdayOfWeek > 5) { + throw new Exception(); + } + } catch (Exception e) { + throw new ParseException( + "A numeric value between 1 and 5 must follow the '#' option", + i); + } + } else if (c == 'L') { + lastdayOfWeek = true; + i++; + } + } + + } else { + throw new ParseException( + "Illegal characters for this position: '" + sub + "'", + i); + } + if (eval != -1) { + incr = 1; + } + addToSet(sval, eval, incr, type); + return (i + 3); + } + + if (c == '?') { + i++; + if ((i + 1) < s.length() + && (s.charAt(i) != ' ' && s.charAt(i + 1) != '\t')) { + throw new ParseException("Illegal character after '?': " + + s.charAt(i), i); + } + if (type != DAY_OF_WEEK && type != DAY_OF_MONTH) { + throw new ParseException( + "'?' can only be specfied for Day-of-Month or Day-of-Week.", + i); + } + if (type == DAY_OF_WEEK && !lastdayOfMonth) { + int val = daysOfMonth.last(); + if (val == NO_SPEC_INT) { + throw new ParseException( + "'?' can only be specfied for Day-of-Month -OR- Day-of-Week.", + i); + } + } + + addToSet(NO_SPEC_INT, -1, 0, type); + return i; + } + + if (c == '*' || c == '/') { + if (c == '*' && (i + 1) >= s.length()) { + addToSet(ALL_SPEC_INT, -1, incr, type); + return i + 1; + } else if (c == '/' + && ((i + 1) >= s.length() || s.charAt(i + 1) == ' ' || s + .charAt(i + 1) == '\t')) { + throw new ParseException("'/' must be followed by an integer.", i); + } else if (c == '*') { + i++; + } + c = s.charAt(i); + if (c == '/') { // is an increment specified? + i++; + if (i >= s.length()) { + throw new ParseException("Unexpected end of string.", i); + } + + incr = getNumericValue(s, i); + + i++; + if (incr > 10) { + i++; + } + if (incr > 59 && (type == SECOND || type == MINUTE)) { + throw new ParseException("Increment > 60 : " + incr, i); + } else if (incr > 23 && (type == HOUR)) { + throw new ParseException("Increment > 24 : " + incr, i); + } else if (incr > 31 && (type == DAY_OF_MONTH)) { + throw new ParseException("Increment > 31 : " + incr, i); + } else if (incr > 7 && (type == DAY_OF_WEEK)) { + throw new ParseException("Increment > 7 : " + incr, i); + } else if (incr > 12 && (type == MONTH)) { + throw new ParseException("Increment > 12 : " + incr, i); + } + } else { + incr = 1; + } + + addToSet(ALL_SPEC_INT, -1, incr, type); + return i; + } else if (c == 'L') { + i++; + if (type == DAY_OF_MONTH) { + lastdayOfMonth = true; + } + if (type == DAY_OF_WEEK) { + addToSet(7, 7, 0, type); + } + if(type == DAY_OF_MONTH && s.length() > i) { + c = s.charAt(i); + if(c == '-') { + ValueSet vs = getValue(0, s, i+1); + lastdayOffset = vs.value; + if(lastdayOffset > 30) + throw new ParseException("Offset from last day must be <= 30", i+1); + i = vs.pos; + } + if(s.length() > i) { + c = s.charAt(i); + if(c == 'W') { + nearestWeekday = true; + i++; + } + } + } + return i; + } else if (c >= '0' && c <= '9') { + int val = Integer.parseInt(String.valueOf(c)); + i++; + if (i >= s.length()) { + addToSet(val, -1, -1, type); + } else { + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(val, s, i); + val = vs.value; + i = vs.pos; + } + i = checkNext(i, s, val, type); + return i; + } + } else { + throw new ParseException("Unexpected character: " + c, i); + } + + return i; + } + + private int checkNext(int pos, String s, int val, int type) throws ParseException { + + int end = -1; + int i = pos; + + if (i >= s.length()) { + addToSet(val, end, -1, type); + return i; + } + + char c = s.charAt(pos); + + if (c == 'L') { + if (type == DAY_OF_WEEK) { + if(val < 1 || val > 7) + throw new ParseException("Day-of-Week values must be between 1 and 7", -1); + lastdayOfWeek = true; + } else { + throw new ParseException("'L' option is not valid here. (pos=" + i + ")", i); + } + TreeSet set = getSet(type); + set.add(val); + i++; + return i; + } + + if (c == 'W') { + if (type == DAY_OF_MONTH) { + nearestWeekday = true; + } else { + throw new ParseException("'W' option is not valid here. (pos=" + i + ")", i); + } + if(val > 31) + throw new ParseException("The 'W' option does not make sense with values larger than 31 (max number of days in a month)", i); + TreeSet set = getSet(type); + set.add(val); + i++; + return i; + } + + if (c == '#') { + if (type != DAY_OF_WEEK) { + throw new ParseException("'#' option is not valid here. (pos=" + i + ")", i); + } + i++; + try { + nthdayOfWeek = Integer.parseInt(s.substring(i)); + if (nthdayOfWeek < 1 || nthdayOfWeek > 5) { + throw new Exception(); + } + } catch (Exception e) { + throw new ParseException( + "A numeric value between 1 and 5 must follow the '#' option", + i); + } + + TreeSet set = getSet(type); + set.add(val); + i++; + return i; + } + + if (c == '-') { + i++; + c = s.charAt(i); + int v = Integer.parseInt(String.valueOf(c)); + end = v; + i++; + if (i >= s.length()) { + addToSet(val, end, 1, type); + return i; + } + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(v, s, i); + end = vs.value; + i = vs.pos; + } + if (i < s.length() && ((c = s.charAt(i)) == '/')) { + i++; + c = s.charAt(i); + int v2 = Integer.parseInt(String.valueOf(c)); + i++; + if (i >= s.length()) { + addToSet(val, end, v2, type); + return i; + } + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(v2, s, i); + int v3 = vs.value; + addToSet(val, end, v3, type); + i = vs.pos; + return i; + } else { + addToSet(val, end, v2, type); + return i; + } + } else { + addToSet(val, end, 1, type); + return i; + } + } + + if (c == '/') { + i++; + c = s.charAt(i); + int v2 = Integer.parseInt(String.valueOf(c)); + i++; + if (i >= s.length()) { + addToSet(val, end, v2, type); + return i; + } + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(v2, s, i); + int v3 = vs.value; + addToSet(val, end, v3, type); + i = vs.pos; + return i; + } else { + throw new ParseException("Unexpected character '" + c + "' after '/'", i); + } + } + + addToSet(val, end, 0, type); + i++; + return i; + } + + private static String expressionSetSummary(java.util.Set set) { + + if (set.contains(NO_SPEC)) { + return "?"; + } + if (set.contains(ALL_SPEC)) { + return "*"; + } + + StringBuilder buf = new StringBuilder(); + + Iterator itr = set.iterator(); + boolean first = true; + while (itr.hasNext()) { + Integer iVal = itr.next(); + String val = iVal.toString(); + if (!first) { + buf.append(","); + } + buf.append(val); + first = false; + } + + return buf.toString(); + } + + private static String expressionSetSummary(java.util.ArrayList list) { + + if (list.contains(NO_SPEC)) { + return "?"; + } + if (list.contains(ALL_SPEC)) { + return "*"; + } + + StringBuilder buf = new StringBuilder(); + + Iterator itr = list.iterator(); + boolean first = true; + while (itr.hasNext()) { + Integer iVal = itr.next(); + String val = iVal.toString(); + if (!first) { + buf.append(","); + } + buf.append(val); + first = false; + } + + return buf.toString(); + } + + private static int skipWhiteSpace(int i, String s) { + for (; i < s.length() && (s.charAt(i) == ' ' || s.charAt(i) == '\t'); i++) { + ; + } + + return i; + } + + private static int findNextWhiteSpace(int i, String s) { + for (; i < s.length() && (s.charAt(i) != ' ' || s.charAt(i) != '\t'); i++) { + ; + } + + return i; + } + + private void addToSet(int val, int end, int incr, int type) throws ParseException { + + TreeSet set = getSet(type); + + if (type == SECOND || type == MINUTE) { + if ((val < 0 || val > 59 || end > 59) && (val != ALL_SPEC_INT)) { + throw new ParseException("Minute and Second values must be between 0 and 59", -1); + } + } else if (type == HOUR) { + if ((val < 0 || val > 23 || end > 23) && (val != ALL_SPEC_INT)) { + throw new ParseException("Hour values must be between 0 and 23", -1); + } + } else if (type == DAY_OF_MONTH) { + if ((val < 1 || val > 31 || end > 31) && (val != ALL_SPEC_INT) + && (val != NO_SPEC_INT)) { + throw new ParseException("Day of month values must be between 1 and 31", -1); + } + } else if (type == MONTH) { + if ((val < 1 || val > 12 || end > 12) && (val != ALL_SPEC_INT)) { + throw new ParseException("Month values must be between 1 and 12", -1); + } + } else if (type == DAY_OF_WEEK) { + if ((val == 0 || val > 7 || end > 7) && (val != ALL_SPEC_INT) + && (val != NO_SPEC_INT)) { + throw new ParseException("Day-of-Week values must be between 1 and 7", -1); + } + } + + if ((incr == 0 || incr == -1) && val != ALL_SPEC_INT) { + if (val != -1) { + set.add(val); + } else { + set.add(NO_SPEC); + } + + return; + } + + int startAt = val; + int stopAt = end; + + if (val == ALL_SPEC_INT && incr <= 0) { + incr = 1; + set.add(ALL_SPEC); // put in a marker, but also fill values + } + + if (type == SECOND || type == MINUTE) { + if (stopAt == -1) { + stopAt = 59; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 0; + } + } else if (type == HOUR) { + if (stopAt == -1) { + stopAt = 23; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 0; + } + } else if (type == DAY_OF_MONTH) { + if (stopAt == -1) { + stopAt = 31; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1; + } + } else if (type == MONTH) { + if (stopAt == -1) { + stopAt = 12; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1; + } + } else if (type == DAY_OF_WEEK) { + if (stopAt == -1) { + stopAt = 7; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1; + } + } else if (type == YEAR) { + if (stopAt == -1) { + stopAt = MAX_YEAR; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1970; + } + } + + // if the end of the range is before the start, then we need to overflow into + // the next day, month etc. This is done by adding the maximum amount for that + // type, and using modulus max to determine the value being added. + int max = -1; + if (stopAt < startAt) { + switch (type) { + case SECOND : max = 60; break; + case MINUTE : max = 60; break; + case HOUR : max = 24; break; + case MONTH : max = 12; break; + case DAY_OF_WEEK : max = 7; break; + case DAY_OF_MONTH : max = 31; break; + case YEAR : throw new IllegalArgumentException("Start year must be less than stop year"); + default : throw new IllegalArgumentException("Unexpected type encountered"); + } + stopAt += max; + } + + for (int i = startAt; i <= stopAt; i += incr) { + if (max == -1) { + // ie: there's no max to overflow over + set.add(i); + } else { + // take the modulus to get the real value + int i2 = i % max; + + // 1-indexed ranges should not include 0, and should include their max + if (i2 == 0 && (type == MONTH || type == DAY_OF_WEEK || type == DAY_OF_MONTH) ) { + i2 = max; + } + + set.add(i2); + } + } + } + + private TreeSet getSet(int type) { + switch (type) { + case SECOND: + return seconds; + case MINUTE: + return minutes; + case HOUR: + return hours; + case DAY_OF_MONTH: + return daysOfMonth; + case MONTH: + return months; + case DAY_OF_WEEK: + return daysOfWeek; + case YEAR: + return years; + default: + return null; + } + } + + private ValueSet getValue(int v, String s, int i) { + char c = s.charAt(i); + StringBuilder s1 = new StringBuilder(String.valueOf(v)); + while (c >= '0' && c <= '9') { + s1.append(c); + i++; + if (i >= s.length()) { + break; + } + c = s.charAt(i); + } + ValueSet val = new ValueSet(); + + val.pos = (i < s.length()) ? i : i + 1; + val.value = Integer.parseInt(s1.toString()); + return val; + } + + private int getNumericValue(String s, int i) { + int endOfVal = findNextWhiteSpace(i, s); + String val = s.substring(i, endOfVal); + return Integer.parseInt(val); + } + + private int getMonthNumber(String s) { + Integer integer = monthMap.get(s); + + if (integer == null) { + return -1; + } + + return integer; + } + + private int getDayOfWeekNumber(String s) { + Integer integer = dayMap.get(s); + + if (integer == null) { + return -1; + } + + return integer; + } + + /** + * Advance the calendar to the particular hour paying particular attention + * to daylight saving problems. + * + * @param cal the calendar to operate on + * @param hour the hour to set + */ + private static void setCalendarHour(Calendar cal, int hour) { + cal.set(java.util.Calendar.HOUR_OF_DAY, hour); + if (cal.get(java.util.Calendar.HOUR_OF_DAY) != hour && hour != 24) { + cal.set(java.util.Calendar.HOUR_OF_DAY, hour + 1); + } + } + + private static boolean isLeapYear(int year) { + return ((year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)); + } + + private int getLastDayOfMonth(int monthNum, int year) { + + switch (monthNum) { + case 1: + return 31; + case 2: + return (isLeapYear(year)) ? 29 : 28; + case 3: + return 31; + case 4: + return 30; + case 5: + return 31; + case 6: + return 30; + case 7: + return 31; + case 8: + return 31; + case 9: + return 30; + case 10: + return 31; + case 11: + return 30; + case 12: + return 31; + default: + throw new IllegalArgumentException("Illegal month number: " + + monthNum); + } + } + + private static class ValueSet { + int value; + int pos; + } + + public static class ParseException extends TriggerException { + + private int errorOffset; + + public ParseException(String msg, int errorOffset) { + super(msg); + this.errorOffset = errorOffset; + } + + public ParseException(String msg, java.text.ParseException cause) { + super(msg, cause); + this.errorOffset = cause.getErrorOffset(); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronSchedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronSchedule.java index 4c0fc2bf5b3..733de519cf5 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronSchedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronSchedule.java @@ -8,10 +8,8 @@ package org.elasticsearch.watcher.trigger.schedule; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.WatcherSettingsException; -import org.quartz.CronExpression; import java.io.IOException; -import java.text.ParseException; import java.util.ArrayList; import java.util.List; @@ -23,8 +21,7 @@ public class CronSchedule extends CronnableSchedule { public static final String TYPE = "cron"; public CronSchedule(String... crons) { - super(crons); - validate(crons); + super(validate(crons)); } @Override @@ -37,14 +34,15 @@ public class CronSchedule extends CronnableSchedule { return crons.length == 1 ? builder.value(crons[0]) : builder.value(crons); } - static void validate(String... crons) { + static String[] validate(String... crons) { for (String cron :crons) { try { - CronExpression.validateExpression(cron); - } catch (ParseException pe) { + Cron.validate(cron); + } catch (Cron.ParseException pe) { throw new ValidationException(cron, pe); } } + return crons; } public static class Parser implements Schedule.Parser { @@ -90,7 +88,7 @@ public class CronSchedule extends CronnableSchedule { private String expression; - public ValidationException(String expression, ParseException cause) { + public ValidationException(String expression, Cron.ParseException cause) { super("invalid cron expression [" + expression + "]. " + cause.getMessage()); this.expression = expression; } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronnableSchedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronnableSchedule.java index 0efce2f37a9..c24ac74c3f8 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronnableSchedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronnableSchedule.java @@ -6,6 +6,7 @@ package org.elasticsearch.watcher.trigger.schedule; import java.util.Arrays; +import java.util.Comparator; import java.util.Objects; /** @@ -13,14 +14,38 @@ import java.util.Objects; */ public abstract class CronnableSchedule implements Schedule { - protected final String[] crons; + private static final Comparator CRON_COMPARATOR = new Comparator() { + @Override + public int compare(Cron c1, Cron c2) { + return c1.expression().compareTo(c2.expression()); + } + }; - public CronnableSchedule(String... crons) { - this.crons = crons; - Arrays.sort(crons); + protected final Cron[] crons; + + public CronnableSchedule(String... expressions) { + this(crons(expressions)); } - public String[] crons() { + public CronnableSchedule(Cron... crons) { + assert crons.length > 0; + this.crons = crons; + Arrays.sort(crons, CRON_COMPARATOR); + } + + @Override + public long nextScheduledTimeAfter(long startTime, long time) { + if (time <= startTime) { + return startTime; + } + long nextTime = Long.MAX_VALUE; + for (Cron cron : crons) { + nextTime = Math.min(nextTime, cron.getNextValidTimeAfter(time)); + } + return nextTime; + } + + public Cron[] crons() { return crons; } @@ -40,4 +65,12 @@ public abstract class CronnableSchedule implements Schedule { final CronnableSchedule other = (CronnableSchedule) obj; return Objects.deepEquals(this.crons, other.crons); } + + static Cron[] crons(String... expressions) { + Cron[] crons = new Cron[expressions.length]; + for (int i = 0; i < crons.length; i++) { + crons[i] = new Cron(expressions[i]); + } + return crons; + } } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java index 11b8441d127..4507bf67408 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java @@ -32,6 +32,17 @@ public class IntervalSchedule implements Schedule { return TYPE; } + @Override + public long nextScheduledTimeAfter(long startTime, long time) { + if (time <= startTime) { + return startTime; + } + // advancing the time in 1 ns (we're looking for the time **after**) + time += 1; + long delta = time - startTime; + return startTime + (delta / interval.millis + 1) * interval.millis; + } + public Interval interval() { return interval; } @@ -132,10 +143,12 @@ public class IntervalSchedule implements Schedule { private final long duration; private final Unit unit; + private final long millis; // computed once public Interval(long duration, Unit unit) { this.duration = duration; this.unit = unit; + this.millis = unit.millis(duration); } public long seconds() { diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/Schedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/Schedule.java index d275c233086..80056291005 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/Schedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/Schedule.java @@ -17,7 +17,24 @@ public interface Schedule extends ToXContent { String type(); - public static interface Parser { + /** + * Returns the next scheduled time after the given time, according to this schedule. If the given schedule + * cannot resolve the next scheduled time, then {@code -1} is returned. It really depends on the type of + * schedule to determine when {@code -1} is returned. Some schedules (e.g. IntervalSchedule) will never return + * {@code -1} as they can always compute the next scheduled time. {@code Cron} based schedules are good example + * of schedules that may return {@code -1}, for example, when the schedule only points to times that are all + * before the given time (in which case, there is no next scheduled time for the given time). + * + * Example: + * + * cron 0 0 0 * 1 ? 2013 (only points to days in January 2013) + * + * time 2015-01-01 12:00:00 (this time is in 2015) + * + */ + long nextScheduledTimeAfter(long startTime, long time); + + interface Parser { String type(); diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleModule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleModule.java index 13b755e0258..f8863b91a23 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleModule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleModule.java @@ -7,10 +7,13 @@ package org.elasticsearch.watcher.trigger.schedule; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.watcher.trigger.TriggerEngine; -import org.elasticsearch.watcher.trigger.schedule.quartz.QuartzScheduleTriggerEngine; +import org.elasticsearch.watcher.trigger.schedule.engine.*; import java.util.HashMap; +import java.util.Locale; import java.util.Map; /** @@ -30,8 +33,10 @@ public class ScheduleModule extends AbstractModule { registerScheduleParser(YearlySchedule.TYPE, YearlySchedule.Parser.class); } - public static Class triggerEngineType() { - return QuartzScheduleTriggerEngine.class; + public static Class triggerEngineType(Settings nodeSettings) { + Engine engine = Engine.resolve(nodeSettings); + Loggers.getLogger(ScheduleModule.class, nodeSettings).info("using [{}] schedule trigger engine", engine.name().toLowerCase(Locale.ROOT)); + return engine.engineType(); } public void registerScheduleParser(String parserType, Class parserClass) { @@ -50,4 +55,89 @@ public class ScheduleModule extends AbstractModule { bind(ScheduleRegistry.class).asEagerSingleton(); } + public static Settings additionalSettings(Settings nodeSettings) { + Engine engine = Engine.resolve(nodeSettings); + return engine.additionalSettings(nodeSettings); + } + + public enum Engine { + + SCHEDULER() { + @Override + protected Class engineType() { + return SchedulerScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return SchedulerScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }, + + HASHWHEEL() { + @Override + protected Class engineType() { + return HashWheelScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return HashWheelScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }, + + QUARTZ() { + @Override + protected Class engineType() { + return QuartzScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return QuartzScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }, + + TIMER() { + @Override + protected Class engineType() { + return TimerTickerScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return TimerTickerScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }, + + SIMPLE() { + @Override + protected Class engineType() { + return SimpleTickerScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return SimpleTickerScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }; + + protected abstract Class engineType(); + + protected abstract Settings additionalSettings(Settings nodeSettings); + + public static Engine resolve(Settings settings) { + String engine = settings.getComponentSettings(ScheduleModule.class).get("engine", "scheduler"); + switch (engine.toLowerCase(Locale.ROOT)) { + case "quartz" : return QUARTZ; + case "timer" : return TIMER; + case "simple" : return SIMPLE; + case "hashwheel" : return HASHWHEEL; + case "scheduler" : return SCHEDULER; + default: + return SCHEDULER; + } + } + } + } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTrigger.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTrigger.java index a6631a28dca..9d36c1fbc35 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTrigger.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTrigger.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.watcher.trigger.schedule; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.watcher.trigger.Trigger; diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEngine.java index 26d1ceb4b0d..b4720d9be44 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEngine.java @@ -6,8 +6,11 @@ package org.elasticsearch.watcher.trigger.schedule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.trigger.AbstractTriggerEngine; +import java.io.IOException; + /** * */ @@ -15,12 +18,26 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine jobs) { + logger.debug("starting schedule engine..."); + + this.timer = new HashedWheelTimer(EsExecutors.daemonThreadFactory("trigger_engine_scheduler"), tickDuration.millis(), TimeUnit.MILLISECONDS, ticksPerWheel); + + // calibrate with round clock + while (clock.millis() % 1000 > 15) { + } + timer.start(); + + long starTime = clock.millis(); + List schedules = new ArrayList<>(); + for (Job job : jobs) { + if (job.trigger() instanceof ScheduleTrigger) { + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.add(new ActiveSchedule(job.name(), trigger.schedule(), starTime)); + } + } + this.schedules = new Schedules(schedules); + logger.debug("schedule engine started at [{}]", DateTime.now()); + } + + @Override + public void stop() { + logger.debug("stopping schedule engine..."); + timer.stop(); + executor.getQueue().clear(); + logger.debug("schedule engine stopped"); + } + + @Override + public void add(Job job) { + assert job.trigger() instanceof ScheduleTrigger; + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + ActiveSchedule schedule = new ActiveSchedule(job.name(), trigger.schedule(), clock.millis()); + schedules = schedules.add(schedule); + } + + @Override + public boolean remove(String jobName) { + Schedules newSchedules = schedules.remove(jobName); + if (newSchedules == null) { + return false; + } + schedules = newSchedules; + return true; + } + + void notifyListeners(String name, long triggeredTime, long scheduledTime) { + logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime)); + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + for (Listener listener : listeners) { + executor.execute(new ListenerRunnable(listener, name, event)); + } + } + + class ActiveSchedule implements TimerTask { + + private final String name; + private final Schedule schedule; + private final long startTime; + + private volatile Timeout timeout; + private volatile long scheduledTime; + + public ActiveSchedule(String name, Schedule schedule, long startTime) { + this.name = name; + this.schedule = schedule; + this.startTime = startTime; + // we don't want the schedule to trigger on the start time itself, so we compute + // the next scheduled time by simply computing the schedule time on the startTime + 1 + this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1); + long delay = scheduledTime > 0 ? scheduledTime - clock.millis() : -1; + if (delay > 0) { + timeout = timer.newTimeout(this, delay, TimeUnit.MILLISECONDS); + } else { + timeout = null; + } + } + + @Override + public void run(Timeout timeout) { + long triggeredTime = clock.millis(); + notifyListeners(name, triggeredTime, scheduledTime); + scheduledTime = schedule.nextScheduledTimeAfter(startTime, triggeredTime); + long delay = scheduledTime > 0 ? scheduledTime - triggeredTime : -1; + if (delay > 0) { + this.timeout = timer.newTimeout(this, delay, TimeUnit.MILLISECONDS); + } + } + + public void cancel() { + if (timeout != null) { + timeout.cancel(); + } + } + } + + static class ListenerRunnable implements Runnable { + + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; + + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; + } + + @Override + public void run() { + listener.triggered(jobName, event); + } + } + + static class Schedules { + + private final ActiveSchedule[] schedules; + private final ImmutableMap scheduleByName; + + Schedules(Collection schedules) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + this.schedules = new ActiveSchedule[schedules.size()]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + builder.put(schedule.name, schedule); + this.schedules[i++] = schedule; + } + this.scheduleByName = builder.build(); + } + + public Schedules(ActiveSchedule[] schedules, ImmutableMap scheduleByName) { + this.schedules = schedules; + this.scheduleByName = scheduleByName; + } + + public long getNextScheduledTime() { + long time = -1; + for (ActiveSchedule schedule : schedules) { + if (time < 0) { + time = schedule.scheduledTime; + } else { + time = Math.min(time, schedule.scheduledTime); + } + } + return time; + } + + public Schedules add(ActiveSchedule schedule) { + boolean replacing = scheduleByName.containsKey(schedule.name); + if (!replacing) { + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length + 1]; + System.arraycopy(schedules, 0, newSchedules, 0, schedules.length); + newSchedules[schedules.length] = schedule; + ImmutableMap newScheduleByName = ImmutableMap.builder() + .putAll(scheduleByName) + .put(schedule.name, schedule) + .build(); + return new Schedules(newSchedules, newScheduleByName); + } + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length]; + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < schedules.length; i++) { + ActiveSchedule sched = schedules[i].name.equals(schedule.name) ? schedule : schedules[i]; + schedules[i] = sched; + builder.put(sched.name, sched); + } + return new Schedules(newSchedules, builder.build()); + } + + public Schedules remove(String name) { + if (!scheduleByName.containsKey(name)) { + return null; + } + ImmutableMap.Builder builder = ImmutableMap.builder(); + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length - 1]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + if (!schedule.name.equals(name)) { + newSchedules[i++] = schedule; + builder.put(schedule.name, schedule); + } else { + schedule.cancel(); + } + } + return new Schedules(newSchedules, builder.build()); + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java similarity index 74% rename from src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleTriggerEngine.java rename to src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java index bf83f33408d..2fc882e2a07 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java @@ -3,17 +3,18 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.watcher.trigger.schedule.quartz; +package org.elasticsearch.watcher.trigger.schedule.engine; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTimeZone; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.watcher.WatcherPlugin; import org.elasticsearch.watcher.WatcherSettingsException; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.trigger.TriggerException; import org.elasticsearch.watcher.trigger.schedule.*; @@ -21,31 +22,39 @@ import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.quartz.simpl.SimpleJobFactory; -import java.io.IOException; import java.util.*; -import static org.elasticsearch.watcher.trigger.schedule.quartz.WatcherQuartzJob.jobDetail; - /** * */ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { - private final ScheduleRegistry scheduleRegistry; + public static final String THREAD_POOL_NAME = "watcher_scheduler"; - // Not happy about it, but otherwise we're stuck with Quartz's SimpleThreadPool - private volatile static ThreadPool threadPool; + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // scheduler TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(availableProcessors) + .queueSize(1000) + .build(); + } private final Clock clock; private final DateTimeZone defaultTimeZone; + private final EsThreadPoolExecutor executor; private volatile org.quartz.Scheduler scheduler; @Inject public QuartzScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, ThreadPool threadPool, Clock clock) { - super(settings); - this.scheduleRegistry = scheduleRegistry; - QuartzScheduleTriggerEngine.threadPool = threadPool; + super(settings, scheduleRegistry); + this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); this.clock = clock; String timeZoneStr = componentSettings.get("time_zone", "UTC"); try { @@ -77,7 +86,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { for (Job job : jobs) { if (job.trigger() instanceof ScheduleTrigger) { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - quartzJobs.put(jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock)); + quartzJobs.put(WatcherQuartzJob.jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock)); } } scheduler.scheduleJobs(quartzJobs, false); @@ -95,6 +104,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { logger.info("Stopping scheduler..."); scheduler.shutdown(true); this.scheduler = null; + executor.getQueue().clear(); logger.info("Stopped scheduler"); } } catch (org.quartz.SchedulerException se){ @@ -108,7 +118,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); try { logger.trace("scheduling [{}] with schedule [{}]", job.name(), trigger.schedule()); - scheduler.scheduleJob(jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock), true); + scheduler.scheduleJob(WatcherQuartzJob.jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock), true); } catch (org.quartz.SchedulerException se) { logger.error("failed to schedule job",se); throw new TriggerException("failed to schedule job", se); @@ -128,9 +138,9 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { static Set createTrigger(Schedule schedule, DateTimeZone timeZone, Clock clock) { HashSet triggers = new HashSet<>(); if (schedule instanceof CronnableSchedule) { - for (String cron : ((CronnableSchedule) schedule).crons()) { + for (Cron cron : ((CronnableSchedule) schedule).crons()) { triggers.add(TriggerBuilder.newTrigger() - .withSchedule(CronScheduleBuilder.cronSchedule(cron).inTimeZone(timeZone.toTimeZone())) + .withSchedule(CronScheduleBuilder.cronSchedule(cron.expression()).inTimeZone(timeZone.toTimeZone())) .startAt(clock.now().toDate()) .build()); } @@ -146,38 +156,36 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { return triggers; } - - - @Override - public ScheduleTrigger parseTrigger(String context, XContentParser parser) throws IOException { - Schedule schedule = scheduleRegistry.parse(context, parser); - return new ScheduleTrigger(schedule); - } - - @Override - public ScheduleTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException { - return ScheduleTriggerEvent.parse(context, parser); - } - void notifyListeners(String name, JobExecutionContext ctx) { ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(ctx.getFireTime()), new DateTime(ctx.getScheduledFireTime())); for (Listener listener : listeners) { - listener.triggered(name, event); + executor.execute(new ListenerRunnable(listener, name, event)); } } - // This Quartz thread pool will always accept. On this thread we will only index a watch record and add it to the work queue - public static final class WatcherQuartzThreadPool implements org.quartz.spi.ThreadPool { + static class ListenerRunnable implements Runnable { - private final EsThreadPoolExecutor executor; + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; - public WatcherQuartzThreadPool() { - this.executor = (EsThreadPoolExecutor) threadPool.executor(WatcherPlugin.SCHEDULER_THREAD_POOL_NAME); + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; } + @Override + public void run() { + listener.triggered(jobName, event); + } + } + + public static final class WatcherQuartzThreadPool implements org.quartz.spi.ThreadPool { + @Override public boolean runInThread(Runnable runnable) { - executor.execute(runnable); + runnable.run(); return true; } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java new file mode 100644 index 00000000000..2599fc1f440 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java @@ -0,0 +1,250 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.trigger.schedule.engine; + +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; +import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.schedule.*; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { + + public static final String THREAD_POOL_NAME = "watcher_scheduler"; + + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // scheduler TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(availableProcessors) + .queueSize(1000) + .build(); + } + + private final Clock clock; + private volatile Schedules schedules; + private ScheduledExecutorService scheduler; + private EsThreadPoolExecutor executor; + + @Inject + public SchedulerScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) { + super(settings, scheduleRegistry); + this.clock = clock; + this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); + } + + @Override + public void start(Collection jobs) { + logger.debug("starting schedule engine..."); + this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler")); + long starTime = clock.millis(); + List schedules = new ArrayList<>(); + for (Job job : jobs) { + if (job.trigger() instanceof ScheduleTrigger) { + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.add(new ActiveSchedule(job.name(), trigger.schedule(), starTime)); + } + } + this.schedules = new Schedules(schedules); + logger.debug("schedule engine started at [{}]", DateTime.now()); + } + + @Override + public void stop() { + logger.debug("stopping schedule engine..."); + scheduler.shutdownNow(); + try { + scheduler.awaitTermination(4, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + executor.getQueue().clear(); + logger.debug("schedule engine stopped"); + } + + @Override + public void add(Job job) { + assert job.trigger() instanceof ScheduleTrigger; + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + ActiveSchedule schedule = new ActiveSchedule(job.name(), trigger.schedule(), clock.millis()); + schedules = schedules.add(schedule); + } + + @Override + public boolean remove(String jobName) { + Schedules newSchedules = schedules.remove(jobName); + if (newSchedules == null) { + return false; + } + schedules = newSchedules; + return true; + } + + void notifyListeners(String name, long triggeredTime, long scheduledTime) { + logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime)); + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + for (Listener listener : listeners) { + executor.execute(new ListenerRunnable(listener, name, event)); + } + } + + class ActiveSchedule implements Runnable { + + private final String name; + private final Schedule schedule; + private final long startTime; + + private volatile ScheduledFuture future; + private volatile long scheduledTime; + + public ActiveSchedule(String name, Schedule schedule, long startTime) { + this.name = name; + this.schedule = schedule; + this.startTime = startTime; + // we don't want the schedule to trigger on the start time itself, so we compute + // the next scheduled time by simply computing the schedule time on the startTime + 1 + this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1); + long delay = scheduledTime > 0 ? scheduledTime - clock.millis() : -1; + if (delay > 0) { + future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } else { + future = null; + } + } + + @Override + public void run() { + long triggeredTime = clock.millis(); + notifyListeners(name, triggeredTime, scheduledTime); + scheduledTime = schedule.nextScheduledTimeAfter(startTime, triggeredTime); + long delay = scheduledTime > 0 ? scheduledTime - triggeredTime : -1; + if (delay > 0) { + future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } + } + + public void cancel() { + if (future != null) { + future.cancel(true); + } + } + } + + static class ListenerRunnable implements Runnable { + + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; + + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; + } + + @Override + public void run() { + listener.triggered(jobName, event); + } + } + + static class Schedules { + + private final ActiveSchedule[] schedules; + private final ImmutableMap scheduleByName; + + Schedules(Collection schedules) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + this.schedules = new ActiveSchedule[schedules.size()]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + builder.put(schedule.name, schedule); + this.schedules[i++] = schedule; + } + this.scheduleByName = builder.build(); + } + + public Schedules(ActiveSchedule[] schedules, ImmutableMap scheduleByName) { + this.schedules = schedules; + this.scheduleByName = scheduleByName; + } + + public long getNextScheduledTime() { + long time = -1; + for (ActiveSchedule schedule : schedules) { + if (time < 0) { + time = schedule.scheduledTime; + } else { + time = Math.min(time, schedule.scheduledTime); + } + } + return time; + } + + public Schedules add(ActiveSchedule schedule) { + boolean replacing = scheduleByName.containsKey(schedule.name); + if (!replacing) { + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length + 1]; + System.arraycopy(schedules, 0, newSchedules, 0, schedules.length); + newSchedules[schedules.length] = schedule; + ImmutableMap newScheduleByName = ImmutableMap.builder() + .putAll(scheduleByName) + .put(schedule.name, schedule) + .build(); + return new Schedules(newSchedules, newScheduleByName); + } + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length]; + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < schedules.length; i++) { + ActiveSchedule sched = schedules[i].name.equals(schedule.name) ? schedule : schedules[i]; + schedules[i] = sched; + builder.put(sched.name, sched); + } + return new Schedules(newSchedules, builder.build()); + } + + public Schedules remove(String name) { + if (!scheduleByName.containsKey(name)) { + return null; + } + ImmutableMap.Builder builder = ImmutableMap.builder(); + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length - 1]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + if (!schedule.name.equals(name)) { + newSchedules[i++] = schedule; + builder.put(schedule.name, schedule); + } else { + schedule.cancel(); + } + } + return new Schedules(newSchedules, builder.build()); + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java new file mode 100644 index 00000000000..b182a117b85 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java @@ -0,0 +1,198 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.trigger.schedule.engine; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; +import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.schedule.*; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +/** + * + */ +public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine { + + public static final String THREAD_POOL_NAME = "watcher_scheduler"; + + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // scheduler TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(availableProcessors) + .queueSize(1000) + .build(); + } + + private final Clock clock; + + private volatile Map schedules; + private Ticker ticker; + private EsThreadPoolExecutor executor; + + @Inject + public SimpleTickerScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) { + super(settings, scheduleRegistry); + this.schedules = new ConcurrentHashMap<>(); + this.clock = clock; + this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); + } + + @Override + public void start(Collection jobs) { + long starTime = clock.millis(); + Map schedules = new ConcurrentHashMap<>(); + for (Job job : jobs) { + if (job.trigger() instanceof ScheduleTrigger) { + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), starTime)); + } + } + this.schedules = schedules; + this.ticker = new Ticker(); + } + + @Override + public void stop() { + ticker.close(); + executor.getQueue().clear(); + } + + @Override + public void add(Job job) { + assert job.trigger() instanceof ScheduleTrigger; + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), clock.millis())); + } + + @Override + public boolean remove(String jobName) { + return schedules.remove(jobName) != null; + } + + void checkJobs() { + long triggeredTime = clock.millis(); + for (ActiveSchedule schedule : schedules.values()) { + long scheduledTime = schedule.check(triggeredTime); + if (scheduledTime > 0) { + notifyListeners(schedule.name, triggeredTime, scheduledTime); + } + } + } + + void notifyListeners(String name, long triggeredTime, long scheduledTime) { + logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime)); + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + for (Listener listener : listeners) { + executor.execute(new ListenerRunnable(listener, name, event)); + } + } + + static class ActiveSchedule { + + private final String name; + private final Schedule schedule; + private final long startTime; + + private volatile long scheduledTime; + + public ActiveSchedule(String name, Schedule schedule, long startTime) { + this.name = name; + this.schedule = schedule; + this.startTime = startTime; + // we don't want the schedule to trigger on the start time itself, so we compute + // the next scheduled time by simply computing the schedule time on the startTime + 1 + this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1); + } + + /** + * Checks whether the given time is the same or after the scheduled time of this schedule. If so, the scheduled time is + * returned a new scheduled time is computed and set. Otherwise (the given time is before the scheduled time), {@code -1} + * is returned. + */ + public long check(long time) { + if (time < scheduledTime) { + return -1; + } + long prevScheduledTime = scheduledTime == 0 ? time : scheduledTime; + scheduledTime = schedule.nextScheduledTimeAfter(startTime, scheduledTime); + return prevScheduledTime; + } + } + + static class ListenerRunnable implements Runnable { + + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; + + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; + } + + @Override + public void run() { + listener.triggered(jobName, event); + } + } + + class Ticker extends Thread { + + private volatile boolean active = true; + private final CountDownLatch closeLatch = new CountDownLatch(1); + + public Ticker() { + super("ticker-schedule-trigger-engine"); + setDaemon(true); + start(); + } + + @Override + public void run() { + + // calibrate with round clock + while (clock.millis() % 1000 > 15) { + } + while (active) { + logger.trace("checking jobs [{}]", DateTime.now()); + checkJobs(); + try { + sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + closeLatch.countDown(); + } + + public void close() { + active = false; + try { + closeLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java new file mode 100644 index 00000000000..439fe585029 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java @@ -0,0 +1,168 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.trigger.schedule.engine; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; +import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.schedule.*; + +import java.util.Collection; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + */ +public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine { + + public static final String THREAD_POOL_NAME = "watcher_scheduler"; + + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // scheduler TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(availableProcessors) + .queueSize(1000) + .build(); + } + + private final Clock clock; + + private volatile Map schedules; + private Timer timer; + private TimerTask ticker; + private EsThreadPoolExecutor executor; + + @Inject + public TimerTickerScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) { + super(settings, scheduleRegistry); + this.schedules = new ConcurrentHashMap<>(); + this.clock = clock; + this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); + } + + @Override + public void start(Collection jobs) { + long starTime = clock.millis(); + Map schedules = new ConcurrentHashMap<>(); + for (Job job : jobs) { + if (job.trigger() instanceof ScheduleTrigger) { + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), starTime)); + } + } + this.schedules = schedules; + this.ticker = new TimerTask() { + @Override + public void run() { + checkJobs(); + } + }; + this.timer = new Timer("ticker-schedule-trigger-engine", true); + this.timer.scheduleAtFixedRate(ticker, clock.millis() % 1000 , 10); + } + + @Override + public void stop() { + ticker.cancel(); + timer.cancel(); + executor.getQueue().clear(); + } + + @Override + public void add(Job job) { + assert job.trigger() instanceof ScheduleTrigger; + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), clock.millis())); + } + + @Override + public boolean remove(String jobName) { + return schedules.remove(jobName) != null; + } + + void checkJobs() { + long triggeredTime = clock.millis(); + for (ActiveSchedule schedule : schedules.values()) { + long scheduledTime = schedule.check(triggeredTime); + if (scheduledTime > 0) { + notifyListeners(schedule.name, triggeredTime, scheduledTime); + } + } + } + + void notifyListeners(String name, long triggeredTime, long scheduledTime) { + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + for (Listener listener : listeners) { + executor.execute(new ListenerRunnable(listener, name, event)); + } + } + + static class ActiveSchedule { + + private final String name; + private final Schedule schedule; + private final long startTime; + + private volatile long scheduledTime; + + public ActiveSchedule(String name, Schedule schedule, long startTime) { + this.name = name; + this.schedule = schedule; + this.startTime = startTime; + // we don't want the schedule to trigger on the start time itself, so we compute + // the next scheduled time by simply computing the schedule time on the startTime + 1 + this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1); + } + + /** + * Checks whether the given time is the same or after the scheduled time of this schedule. If so, the scheduled time is + * returned a new scheduled time is computed and set. Otherwise (the given time is before the scheduled time), {@code -1} + * is returned. + */ + public long check(long time) { + if (time < scheduledTime) { + return -1; + } + long prevScheduledTime = scheduledTime == 0 ? time : scheduledTime; + scheduledTime = schedule.nextScheduledTimeAfter(startTime, scheduledTime); + return prevScheduledTime; + } + } + + static class ListenerRunnable implements Runnable { + + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; + + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; + } + + @Override + public void run() { + listener.triggered(jobName, event); + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/WatcherQuartzJob.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/WatcherQuartzJob.java similarity index 95% rename from src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/WatcherQuartzJob.java rename to src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/WatcherQuartzJob.java index 7039d556b58..cb334cc5e7e 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/WatcherQuartzJob.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/WatcherQuartzJob.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.watcher.trigger.schedule.quartz; +package org.elasticsearch.watcher.trigger.schedule.engine; import org.quartz.*; diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/support/YearTimes.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/support/YearTimes.java index 0afe03327ff..c1e6b6c6604 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/support/YearTimes.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/support/YearTimes.java @@ -72,7 +72,8 @@ public class YearTimes implements Times { String daysStr = Ints.join(",", this.days); daysStr = daysStr.replace("32", "L"); String monthsStr = Joiner.on(",").join(months); - crons.add("0 " + minsStr + " " + hrsStr + " " + daysStr + " " + monthsStr + " ?"); + String expression = "0 " + minsStr + " " + hrsStr + " " + daysStr + " " + monthsStr + " ?"; + crons.add(expression); } return crons; } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/tool/CronEvalTool.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/tool/CronEvalTool.java index 50e81074546..00913f50671 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/tool/CronEvalTool.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/tool/CronEvalTool.java @@ -9,13 +9,12 @@ import org.elasticsearch.common.cli.CliTool; import org.elasticsearch.common.cli.CliToolConfig; import org.elasticsearch.common.cli.Terminal; import org.elasticsearch.common.cli.commons.CommandLine; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.joda.time.format.DateTimeFormat; +import org.elasticsearch.common.joda.time.format.DateTimeFormatter; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.quartz.CronExpression; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; +import org.elasticsearch.watcher.trigger.schedule.Cron; import static org.elasticsearch.common.cli.CliToolConfig.Builder.cmd; import static org.elasticsearch.common.cli.CliToolConfig.Builder.option; @@ -50,7 +49,7 @@ public class CronEvalTool extends CliTool { .options(option("c", "count").hasArg(false).required(false)) .build(); - private static final SimpleDateFormat format = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss", Locale.ROOT); + private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("EEE, d MMM yyyy HH:mm:ss"); final String expression; final int count; @@ -79,20 +78,27 @@ public class CronEvalTool extends CliTool { // when invalid, a parse expression will be thrown with a descriptive error message // the cli infra handles such exceptions and hows the exceptions' message - CronExpression.validateExpression(expression); + Cron.validate(expression); terminal.println("Valid!"); - Date date = new Date(); + DateTime date = DateTime.now(); - terminal.println("Now is [" + format.format(date) + "]"); + terminal.println("Now is [" + formatter.print(date) + "]"); terminal.println("Here are the next " + count + " times this cron expression will trigger:"); - CronExpression cron = new CronExpression(expression); + Cron cron = new Cron(expression); + long time = date.getMillis(); for (int i = 0; i < count; i++) { - date = cron.getNextValidTimeAfter(date); - terminal.println((i+1) + ".\t" + format.format(date)); + long prevTime = time; + time = cron.getNextValidTimeAfter(time); + if (time < 0) { + terminal.printError((i + 1) + ".\t Could not compute future times since [" + formatter.print(prevTime) + "] " + + "(perhaps the cron expression only points to times in the past?)"); + return ExitStatus.OK; + } + terminal.println((i+1) + ".\t" + formatter.print(time)); } return ExitStatus.OK; } diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index 20cfb37f75c..27bce5201aa 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.watcher.trigger.ScheduleTriggerEngineMock; import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.trigger.schedule.Schedule; +import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.watcher.trigger.schedule.Schedules; import org.elasticsearch.watcher.watch.Watch; @@ -86,17 +87,20 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg boolean shieldEnabled = enableShield(); + final ScheduleModule.Engine scheduleEngine = randomFrom(ScheduleModule.Engine.values()); + @Override protected Settings nodeSettings(int nodeOrdinal) { - ImmutableSettings.Builder builder = ImmutableSettings.builder() + return ImmutableSettings.builder() .put(super.nodeSettings(nodeOrdinal)) .put("scroll.size", randomIntBetween(1, 100)) .put("plugin.types", (timeWarped() ? TimeWarpedWatcherPlugin.class.getName() : WatcherPlugin.class.getName()) + "," + (shieldEnabled ? ShieldPlugin.class.getName() + "," : "") + licensePluginClass().getName()) - .put(ShieldSettings.settings(shieldEnabled)); - return builder.build(); + .put(ShieldSettings.settings(shieldEnabled)) + .put("watcher.trigger.schedule.engine", scheduleEngine().name().toLowerCase(Locale.ROOT)) + .build(); } /** @@ -116,6 +120,13 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg return shieldEnabled; } + /** + * @return The schedule trigger engine that will be used for the nodes. + */ + protected ScheduleModule.Engine scheduleEngine() { + return scheduleEngine; + } + /** * Override and returns {@code false} to force running without shield */ @@ -600,6 +611,8 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg static class ShieldSettings { + static boolean auditLogsEnabled = SystemPropertyUtil.getBoolean("tests.audit_logs", false); + public static final String IP_FILTER = "allow: all\n"; public static final String USERS = @@ -640,7 +653,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg .put("shield.authc.realms.esusers.files.users_roles", writeFile(folder, "users_roles", USER_ROLES)) .put("shield.authz.store.files.roles", writeFile(folder, "roles.yml", ROLES)) .put("shield.transport.n2n.ip_filter.file", writeFile(folder, "ip_filter.yml", IP_FILTER)) - .put("shield.audit.enabled", true) + .put("shield.audit.enabled", auditLogsEnabled) .build(); } diff --git a/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java b/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java index 21f7e0cf083..241be14a928 100644 --- a/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java +++ b/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java @@ -58,7 +58,7 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin { if (module instanceof TriggerModule) { // replacing scheduler module so we'll // have control on when it fires a job - modules.add(new MockTriggerModule()); + modules.add(new MockTriggerModule(settings)); } else if (module instanceof ClockModule) { // replacing the clock module so we'll be able @@ -79,6 +79,10 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin { public static class MockTriggerModule extends TriggerModule { + public MockTriggerModule(Settings settings) { + super(settings); + } + @Override protected void registerStandardEngines() { registerEngine(ScheduleTriggerEngineMock.class); diff --git a/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineBenchmark.java b/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineBenchmark.java new file mode 100644 index 00000000000..32f7d861395 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineBenchmark.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.test.bench; + +import org.elasticsearch.common.cli.Terminal; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.clock.SystemClock; +import org.elasticsearch.watcher.trigger.Trigger; +import org.elasticsearch.watcher.trigger.TriggerEngine; +import org.elasticsearch.watcher.trigger.TriggerEvent; +import org.elasticsearch.watcher.trigger.schedule.*; +import org.elasticsearch.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * + */ +public class ScheduleEngineBenchmark { + + static final ESLogger logger = Loggers.getLogger(ScheduleEngineBenchmark.class); + + public static void main(String[] args) throws Exception { + Settings settings = ImmutableSettings.builder() + .put(SchedulerScheduleTriggerEngine.additionalSettings(ImmutableSettings.EMPTY)) + .put("name", "test") + .build(); + ThreadPool threadPool = new ThreadPool(settings, null); + + ScheduleRegistry scheduleRegistry = new ScheduleRegistry(Collections.emptyMap()); + + SchedulerScheduleTriggerEngine scheduler = new SchedulerScheduleTriggerEngine(ImmutableSettings.EMPTY, SystemClock.INSTANCE, scheduleRegistry, threadPool); + + List jobs = new ArrayList<>(10000); + for (int i = 0; i < 10000; i++) { + jobs.add(new SimpleJob("job_" + i, new CronSchedule("0/3 * * * * ?"))); + } + scheduler.start(jobs); + + scheduler.register(new TriggerEngine.Listener() { + @Override + public void triggered(String jobName, TriggerEvent event) { + ScheduleTriggerEvent e = (ScheduleTriggerEvent) event; + logger.info("triggered [{}] at fire_time [{}], scheduled time [{}], now [{}]", jobName, event.triggeredTime(), e.scheduledTime(), SystemClock.INSTANCE.now()); + } + }); + + Terminal.DEFAULT.readText("press enter to quit..."); + scheduler.stop(); + } + + static class SimpleJob implements TriggerEngine.Job { + + private final String name; + private final ScheduleTrigger trigger; + + public SimpleJob(String name, Schedule schedule) { + this.name = name; + this.trigger = new ScheduleTrigger(schedule); + } + + @Override + public String name() { + return name; + } + + @Override + public Trigger trigger() { + return trigger; + } + } +} diff --git a/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java b/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java index e0e22cb9640..d04eeb712c6 100644 --- a/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java +++ b/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java @@ -226,7 +226,7 @@ public class WatcherBenchmark { if (module instanceof TriggerModule) { // replacing scheduler module so we'll // have control on when it fires a job - modules.add(new MockTriggerModule()); + modules.add(new MockTriggerModule(settings)); } else { modules.add(module); @@ -237,6 +237,10 @@ public class WatcherBenchmark { public static class MockTriggerModule extends TriggerModule { + public MockTriggerModule(Settings settings) { + super(settings); + } + @Override protected void registerStandardEngines() { registerEngine(ScheduleTriggerEngineMock.class); diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java index 6540cc79948..7d52439235f 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.watcher.test.integration; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -13,6 +14,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.client.WatcherClient; @@ -23,6 +25,7 @@ import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule; +import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.watcher.trigger.schedule.Schedules; import org.elasticsearch.watcher.watch.WatchStore; import org.junit.Test; @@ -47,8 +50,14 @@ import static org.hamcrest.Matchers.*; /** */ +@TestLogging("watcher.trigger.schedule:TRACE") public class BasicWatcherTests extends AbstractWatcherIntegrationTests { + @Override + protected ScheduleModule.Engine scheduleEngine() { + return ScheduleModule.Engine.HASHWHEEL; + } + @Test public void testIndexWatch() throws Exception { WatcherClient watcherClient = watcherClient(); @@ -239,7 +248,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { assertThat(count, equalTo(findNumberOfPerformedActions("_name"))); } - @Test + @Test //@Repeat(iterations = 10) public void testConditionSearchWithSource() throws Exception { String variable = randomFrom("ctx.execution_time", "ctx.trigger.scheduled_time", "ctx.trigger.triggered_time"); SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery( @@ -251,7 +260,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { testConditionSearch(newInputSearchRequest("events").source(searchSourceBuilder)); } - @Test + @Test @Repeat(iterations = 10) public void testConditionSearchWithIndexedTemplate() throws Exception { String variable = randomFrom("ctx.execution_time", "ctx.trigger.scheduled_time", "ctx.trigger.triggered_time"); SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery( @@ -298,6 +307,8 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { timeWarp().scheduler().trigger("_name1"); timeWarp().scheduler().trigger("_name2"); refresh(); + } else { + Thread.sleep(5000); } assertWatchWithMinimumPerformedActionsCount("_name1", 1); @@ -325,10 +336,13 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { String watchName = "_name"; assertAcked(prepareCreate("events").addMapping("event", "_timestamp", "enabled=true", "level", "type=string")); + watcherClient().preparePutWatch(watchName) .setSource(createWatchSource(interval("5s"), request, "return ctx.payload.hits.total >= 3")) .get(); + logger.info("created watch [{}] at [{}]", watchName, DateTime.now()); + client().prepareIndex("events", "event") .setCreate(true) .setSource("level", "a") @@ -337,7 +351,9 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { .setCreate(true) .setSource("level", "a") .get(); + refresh(); + if (timeWarped()) { timeWarp().clock().fastForwardSeconds(5); timeWarp().scheduler().trigger(watchName); diff --git a/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java b/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java index be286d16cb7..e92fc43f9aa 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java @@ -33,13 +33,11 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { private final ESLogger logger; private final ConcurrentMap jobs = new ConcurrentHashMap<>(); private final Clock clock; - private final ScheduleRegistry scheduleRegistry; @Inject public ScheduleTriggerEngineMock(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { - super(settings); + super(settings, scheduleRegistry); this.logger = Loggers.getLogger(ScheduleTriggerEngineMock.class, settings); - this.scheduleRegistry = scheduleRegistry; this.clock = clock; } diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/CronScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/CronScheduleTests.java index b662c7e2f0f..5bdd33acfb8 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/CronScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/CronScheduleTests.java @@ -34,7 +34,7 @@ public class CronScheduleTests extends ScheduleTestCase { parser.nextToken(); CronSchedule schedule = new CronSchedule.Parser().parse(parser); assertThat(schedule.crons(), arrayWithSize(1)); - assertThat(schedule.crons()[0], is("0 0/5 * * * ?")); + assertThat(schedule.crons()[0].expression(), is("0 0/5 * * * ?")); } @Test @@ -48,10 +48,11 @@ public class CronScheduleTests extends ScheduleTestCase { XContentParser parser = JsonXContent.jsonXContent.createParser(bytes); parser.nextToken(); CronSchedule schedule = new CronSchedule.Parser().parse(parser); - assertThat(schedule.crons(), arrayWithSize(3)); - assertThat(schedule.crons(), hasItemInArray("0 0/1 * * * ?")); - assertThat(schedule.crons(), hasItemInArray("0 0/2 * * * ?")); - assertThat(schedule.crons(), hasItemInArray("0 0/3 * * * ?")); + String[] crons = expressions(schedule); + assertThat(crons, arrayWithSize(3)); + assertThat(crons, hasItemInArray("0 0/1 * * * ?")); + assertThat(crons, hasItemInArray("0 0/2 * * * ?")); + assertThat(crons, hasItemInArray("0 0/3 * * * ?")); } @Test diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/DailyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/DailyScheduleTests.java index b34bf605ce5..ced6b924be0 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/DailyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/DailyScheduleTests.java @@ -6,12 +6,12 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.trigger.schedule.support.DayTimes; import org.junit.Test; @@ -26,7 +26,7 @@ public class DailyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { DailySchedule schedule = new DailySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule.crons()); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 0 * * ?")); } @@ -35,7 +35,7 @@ public class DailyScheduleTests extends ScheduleTestCase { public void test_SingleTime() throws Exception { DayTimes time = validDayTime(); DailySchedule schedule = new DailySchedule(time); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 " + Ints.join(",", time.minute()) + " " + Ints.join(",", time.hour()) + " * * ?")); } @@ -57,7 +57,7 @@ public class DailyScheduleTests extends ScheduleTestCase { public void test_MultipleTimes() throws Exception { DayTimes[] times = validDayTimes(); DailySchedule schedule = new DailySchedule(times); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(times.length)); for (DayTimes time : times) { assertThat(crons, hasItemInArray("0 " + Ints.join(",", time.minute()) + " " + Ints.join(",", time.hour()) + " * * ?")); diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/HourlyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/HourlyScheduleTests.java index 3047b98a829..4a872496f09 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/HourlyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/HourlyScheduleTests.java @@ -6,13 +6,13 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Collections2; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.junit.Test; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -26,7 +26,7 @@ public class HourlyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { HourlySchedule schedule = new HourlySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 * * * ?")); } @@ -35,7 +35,7 @@ public class HourlyScheduleTests extends ScheduleTestCase { public void test_SingleMinute() throws Exception { int minute = validMinute(); HourlySchedule schedule = new HourlySchedule(minute); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 " + minute + " * * * ?")); } @@ -50,7 +50,7 @@ public class HourlyScheduleTests extends ScheduleTestCase { int[] minutes = validMinutes(); String minutesStr = Ints.join(",", minutes); HourlySchedule schedule = new HourlySchedule(minutes); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 " + minutesStr + " * * * ?")); } diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/MonthlyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/MonthlyScheduleTests.java index 6610877f9ea..4ed2aba45f8 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/MonthlyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/MonthlyScheduleTests.java @@ -6,12 +6,12 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.trigger.schedule.support.DayTimes; import org.elasticsearch.watcher.trigger.schedule.support.MonthTimes; import org.junit.Test; @@ -27,7 +27,7 @@ public class MonthlyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { MonthlySchedule schedule = new MonthlySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 0 1 * ?")); } @@ -36,7 +36,7 @@ public class MonthlyScheduleTests extends ScheduleTestCase { public void test_SingleTime() throws Exception { MonthTimes time = validMonthTime(); MonthlySchedule schedule = new MonthlySchedule(time); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(time.times().length)); for (DayTimes dayTimes : time.times()) { String minStr = Ints.join(",", dayTimes.minute()); @@ -51,7 +51,7 @@ public class MonthlyScheduleTests extends ScheduleTestCase { public void test_MultipleTimes() throws Exception { MonthTimes[] times = validMonthTimes(); MonthlySchedule schedule = new MonthlySchedule(times); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); int count = 0; for (int i = 0; i < times.length; i++) { count += times[i].times().length; diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTestCase.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTestCase.java index b805c1e6c79..5c26b37b496 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTestCase.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTestCase.java @@ -23,6 +23,18 @@ import static org.elasticsearch.watcher.trigger.schedule.Schedules.*; */ public abstract class ScheduleTestCase extends ElasticsearchTestCase { + protected static String[] expressions(CronnableSchedule schedule) { + return expressions(schedule.crons); + } + + protected static String[] expressions(Cron[] crons) { + String[] expressions = new String[crons.length]; + for (int i = 0; i < expressions.length; i++) { + expressions[i] = crons[i].expression(); + } + return expressions; + } + protected static MonthlySchedule randomMonthlySchedule() { switch (randomIntBetween(1, 4)) { case 1: return monthly().build(); @@ -145,10 +157,13 @@ public abstract class ScheduleTestCase extends ElasticsearchTestCase { } protected static int[] randomDaysOfMonth() { + if (rarely()) { + return new int[] { 32 }; + } int count = randomIntBetween(1, 5); Set days = new HashSet<>(); for (int i = 0; i < count; i++) { - days.add(randomIntBetween(1, 32)); + days.add(randomIntBetween(1, 31)); } return Ints.toArray(days); } diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/WeeklyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/WeeklyScheduleTests.java index 091ccefdfc8..c9a24ea4dbe 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/WeeklyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/WeeklyScheduleTests.java @@ -6,13 +6,13 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.base.Joiner; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.trigger.schedule.support.DayOfWeek; import org.elasticsearch.watcher.trigger.schedule.support.DayTimes; import org.elasticsearch.watcher.trigger.schedule.support.WeekTimes; @@ -29,7 +29,7 @@ public class WeeklyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { WeeklySchedule schedule = new WeeklySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 0 ? * MON")); } @@ -38,7 +38,7 @@ public class WeeklyScheduleTests extends ScheduleTestCase { public void test_SingleTime() throws Exception { WeekTimes time = validWeekTime(); WeeklySchedule schedule = new WeeklySchedule(time); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(time.times().length)); for (DayTimes dayTimes : time.times()) { assertThat(crons, hasItemInArray("0 " + Ints.join(",", dayTimes.minute()) + " " + Ints.join(",", dayTimes.hour()) + " ? * " + Joiner.on(",").join(time.days()))); @@ -49,7 +49,7 @@ public class WeeklyScheduleTests extends ScheduleTestCase { public void test_MultipleTimes() throws Exception { WeekTimes[] times = validWeekTimes(); WeeklySchedule schedule = new WeeklySchedule(times); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); int count = 0; for (int i = 0; i < times.length; i++) { count += times[i].times().length; diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/YearlyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/YearlyScheduleTests.java index 93f8b0b139f..924e911c639 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/YearlyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/YearlyScheduleTests.java @@ -6,13 +6,13 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.base.Joiner; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.trigger.schedule.support.DayTimes; import org.elasticsearch.watcher.trigger.schedule.support.YearTimes; import org.junit.Test; @@ -28,16 +28,16 @@ public class YearlyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { YearlySchedule schedule = new YearlySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 0 1 JAN ?")); } - @Test @Repeat(iterations = 20) + @Test @Repeat(iterations = 120) public void test_SingleTime() throws Exception { YearTimes time = validYearTime(); YearlySchedule schedule = new YearlySchedule(time); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(time.times().length)); for (DayTimes dayTimes : time.times()) { String minStr = Ints.join(",", dayTimes.minute()); @@ -45,7 +45,9 @@ public class YearlyScheduleTests extends ScheduleTestCase { String dayStr = Ints.join(",", time.days()); dayStr = dayStr.replace("32", "L"); String monthStr = Joiner.on(",").join(time.months()); - assertThat(crons, hasItemInArray("0 " + minStr + " " + hrStr + " " + dayStr + " " + monthStr + " ?")); + String expression = "0 " + minStr + " " + hrStr + " " + dayStr + " " + monthStr + " ?"; + logger.info("expression: " + expression); + assertThat(crons, hasItemInArray(expression)); } } @@ -53,7 +55,7 @@ public class YearlyScheduleTests extends ScheduleTestCase { public void test_MultipleTimes() throws Exception { YearTimes[] times = validYearTimes(); YearlySchedule schedule = new YearlySchedule(times); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); int count = 0; for (int i = 0; i < times.length; i++) { count += times[i].times().length; diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java index 746b396c2dd..3f11f5cf754 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.schedule.Schedule; import org.elasticsearch.watcher.trigger.schedule.ScheduleRegistry; import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; +import org.elasticsearch.watcher.trigger.schedule.engine.QuartzScheduleTriggerEngine; import org.elasticsearch.watcher.trigger.schedule.support.DayOfWeek; import org.elasticsearch.watcher.trigger.schedule.support.WeekTimes; import org.junit.After; diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java index 7fcf0597c90..193b27c2eb0 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.watcher.actions.ActionFactory; import org.elasticsearch.watcher.actions.ActionRegistry; @@ -78,7 +77,6 @@ import org.elasticsearch.watcher.trigger.schedule.support.*; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -323,11 +321,8 @@ public class WatchTests extends ElasticsearchTestCase { static class ParseOnlyScheduleTriggerEngine extends ScheduleTriggerEngine { - private final ScheduleRegistry registry; - public ParseOnlyScheduleTriggerEngine(Settings settings, ScheduleRegistry registry) { - super(settings); - this.registry = registry; + super(settings, registry); } @Override @@ -350,16 +345,5 @@ public class WatchTests extends ElasticsearchTestCase { public boolean remove(String jobName) { return false; } - - @Override - public ScheduleTrigger parseTrigger(String context, XContentParser parser) throws IOException { - Schedule schedule = registry.parse(context, parser); - return new ScheduleTrigger(schedule); - } - - @Override - public ScheduleTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException { - return null; - } } }