diff --git a/src/main/java/org/elasticsearch/watcher/WatcherModule.java b/src/main/java/org/elasticsearch/watcher/WatcherModule.java index e24c0888886..81c402de03e 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherModule.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherModule.java @@ -33,7 +33,7 @@ import org.elasticsearch.watcher.watch.WatchModule; public class WatcherModule extends AbstractModule implements SpawnModules { - private final Settings settings; + protected final Settings settings; public WatcherModule(Settings settings) { this.settings = settings; @@ -51,7 +51,7 @@ public class WatcherModule extends AbstractModule implements SpawnModules { new WatcherClientModule(), new TransformModule(), new WatcherRestModule(), - new TriggerModule(), + new TriggerModule(settings), new WatcherTransportModule(), new ConditionModule(), new InputModule(), diff --git a/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java b/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java index b7af6dafcfe..56c3f63e563 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java @@ -6,16 +6,17 @@ package org.elasticsearch.watcher; import org.elasticsearch.client.Client; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.watcher.actions.email.service.InternalEmailService; -import org.elasticsearch.watcher.license.LicenseService; -import org.elasticsearch.watcher.support.init.InitializingService; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.plugins.AbstractPlugin; +import org.elasticsearch.watcher.actions.email.service.InternalEmailService; +import org.elasticsearch.watcher.history.HistoryModule; +import org.elasticsearch.watcher.license.LicenseService; +import org.elasticsearch.watcher.support.init.InitializingService; +import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import java.util.Collection; @@ -24,7 +25,6 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde public class WatcherPlugin extends AbstractPlugin { public static final String NAME = "watcher"; - public static final String SCHEDULER_THREAD_POOL_NAME = "watcher_scheduler"; private final Settings settings; private final boolean transportClient; @@ -68,15 +68,12 @@ public class WatcherPlugin extends AbstractPlugin { if (transportClient) { return ImmutableSettings.EMPTY; } - int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - return settingsBuilder() - .put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".type", "fixed") - .put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".size", availableProcessors * 2) - .put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".queue_size", 1000) - .put("threadpool." + NAME + ".type", "fixed") - .put("threadpool." + NAME + ".size", availableProcessors * 5) - .put("threadpool." + NAME + ".queue_size", 1000) + Settings additionalSettings = settingsBuilder() + .put(ScheduleModule.additionalSettings(settings)) + .put(HistoryModule.additionalSettings(settings)) .build(); + + return additionalSettings; } } diff --git a/src/main/java/org/elasticsearch/watcher/execution/InternalWatchExecutor.java b/src/main/java/org/elasticsearch/watcher/execution/InternalWatchExecutor.java index 93729be32c5..7b017f17f38 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/InternalWatchExecutor.java +++ b/src/main/java/org/elasticsearch/watcher/execution/InternalWatchExecutor.java @@ -5,10 +5,14 @@ */ package org.elasticsearch.watcher.execution; -import org.elasticsearch.watcher.WatcherPlugin; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.WatcherPlugin; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; import java.util.concurrent.BlockingQueue; @@ -17,6 +21,22 @@ import java.util.concurrent.BlockingQueue; */ public class InternalWatchExecutor implements WatchExecutor { + public static final String THREAD_POOL_NAME = WatcherPlugin.NAME; + + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // the TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(nodeSettings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(5 * availableProcessors) + .queueSize(1000) + .build(); + } + private final ThreadPool threadPool; @Inject @@ -40,6 +60,6 @@ public class InternalWatchExecutor implements WatchExecutor { } private EsThreadPoolExecutor executor() { - return (EsThreadPoolExecutor) threadPool.executor(WatcherPlugin.NAME); + return (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java b/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java index bfdbe7714bb..31093c029d9 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java @@ -6,6 +6,8 @@ package org.elasticsearch.watcher.history; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.watcher.execution.InternalWatchExecutor; /** */ @@ -21,4 +23,8 @@ public class HistoryModule extends AbstractModule { bind(WatchRecord.Parser.class).asEagerSingleton(); bind(HistoryStore.class).asEagerSingleton(); } + + public static Settings additionalSettings(Settings nodeSettings) { + return InternalWatchExecutor.additionalSettings(nodeSettings); + } } diff --git a/src/main/java/org/elasticsearch/watcher/support/ThreadPoolSettingsBuilder.java b/src/main/java/org/elasticsearch/watcher/support/ThreadPoolSettingsBuilder.java new file mode 100644 index 00000000000..a21c5b3dd79 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/ThreadPoolSettingsBuilder.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; + +/** + * + */ +public abstract class ThreadPoolSettingsBuilder { + + public static Same same(String name) { + return new Same(name); + } + + protected final String name; + private final ImmutableSettings.Builder builder = ImmutableSettings.builder(); + + protected ThreadPoolSettingsBuilder(String name, String type) { + this.name = name; + put("type", type); + } + + public Settings build() { + return builder.build(); + } + + protected B put(String setting, Object value) { + builder.put("threadpool." + name + "." + setting, value); + return (B) this; + } + + protected B put(String setting, int value) { + builder.put("threadpool." + name + "." + setting, value); + return (B) this; + } + + public static class Same extends ThreadPoolSettingsBuilder { + public Same(String name) { + super(name, "same"); + } + } + + public static class Fixed extends ThreadPoolSettingsBuilder { + + public Fixed(String name) { + super(name, "fixed"); + } + + public Fixed size(int size) { + return put("size", size); + } + + public Fixed queueSize(int queueSize) { + return put("queue_size", queueSize); + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/TriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/TriggerEngine.java index 4fb6d9233f9..9bf4171976f 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/TriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/TriggerEngine.java @@ -29,6 +29,12 @@ public interface TriggerEngine { void add(Job job); + /** + * Removes the job associated with the given name from this trigger engine. + * + * @param jobName The name of the job to remove + * @return {@code true} if the job existed and removed, {@code false} otherwise. + */ boolean remove(String jobName); T parseTrigger(String context, XContentParser parser) throws IOException; diff --git a/src/main/java/org/elasticsearch/watcher/trigger/TriggerModule.java b/src/main/java/org/elasticsearch/watcher/trigger/TriggerModule.java index 38d3189d1a2..5a46dfeb41b 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/TriggerModule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/TriggerModule.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.common.inject.multibindings.Multibinder; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.watcher.trigger.manual.ManualTriggerEngine; import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; @@ -21,9 +22,11 @@ import java.util.Set; */ public class TriggerModule extends AbstractModule implements SpawnModules { + private final Settings settings; private final Set> engines = new HashSet<>(); - public TriggerModule() { + public TriggerModule(Settings settings) { + this.settings = settings; registerStandardEngines(); } @@ -32,7 +35,7 @@ public class TriggerModule extends AbstractModule implements SpawnModules { } protected void registerStandardEngines() { - registerEngine(ScheduleModule.triggerEngineType()); + registerEngine(ScheduleModule.triggerEngineType(settings)); registerEngine(ManualTriggerEngine.class); } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java b/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java index 7aaa147bed1..c9476b47d99 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java @@ -54,6 +54,12 @@ public class TriggerService extends AbstractComponent { engines.get(job.trigger().type()).add(job); } + /** + * Removes the job associated with the given name from this trigger service. + * + * @param jobName The name of the job to remove + * @return {@code true} if the job existed and removed, {@code false} otherwise. + */ public boolean remove(String jobName) { for (TriggerEngine engine : engines.values()) { if (engine.remove(jobName)) { diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/Cron.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/Cron.java new file mode 100644 index 00000000000..a505eaa871a --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/Cron.java @@ -0,0 +1,1519 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.trigger.schedule; + +import org.elasticsearch.common.joda.time.DateTimeZone; +import org.elasticsearch.common.joda.time.format.DateTimeFormat; +import org.elasticsearch.common.joda.time.format.DateTimeFormatter; +import org.elasticsearch.watcher.trigger.TriggerException; + +import java.util.*; + + +/** + * + * THIS CLASS IS A COPY OF {@code CronExpression} + * FROM THE QUARTZ PROJECT + * + * + * Provides a parser and evaluator for unix-like cron expressions. Cron + * expressions provide the ability to specify complex time combinations such as + * "At 8:00am every Monday through Friday" or "At 1:30am every + * last Friday of the month". + *

+ * Cron expressions are comprised of 6 required fields and one optional field + * separated by white space. The fields respectively are described as follows: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Field Name Allowed Values Allowed Special Characters
Seconds  + * 0-59  + * , - * /
Minutes  + * 0-59  + * , - * /
Hours  + * 0-23  + * , - * /
Day-of-month  + * 1-31  + * , - * ? / L W
Month  + * 0-11 or JAN-DEC  + * , - * /
Day-of-Week  + * 1-7 or SUN-SAT  + * , - * ? / L #
Year (Optional)  + * empty, 1970-2199  + * , - * /
+ *

+ * The '*' character is used to specify all values. For example, "*" + * in the minute field means "every minute". + *

+ * The '?' character is allowed for the day-of-month and day-of-week fields. It + * is used to specify 'no specific value'. This is useful when you need to + * specify something in one of the two fields, but not the other. + *

+ * The '-' character is used to specify ranges For example "10-12" in + * the hour field means "the hours 10, 11 and 12". + *

+ * The ',' character is used to specify additional values. For example + * "MON,WED,FRI" in the day-of-week field means "the days Monday, + * Wednesday, and Friday". + *

+ * The '/' character is used to specify increments. For example "0/15" + * in the seconds field means "the seconds 0, 15, 30, and 45". And + * "5/15" in the seconds field means "the seconds 5, 20, 35, and + * 50". Specifying '*' before the '/' is equivalent to specifying 0 is + * the value to start with. Essentially, for each field in the expression, there + * is a set of numbers that can be turned on or off. For seconds and minutes, + * the numbers range from 0 to 59. For hours 0 to 23, for days of the month 0 to + * 31, and for months 0 to 11 (JAN to DEC). The "/" character simply helps you turn + * on every "nth" value in the given set. Thus "7/6" in the + * month field only turns on month "7", it does NOT mean every 6th + * month, please note that subtlety. + *

+ * The 'L' character is allowed for the day-of-month and day-of-week fields. + * This character is short-hand for "last", but it has different + * meaning in each of the two fields. For example, the value "L" in + * the day-of-month field means "the last day of the month" - day 31 + * for January, day 28 for February on non-leap years. If used in the + * day-of-week field by itself, it simply means "7" or + * "SAT". But if used in the day-of-week field after another value, it + * means "the last xxx day of the month" - for example "6L" + * means "the last friday of the month". You can also specify an offset + * from the last day of the month, such as "L-3" which would mean the third-to-last + * day of the calendar month. When using the 'L' option, it is important not to + * specify lists, or ranges of values, as you'll get confusing/unexpected results. + *

+ * The 'W' character is allowed for the day-of-month field. This character + * is used to specify the weekday (Monday-Friday) nearest the given day. As an + * example, if you were to specify "15W" as the value for the + * day-of-month field, the meaning is: "the nearest weekday to the 15th of + * the month". So if the 15th is a Saturday, the trigger will fire on + * Friday the 14th. If the 15th is a Sunday, the trigger will fire on Monday the + * 16th. If the 15th is a Tuesday, then it will fire on Tuesday the 15th. + * However if you specify "1W" as the value for day-of-month, and the + * 1st is a Saturday, the trigger will fire on Monday the 3rd, as it will not + * 'jump' over the boundary of a month's days. The 'W' character can only be + * specified when the day-of-month is a single day, not a range or list of days. + *

+ * The 'L' and 'W' characters can also be combined for the day-of-month + * expression to yield 'LW', which translates to "last weekday of the + * month". + *

+ * The '#' character is allowed for the day-of-week field. This character is + * used to specify "the nth" XXX day of the month. For example, the + * value of "6#3" in the day-of-week field means the third Friday of + * the month (day 6 = Friday and "#3" = the 3rd one in the month). + * Other examples: "2#1" = the first Monday of the month and + * "4#5" = the fifth Wednesday of the month. Note that if you specify + * "#5" and there is not 5 of the given day-of-week in the month, then + * no firing will occur that month. If the '#' character is used, there can + * only be one expression in the day-of-week field ("3#1,6#3" is + * not valid, since there are two expressions). + *

+ * + *

+ * The legal characters and the names of months and days of the week are not + * case sensitive. + * + *

+ * NOTES: + *

    + *
  • Support for specifying both a day-of-week and a day-of-month value is + * not complete (you'll need to use the '?' character in one of these fields). + *
  • + *
  • Overflowing ranges is supported - that is, having a larger number on + * the left hand side than the right. You might do 22-2 to catch 10 o'clock + * at night until 2 o'clock in the morning, or you might have NOV-FEB. It is + * very important to note that overuse of overflowing ranges creates ranges + * that don't make sense and no effort has been made to determine which + * interpretation CronExpression chooses. An example would be + * "0 0 14-6 ? * FRI-MON".
  • + *
+ *

+ * + * + * @author Sharada Jambula, James House + * @author Contributions from Mads Henderson + * @author Refactoring from CronTrigger to CronExpression by Aaron Craven + */ +public class Cron { + + private static final long serialVersionUID = 12423409423L; + + protected static final TimeZone UTC = DateTimeZone.UTC.toTimeZone(); + protected static final DateTimeFormatter formatter = DateTimeFormat.forPattern("YYYY-MM-dd'T'HH:mm:ss"); + + private static final int SECOND = 0; + private static final int MINUTE = 1; + private static final int HOUR = 2; + private static final int DAY_OF_MONTH = 3; + private static final int MONTH = 4; + private static final int DAY_OF_WEEK = 5; + private static final int YEAR = 6; + private static final int ALL_SPEC_INT = 99; // '*' + private static final int NO_SPEC_INT = 98; // '?' + private static final Integer ALL_SPEC = ALL_SPEC_INT; + private static final Integer NO_SPEC = NO_SPEC_INT; + + private static final Map monthMap = new HashMap<>(20); + private static final Map dayMap = new HashMap<>(60); + static { + monthMap.put("JAN", 0); + monthMap.put("FEB", 1); + monthMap.put("MAR", 2); + monthMap.put("APR", 3); + monthMap.put("MAY", 4); + monthMap.put("JUN", 5); + monthMap.put("JUL", 6); + monthMap.put("AUG", 7); + monthMap.put("SEP", 8); + monthMap.put("OCT", 9); + monthMap.put("NOV", 10); + monthMap.put("DEC", 11); + + dayMap.put("SUN", 1); + dayMap.put("MON", 2); + dayMap.put("TUE", 3); + dayMap.put("WED", 4); + dayMap.put("THU", 5); + dayMap.put("FRI", 6); + dayMap.put("SAT", 7); + } + + private final String expression; + + private transient TreeSet seconds; + private transient TreeSet minutes; + private transient TreeSet hours; + private transient TreeSet daysOfMonth; + private transient TreeSet months; + private transient TreeSet daysOfWeek; + private transient TreeSet years; + + private transient boolean lastdayOfWeek = false; + private transient int nthdayOfWeek = 0; + private transient boolean lastdayOfMonth = false; + private transient boolean nearestWeekday = false; + private transient int lastdayOffset = 0; + private transient boolean expressionParsed = false; + + public static final int MAX_YEAR = Calendar.getInstance(UTC, Locale.ROOT).get(Calendar.YEAR) + 100; + + /** + * Constructs a new CronExpression based on the specified + * parameter. + * + * @param expression String representation of the cron expression the + * new object should represent + * @throws java.text.ParseException + * if the string expression cannot be parsed into a valid + * CronExpression + */ + public Cron(String expression) { + assert expression != null : "cron expression cannot be null"; + this.expression = expression.toUpperCase(Locale.ROOT); + buildExpression(this.expression); + } + + /** + * Constructs a new {@code CronExpression} as a copy of an existing + * instance. + * + * @param cron The existing cron expression to be copied + */ + public Cron(Cron cron) { + this(cron.expression); + } + + /** + * Returns the next date/time after the given date/time which + * satisfies the cron expression. + * + * @param time the time since the epoch, or -1 if next time is unsupported (e.g. the cron expression points to + * a time that is previous to the given time) + * @return the next valid time (since the epoch) + */ + public long getNextValidTimeAfter(final long time) { + + // Computation is based on Gregorian year only. + Calendar cl = new java.util.GregorianCalendar(UTC, Locale.ROOT); + + // move ahead one second, since we're computing the time *after* the + // given time + final long afterTime = time + 1000; + // CronTrigger does not deal with milliseconds + cl.setTimeInMillis(afterTime); + cl.set(Calendar.MILLISECOND, 0); + + boolean gotOne = false; + // loop until we've computed the next time, or we've past the endTime + while (!gotOne) { + + if(cl.get(Calendar.YEAR) > 2999) { // prevent endless loop... + return -1; + } + + SortedSet st = null; + int t = 0; + + int sec = cl.get(Calendar.SECOND); + int min = cl.get(Calendar.MINUTE); + + // get second................................................. + st = seconds.tailSet(sec); + if (st != null && st.size() != 0) { + sec = st.first(); + } else { + sec = seconds.first(); + min++; + cl.set(Calendar.MINUTE, min); + } + cl.set(Calendar.SECOND, sec); + + min = cl.get(Calendar.MINUTE); + int hr = cl.get(Calendar.HOUR_OF_DAY); + t = -1; + + // get minute................................................. + st = minutes.tailSet(min); + if (st != null && st.size() != 0) { + t = min; + min = st.first(); + } else { + min = minutes.first(); + hr++; + } + if (min != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, min); + setCalendarHour(cl, hr); + continue; + } + cl.set(Calendar.MINUTE, min); + + hr = cl.get(Calendar.HOUR_OF_DAY); + int day = cl.get(Calendar.DAY_OF_MONTH); + t = -1; + + // get hour................................................... + st = hours.tailSet(hr); + if (st != null && st.size() != 0) { + t = hr; + hr = st.first(); + } else { + hr = hours.first(); + day++; + } + if (hr != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + setCalendarHour(cl, hr); + continue; + } + cl.set(Calendar.HOUR_OF_DAY, hr); + + day = cl.get(Calendar.DAY_OF_MONTH); + int mon = cl.get(Calendar.MONTH) + 1; + // '+ 1' because calendar is 0-based for this field, and we are + // 1-based + t = -1; + int tmon = mon; + + // get day................................................... + boolean dayOfMSpec = !daysOfMonth.contains(NO_SPEC); + boolean dayOfWSpec = !daysOfWeek.contains(NO_SPEC); + if (dayOfMSpec && !dayOfWSpec) { // get day by day of month rule + st = daysOfMonth.tailSet(day); + if (lastdayOfMonth) { + if(!nearestWeekday) { + t = day; + day = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + day -= lastdayOffset; + if(t > day) { + mon++; + if(mon > 12) { + mon = 1; + tmon = 3333; // ensure test of mon != tmon further below fails + cl.add(Calendar.YEAR, 1); + } + day = 1; + } + } else { + t = day; + day = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + day -= lastdayOffset; + + Calendar tcal = Calendar.getInstance(UTC, Locale.ROOT); + tcal.set(Calendar.SECOND, 0); + tcal.set(Calendar.MINUTE, 0); + tcal.set(Calendar.HOUR_OF_DAY, 0); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + tcal.set(Calendar.YEAR, cl.get(Calendar.YEAR)); + + int ldom = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + int dow = tcal.get(Calendar.DAY_OF_WEEK); + + if(dow == Calendar.SATURDAY && day == 1) { + day += 2; + } else if(dow == Calendar.SATURDAY) { + day -= 1; + } else if(dow == Calendar.SUNDAY && day == ldom) { + day -= 2; + } else if(dow == Calendar.SUNDAY) { + day += 1; + } + + tcal.set(Calendar.SECOND, sec); + tcal.set(Calendar.MINUTE, min); + tcal.set(Calendar.HOUR_OF_DAY, hr); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + long nTime = tcal.getTimeInMillis(); + if(nTime < afterTime) { + day = 1; + mon++; + } + } + } else if(nearestWeekday) { + t = day; + day = daysOfMonth.first(); + + Calendar tcal = Calendar.getInstance(UTC, Locale.ROOT); + tcal.set(Calendar.SECOND, 0); + tcal.set(Calendar.MINUTE, 0); + tcal.set(Calendar.HOUR_OF_DAY, 0); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + tcal.set(Calendar.YEAR, cl.get(Calendar.YEAR)); + + int ldom = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + int dow = tcal.get(Calendar.DAY_OF_WEEK); + + if(dow == Calendar.SATURDAY && day == 1) { + day += 2; + } else if(dow == Calendar.SATURDAY) { + day -= 1; + } else if(dow == Calendar.SUNDAY && day == ldom) { + day -= 2; + } else if(dow == Calendar.SUNDAY) { + day += 1; + } + + + tcal.set(Calendar.SECOND, sec); + tcal.set(Calendar.MINUTE, min); + tcal.set(Calendar.HOUR_OF_DAY, hr); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + long nTime = tcal.getTimeInMillis(); + if(nTime < afterTime) { + day = daysOfMonth.first(); + mon++; + } + } else if (st != null && st.size() != 0) { + t = day; + day = st.first(); + // make sure we don't over-run a short month, such as february + int lastDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + if (day > lastDay) { + day = daysOfMonth.first(); + mon++; + } + } else { + day = daysOfMonth.first(); + mon++; + } + + if (day != t || mon != tmon) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, and we + // are 1-based + continue; + } + } else if (dayOfWSpec && !dayOfMSpec) { // get day by day of week rule + if (lastdayOfWeek) { // are we looking for the last XXX day of + // the month? + int dow = daysOfWeek.first(); // desired + // d-o-w + int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w + int daysToAdd = 0; + if (cDow < dow) { + daysToAdd = dow - cDow; + } + if (cDow > dow) { + daysToAdd = dow + (7 - cDow); + } + + int lDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + + if (day + daysToAdd > lDay) { // did we already miss the + // last one? + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon); + // no '- 1' here because we are promoting the month + continue; + } + + // find date of last occurrence of this day in this month... + while ((day + daysToAdd + 7) <= lDay) { + daysToAdd += 7; + } + + day += daysToAdd; + + if (daysToAdd > 0) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' here because we are not promoting the month + continue; + } + + } else if (nthdayOfWeek != 0) { + // are we looking for the Nth XXX day in the month? + int dow = daysOfWeek.first(); // desired + // d-o-w + int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w + int daysToAdd = 0; + if (cDow < dow) { + daysToAdd = dow - cDow; + } else if (cDow > dow) { + daysToAdd = dow + (7 - cDow); + } + + boolean dayShifted = false; + if (daysToAdd > 0) { + dayShifted = true; + } + + day += daysToAdd; + int weekOfMonth = day / 7; + if (day % 7 > 0) { + weekOfMonth++; + } + + daysToAdd = (nthdayOfWeek - weekOfMonth) * 7; + day += daysToAdd; + if (daysToAdd < 0 + || day > getLastDayOfMonth(mon, cl + .get(Calendar.YEAR))) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon); + // no '- 1' here because we are promoting the month + continue; + } else if (daysToAdd > 0 || dayShifted) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' here because we are NOT promoting the month + continue; + } + } else { + int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w + int dow = daysOfWeek.first(); // desired + // d-o-w + st = daysOfWeek.tailSet(cDow); + if (st != null && st.size() > 0) { + dow = st.first(); + } + + int daysToAdd = 0; + if (cDow < dow) { + daysToAdd = dow - cDow; + } + if (cDow > dow) { + daysToAdd = dow + (7 - cDow); + } + + int lDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + + if (day + daysToAdd > lDay) { // will we pass the end of + // the month? + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon); + // no '- 1' here because we are promoting the month + continue; + } else if (daysToAdd > 0) { // are we swithing days? + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day + daysToAdd); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, + // and we are 1-based + continue; + } + } + } else { // dayOfWSpec && !dayOfMSpec + return -1; +// throw new UnsupportedOperationException( +// "Support for specifying both a day-of-week AND a day-of-month parameter is not implemented."); + } + cl.set(Calendar.DAY_OF_MONTH, day); + + mon = cl.get(Calendar.MONTH) + 1; + // '+ 1' because calendar is 0-based for this field, and we are + // 1-based + int year = cl.get(Calendar.YEAR); + t = -1; + + // test for expressions that never generate a valid fire date, + // but keep looping... + if (year > MAX_YEAR) { + return -1; +// throw new ElasticsearchIllegalArgumentException("given time is not supported by cron [" + formatter.print(time) + "]"); + } + + // get month................................................... + st = months.tailSet(mon); + if (st != null && st.size() != 0) { + t = mon; + mon = st.first(); + } else { + mon = months.first(); + year++; + } + if (mon != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, and we are + // 1-based + cl.set(Calendar.YEAR, year); + continue; + } + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, and we are + // 1-based + + year = cl.get(Calendar.YEAR); + t = -1; + + // get year................................................... + st = years.tailSet(year); + if (st != null && st.size() != 0) { + t = year; + year = st.first(); + } else { + return -1; +// throw new ElasticsearchIllegalArgumentException("given time is not supported by cron [" + formatter.print(time) + "]"); + } + + if (year != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, 0); + // '- 1' because calendar is 0-based for this field, and we are + // 1-based + cl.set(Calendar.YEAR, year); + continue; + } + cl.set(Calendar.YEAR, year); + + gotOne = true; + } // while( !done ) + + return cl.getTimeInMillis(); + } + + public String expression() { + return expression; + } + + public String getExpressionSummary() { + StringBuilder buf = new StringBuilder(); + + buf.append("seconds: "); + buf.append(expressionSetSummary(seconds)); + buf.append("\n"); + buf.append("minutes: "); + buf.append(expressionSetSummary(minutes)); + buf.append("\n"); + buf.append("hours: "); + buf.append(expressionSetSummary(hours)); + buf.append("\n"); + buf.append("daysOfMonth: "); + buf.append(expressionSetSummary(daysOfMonth)); + buf.append("\n"); + buf.append("months: "); + buf.append(expressionSetSummary(months)); + buf.append("\n"); + buf.append("daysOfWeek: "); + buf.append(expressionSetSummary(daysOfWeek)); + buf.append("\n"); + buf.append("lastdayOfWeek: "); + buf.append(lastdayOfWeek); + buf.append("\n"); + buf.append("nearestWeekday: "); + buf.append(nearestWeekday); + buf.append("\n"); + buf.append("NthDayOfWeek: "); + buf.append(nthdayOfWeek); + buf.append("\n"); + buf.append("lastdayOfMonth: "); + buf.append(lastdayOfMonth); + buf.append("\n"); + buf.append("years: "); + buf.append(expressionSetSummary(years)); + buf.append("\n"); + + return buf.toString(); + } + + @Override + public int hashCode() { + return Objects.hash(expression); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final Cron other = (Cron) obj; + return Objects.equals(this.expression, other.expression); + } + + /** + * Returns the string representation of the CronExpression + * + * @return a string representation of the CronExpression + */ + @Override + public String toString() { + return expression; + } + + /** + * Indicates whether the specified cron expression can be parsed into a + * valid cron expression + * + * @param expression the expression to evaluate + * @return a boolean indicating whether the given expression is a valid cron + * expression + */ + public static boolean isValid(String expression) { + try { + validate(expression); + } catch (ParseException pe) { + return false; + } + return true; + } + + public static void validate(String expression) throws ParseException { + new Cron(expression); + } + + + //////////////////////////////////////////////////////////////////////////// + // + // Expression Parsing Functions + // + //////////////////////////////////////////////////////////////////////////// + + private void buildExpression(String expression) { + expressionParsed = true; + + try { + + if (seconds == null) { + seconds = new TreeSet(); + } + if (minutes == null) { + minutes = new TreeSet(); + } + if (hours == null) { + hours = new TreeSet(); + } + if (daysOfMonth == null) { + daysOfMonth = new TreeSet(); + } + if (months == null) { + months = new TreeSet(); + } + if (daysOfWeek == null) { + daysOfWeek = new TreeSet(); + } + if (years == null) { + years = new TreeSet(); + } + + int exprOn = SECOND; + + StringTokenizer exprsTok = new StringTokenizer(expression, " \t", + false); + + while (exprsTok.hasMoreTokens() && exprOn <= YEAR) { + String expr = exprsTok.nextToken().trim(); + + // throw an exception if L is used with other days of the month + if(exprOn == DAY_OF_MONTH && expr.indexOf('L') != -1 && expr.length() > 1 && expr.contains(",")) { + throw new ParseException("Support for specifying 'L' and 'LW' with other days of the month is not implemented", -1); + } + // throw an exception if L is used with other days of the week + if(exprOn == DAY_OF_WEEK && expr.indexOf('L') != -1 && expr.length() > 1 && expr.contains(",")) { + throw new ParseException("Support for specifying 'L' with other days of the week is not implemented", -1); + } + if(exprOn == DAY_OF_WEEK && expr.indexOf('#') != -1 && expr.indexOf('#', expr.indexOf('#') +1) != -1) { + throw new ParseException("Support for specifying multiple \"nth\" days is not implemented.", -1); + } + + StringTokenizer vTok = new StringTokenizer(expr, ","); + while (vTok.hasMoreTokens()) { + String v = vTok.nextToken(); + storeExpressionVals(0, v, exprOn); + } + + exprOn++; + } + + if (exprOn <= DAY_OF_WEEK) { + throw new ParseException("Unexpected end of expression.", expression.length()); + } + + if (exprOn <= YEAR) { + storeExpressionVals(0, "*", YEAR); + } + + TreeSet dow = getSet(DAY_OF_WEEK); + TreeSet dom = getSet(DAY_OF_MONTH); + + // Copying the logic from the UnsupportedOperationException below + boolean dayOfMSpec = !dom.contains(NO_SPEC); + boolean dayOfWSpec = !dow.contains(NO_SPEC); + + if (!dayOfMSpec || dayOfWSpec) { + if (!dayOfWSpec || dayOfMSpec) { + throw new ParseException( + "Support for specifying both a day-of-week AND a day-of-month parameter is not implemented.", 0); + } + } + } catch (Exception e) { + throw new ParseException("Illegal cron expression format (" + e.toString() + ")", 0); + } + } + + private int storeExpressionVals(int pos, String s, int type) throws ParseException { + + int incr = 0; + int i = skipWhiteSpace(pos, s); + if (i >= s.length()) { + return i; + } + char c = s.charAt(i); + if ((c >= 'A') && (c <= 'Z') && (!s.equals("L")) && (!s.equals("LW")) && (!s.matches("^L-[0-9]*[W]?"))) { + String sub = s.substring(i, i + 3); + int sval = -1; + int eval = -1; + if (type == MONTH) { + sval = getMonthNumber(sub) + 1; + if (sval <= 0) { + throw new ParseException("Invalid Month value: '" + sub + "'", i); + } + if (s.length() > i + 3) { + c = s.charAt(i + 3); + if (c == '-') { + i += 4; + sub = s.substring(i, i + 3); + eval = getMonthNumber(sub) + 1; + if (eval <= 0) { + throw new ParseException("Invalid Month value: '" + sub + "'", i); + } + } + } + } else if (type == DAY_OF_WEEK) { + sval = getDayOfWeekNumber(sub); + if (sval < 0) { + throw new ParseException("Invalid Day-of-Week value: '" + + sub + "'", i); + } + if (s.length() > i + 3) { + c = s.charAt(i + 3); + if (c == '-') { + i += 4; + sub = s.substring(i, i + 3); + eval = getDayOfWeekNumber(sub); + if (eval < 0) { + throw new ParseException( + "Invalid Day-of-Week value: '" + sub + + "'", i); + } + } else if (c == '#') { + try { + i += 4; + nthdayOfWeek = Integer.parseInt(s.substring(i)); + if (nthdayOfWeek < 1 || nthdayOfWeek > 5) { + throw new Exception(); + } + } catch (Exception e) { + throw new ParseException( + "A numeric value between 1 and 5 must follow the '#' option", + i); + } + } else if (c == 'L') { + lastdayOfWeek = true; + i++; + } + } + + } else { + throw new ParseException( + "Illegal characters for this position: '" + sub + "'", + i); + } + if (eval != -1) { + incr = 1; + } + addToSet(sval, eval, incr, type); + return (i + 3); + } + + if (c == '?') { + i++; + if ((i + 1) < s.length() + && (s.charAt(i) != ' ' && s.charAt(i + 1) != '\t')) { + throw new ParseException("Illegal character after '?': " + + s.charAt(i), i); + } + if (type != DAY_OF_WEEK && type != DAY_OF_MONTH) { + throw new ParseException( + "'?' can only be specfied for Day-of-Month or Day-of-Week.", + i); + } + if (type == DAY_OF_WEEK && !lastdayOfMonth) { + int val = daysOfMonth.last(); + if (val == NO_SPEC_INT) { + throw new ParseException( + "'?' can only be specfied for Day-of-Month -OR- Day-of-Week.", + i); + } + } + + addToSet(NO_SPEC_INT, -1, 0, type); + return i; + } + + if (c == '*' || c == '/') { + if (c == '*' && (i + 1) >= s.length()) { + addToSet(ALL_SPEC_INT, -1, incr, type); + return i + 1; + } else if (c == '/' + && ((i + 1) >= s.length() || s.charAt(i + 1) == ' ' || s + .charAt(i + 1) == '\t')) { + throw new ParseException("'/' must be followed by an integer.", i); + } else if (c == '*') { + i++; + } + c = s.charAt(i); + if (c == '/') { // is an increment specified? + i++; + if (i >= s.length()) { + throw new ParseException("Unexpected end of string.", i); + } + + incr = getNumericValue(s, i); + + i++; + if (incr > 10) { + i++; + } + if (incr > 59 && (type == SECOND || type == MINUTE)) { + throw new ParseException("Increment > 60 : " + incr, i); + } else if (incr > 23 && (type == HOUR)) { + throw new ParseException("Increment > 24 : " + incr, i); + } else if (incr > 31 && (type == DAY_OF_MONTH)) { + throw new ParseException("Increment > 31 : " + incr, i); + } else if (incr > 7 && (type == DAY_OF_WEEK)) { + throw new ParseException("Increment > 7 : " + incr, i); + } else if (incr > 12 && (type == MONTH)) { + throw new ParseException("Increment > 12 : " + incr, i); + } + } else { + incr = 1; + } + + addToSet(ALL_SPEC_INT, -1, incr, type); + return i; + } else if (c == 'L') { + i++; + if (type == DAY_OF_MONTH) { + lastdayOfMonth = true; + } + if (type == DAY_OF_WEEK) { + addToSet(7, 7, 0, type); + } + if(type == DAY_OF_MONTH && s.length() > i) { + c = s.charAt(i); + if(c == '-') { + ValueSet vs = getValue(0, s, i+1); + lastdayOffset = vs.value; + if(lastdayOffset > 30) + throw new ParseException("Offset from last day must be <= 30", i+1); + i = vs.pos; + } + if(s.length() > i) { + c = s.charAt(i); + if(c == 'W') { + nearestWeekday = true; + i++; + } + } + } + return i; + } else if (c >= '0' && c <= '9') { + int val = Integer.parseInt(String.valueOf(c)); + i++; + if (i >= s.length()) { + addToSet(val, -1, -1, type); + } else { + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(val, s, i); + val = vs.value; + i = vs.pos; + } + i = checkNext(i, s, val, type); + return i; + } + } else { + throw new ParseException("Unexpected character: " + c, i); + } + + return i; + } + + private int checkNext(int pos, String s, int val, int type) throws ParseException { + + int end = -1; + int i = pos; + + if (i >= s.length()) { + addToSet(val, end, -1, type); + return i; + } + + char c = s.charAt(pos); + + if (c == 'L') { + if (type == DAY_OF_WEEK) { + if(val < 1 || val > 7) + throw new ParseException("Day-of-Week values must be between 1 and 7", -1); + lastdayOfWeek = true; + } else { + throw new ParseException("'L' option is not valid here. (pos=" + i + ")", i); + } + TreeSet set = getSet(type); + set.add(val); + i++; + return i; + } + + if (c == 'W') { + if (type == DAY_OF_MONTH) { + nearestWeekday = true; + } else { + throw new ParseException("'W' option is not valid here. (pos=" + i + ")", i); + } + if(val > 31) + throw new ParseException("The 'W' option does not make sense with values larger than 31 (max number of days in a month)", i); + TreeSet set = getSet(type); + set.add(val); + i++; + return i; + } + + if (c == '#') { + if (type != DAY_OF_WEEK) { + throw new ParseException("'#' option is not valid here. (pos=" + i + ")", i); + } + i++; + try { + nthdayOfWeek = Integer.parseInt(s.substring(i)); + if (nthdayOfWeek < 1 || nthdayOfWeek > 5) { + throw new Exception(); + } + } catch (Exception e) { + throw new ParseException( + "A numeric value between 1 and 5 must follow the '#' option", + i); + } + + TreeSet set = getSet(type); + set.add(val); + i++; + return i; + } + + if (c == '-') { + i++; + c = s.charAt(i); + int v = Integer.parseInt(String.valueOf(c)); + end = v; + i++; + if (i >= s.length()) { + addToSet(val, end, 1, type); + return i; + } + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(v, s, i); + end = vs.value; + i = vs.pos; + } + if (i < s.length() && ((c = s.charAt(i)) == '/')) { + i++; + c = s.charAt(i); + int v2 = Integer.parseInt(String.valueOf(c)); + i++; + if (i >= s.length()) { + addToSet(val, end, v2, type); + return i; + } + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(v2, s, i); + int v3 = vs.value; + addToSet(val, end, v3, type); + i = vs.pos; + return i; + } else { + addToSet(val, end, v2, type); + return i; + } + } else { + addToSet(val, end, 1, type); + return i; + } + } + + if (c == '/') { + i++; + c = s.charAt(i); + int v2 = Integer.parseInt(String.valueOf(c)); + i++; + if (i >= s.length()) { + addToSet(val, end, v2, type); + return i; + } + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(v2, s, i); + int v3 = vs.value; + addToSet(val, end, v3, type); + i = vs.pos; + return i; + } else { + throw new ParseException("Unexpected character '" + c + "' after '/'", i); + } + } + + addToSet(val, end, 0, type); + i++; + return i; + } + + private static String expressionSetSummary(java.util.Set set) { + + if (set.contains(NO_SPEC)) { + return "?"; + } + if (set.contains(ALL_SPEC)) { + return "*"; + } + + StringBuilder buf = new StringBuilder(); + + Iterator itr = set.iterator(); + boolean first = true; + while (itr.hasNext()) { + Integer iVal = itr.next(); + String val = iVal.toString(); + if (!first) { + buf.append(","); + } + buf.append(val); + first = false; + } + + return buf.toString(); + } + + private static String expressionSetSummary(java.util.ArrayList list) { + + if (list.contains(NO_SPEC)) { + return "?"; + } + if (list.contains(ALL_SPEC)) { + return "*"; + } + + StringBuilder buf = new StringBuilder(); + + Iterator itr = list.iterator(); + boolean first = true; + while (itr.hasNext()) { + Integer iVal = itr.next(); + String val = iVal.toString(); + if (!first) { + buf.append(","); + } + buf.append(val); + first = false; + } + + return buf.toString(); + } + + private static int skipWhiteSpace(int i, String s) { + for (; i < s.length() && (s.charAt(i) == ' ' || s.charAt(i) == '\t'); i++) { + ; + } + + return i; + } + + private static int findNextWhiteSpace(int i, String s) { + for (; i < s.length() && (s.charAt(i) != ' ' || s.charAt(i) != '\t'); i++) { + ; + } + + return i; + } + + private void addToSet(int val, int end, int incr, int type) throws ParseException { + + TreeSet set = getSet(type); + + if (type == SECOND || type == MINUTE) { + if ((val < 0 || val > 59 || end > 59) && (val != ALL_SPEC_INT)) { + throw new ParseException("Minute and Second values must be between 0 and 59", -1); + } + } else if (type == HOUR) { + if ((val < 0 || val > 23 || end > 23) && (val != ALL_SPEC_INT)) { + throw new ParseException("Hour values must be between 0 and 23", -1); + } + } else if (type == DAY_OF_MONTH) { + if ((val < 1 || val > 31 || end > 31) && (val != ALL_SPEC_INT) + && (val != NO_SPEC_INT)) { + throw new ParseException("Day of month values must be between 1 and 31", -1); + } + } else if (type == MONTH) { + if ((val < 1 || val > 12 || end > 12) && (val != ALL_SPEC_INT)) { + throw new ParseException("Month values must be between 1 and 12", -1); + } + } else if (type == DAY_OF_WEEK) { + if ((val == 0 || val > 7 || end > 7) && (val != ALL_SPEC_INT) + && (val != NO_SPEC_INT)) { + throw new ParseException("Day-of-Week values must be between 1 and 7", -1); + } + } + + if ((incr == 0 || incr == -1) && val != ALL_SPEC_INT) { + if (val != -1) { + set.add(val); + } else { + set.add(NO_SPEC); + } + + return; + } + + int startAt = val; + int stopAt = end; + + if (val == ALL_SPEC_INT && incr <= 0) { + incr = 1; + set.add(ALL_SPEC); // put in a marker, but also fill values + } + + if (type == SECOND || type == MINUTE) { + if (stopAt == -1) { + stopAt = 59; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 0; + } + } else if (type == HOUR) { + if (stopAt == -1) { + stopAt = 23; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 0; + } + } else if (type == DAY_OF_MONTH) { + if (stopAt == -1) { + stopAt = 31; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1; + } + } else if (type == MONTH) { + if (stopAt == -1) { + stopAt = 12; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1; + } + } else if (type == DAY_OF_WEEK) { + if (stopAt == -1) { + stopAt = 7; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1; + } + } else if (type == YEAR) { + if (stopAt == -1) { + stopAt = MAX_YEAR; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1970; + } + } + + // if the end of the range is before the start, then we need to overflow into + // the next day, month etc. This is done by adding the maximum amount for that + // type, and using modulus max to determine the value being added. + int max = -1; + if (stopAt < startAt) { + switch (type) { + case SECOND : max = 60; break; + case MINUTE : max = 60; break; + case HOUR : max = 24; break; + case MONTH : max = 12; break; + case DAY_OF_WEEK : max = 7; break; + case DAY_OF_MONTH : max = 31; break; + case YEAR : throw new IllegalArgumentException("Start year must be less than stop year"); + default : throw new IllegalArgumentException("Unexpected type encountered"); + } + stopAt += max; + } + + for (int i = startAt; i <= stopAt; i += incr) { + if (max == -1) { + // ie: there's no max to overflow over + set.add(i); + } else { + // take the modulus to get the real value + int i2 = i % max; + + // 1-indexed ranges should not include 0, and should include their max + if (i2 == 0 && (type == MONTH || type == DAY_OF_WEEK || type == DAY_OF_MONTH) ) { + i2 = max; + } + + set.add(i2); + } + } + } + + private TreeSet getSet(int type) { + switch (type) { + case SECOND: + return seconds; + case MINUTE: + return minutes; + case HOUR: + return hours; + case DAY_OF_MONTH: + return daysOfMonth; + case MONTH: + return months; + case DAY_OF_WEEK: + return daysOfWeek; + case YEAR: + return years; + default: + return null; + } + } + + private ValueSet getValue(int v, String s, int i) { + char c = s.charAt(i); + StringBuilder s1 = new StringBuilder(String.valueOf(v)); + while (c >= '0' && c <= '9') { + s1.append(c); + i++; + if (i >= s.length()) { + break; + } + c = s.charAt(i); + } + ValueSet val = new ValueSet(); + + val.pos = (i < s.length()) ? i : i + 1; + val.value = Integer.parseInt(s1.toString()); + return val; + } + + private int getNumericValue(String s, int i) { + int endOfVal = findNextWhiteSpace(i, s); + String val = s.substring(i, endOfVal); + return Integer.parseInt(val); + } + + private int getMonthNumber(String s) { + Integer integer = monthMap.get(s); + + if (integer == null) { + return -1; + } + + return integer; + } + + private int getDayOfWeekNumber(String s) { + Integer integer = dayMap.get(s); + + if (integer == null) { + return -1; + } + + return integer; + } + + /** + * Advance the calendar to the particular hour paying particular attention + * to daylight saving problems. + * + * @param cal the calendar to operate on + * @param hour the hour to set + */ + private static void setCalendarHour(Calendar cal, int hour) { + cal.set(java.util.Calendar.HOUR_OF_DAY, hour); + if (cal.get(java.util.Calendar.HOUR_OF_DAY) != hour && hour != 24) { + cal.set(java.util.Calendar.HOUR_OF_DAY, hour + 1); + } + } + + private static boolean isLeapYear(int year) { + return ((year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)); + } + + private int getLastDayOfMonth(int monthNum, int year) { + + switch (monthNum) { + case 1: + return 31; + case 2: + return (isLeapYear(year)) ? 29 : 28; + case 3: + return 31; + case 4: + return 30; + case 5: + return 31; + case 6: + return 30; + case 7: + return 31; + case 8: + return 31; + case 9: + return 30; + case 10: + return 31; + case 11: + return 30; + case 12: + return 31; + default: + throw new IllegalArgumentException("Illegal month number: " + + monthNum); + } + } + + private static class ValueSet { + int value; + int pos; + } + + public static class ParseException extends TriggerException { + + private int errorOffset; + + public ParseException(String msg, int errorOffset) { + super(msg); + this.errorOffset = errorOffset; + } + + public ParseException(String msg, java.text.ParseException cause) { + super(msg, cause); + this.errorOffset = cause.getErrorOffset(); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronSchedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronSchedule.java index 4c0fc2bf5b3..733de519cf5 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronSchedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronSchedule.java @@ -8,10 +8,8 @@ package org.elasticsearch.watcher.trigger.schedule; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.WatcherSettingsException; -import org.quartz.CronExpression; import java.io.IOException; -import java.text.ParseException; import java.util.ArrayList; import java.util.List; @@ -23,8 +21,7 @@ public class CronSchedule extends CronnableSchedule { public static final String TYPE = "cron"; public CronSchedule(String... crons) { - super(crons); - validate(crons); + super(validate(crons)); } @Override @@ -37,14 +34,15 @@ public class CronSchedule extends CronnableSchedule { return crons.length == 1 ? builder.value(crons[0]) : builder.value(crons); } - static void validate(String... crons) { + static String[] validate(String... crons) { for (String cron :crons) { try { - CronExpression.validateExpression(cron); - } catch (ParseException pe) { + Cron.validate(cron); + } catch (Cron.ParseException pe) { throw new ValidationException(cron, pe); } } + return crons; } public static class Parser implements Schedule.Parser { @@ -90,7 +88,7 @@ public class CronSchedule extends CronnableSchedule { private String expression; - public ValidationException(String expression, ParseException cause) { + public ValidationException(String expression, Cron.ParseException cause) { super("invalid cron expression [" + expression + "]. " + cause.getMessage()); this.expression = expression; } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronnableSchedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronnableSchedule.java index 0efce2f37a9..c24ac74c3f8 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronnableSchedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/CronnableSchedule.java @@ -6,6 +6,7 @@ package org.elasticsearch.watcher.trigger.schedule; import java.util.Arrays; +import java.util.Comparator; import java.util.Objects; /** @@ -13,14 +14,38 @@ import java.util.Objects; */ public abstract class CronnableSchedule implements Schedule { - protected final String[] crons; + private static final Comparator CRON_COMPARATOR = new Comparator() { + @Override + public int compare(Cron c1, Cron c2) { + return c1.expression().compareTo(c2.expression()); + } + }; - public CronnableSchedule(String... crons) { - this.crons = crons; - Arrays.sort(crons); + protected final Cron[] crons; + + public CronnableSchedule(String... expressions) { + this(crons(expressions)); } - public String[] crons() { + public CronnableSchedule(Cron... crons) { + assert crons.length > 0; + this.crons = crons; + Arrays.sort(crons, CRON_COMPARATOR); + } + + @Override + public long nextScheduledTimeAfter(long startTime, long time) { + if (time <= startTime) { + return startTime; + } + long nextTime = Long.MAX_VALUE; + for (Cron cron : crons) { + nextTime = Math.min(nextTime, cron.getNextValidTimeAfter(time)); + } + return nextTime; + } + + public Cron[] crons() { return crons; } @@ -40,4 +65,12 @@ public abstract class CronnableSchedule implements Schedule { final CronnableSchedule other = (CronnableSchedule) obj; return Objects.deepEquals(this.crons, other.crons); } + + static Cron[] crons(String... expressions) { + Cron[] crons = new Cron[expressions.length]; + for (int i = 0; i < crons.length; i++) { + crons[i] = new Cron(expressions[i]); + } + return crons; + } } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java index 11b8441d127..4507bf67408 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java @@ -32,6 +32,17 @@ public class IntervalSchedule implements Schedule { return TYPE; } + @Override + public long nextScheduledTimeAfter(long startTime, long time) { + if (time <= startTime) { + return startTime; + } + // advancing the time in 1 ns (we're looking for the time **after**) + time += 1; + long delta = time - startTime; + return startTime + (delta / interval.millis + 1) * interval.millis; + } + public Interval interval() { return interval; } @@ -132,10 +143,12 @@ public class IntervalSchedule implements Schedule { private final long duration; private final Unit unit; + private final long millis; // computed once public Interval(long duration, Unit unit) { this.duration = duration; this.unit = unit; + this.millis = unit.millis(duration); } public long seconds() { diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/Schedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/Schedule.java index d275c233086..80056291005 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/Schedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/Schedule.java @@ -17,7 +17,24 @@ public interface Schedule extends ToXContent { String type(); - public static interface Parser { + /** + * Returns the next scheduled time after the given time, according to this schedule. If the given schedule + * cannot resolve the next scheduled time, then {@code -1} is returned. It really depends on the type of + * schedule to determine when {@code -1} is returned. Some schedules (e.g. IntervalSchedule) will never return + * {@code -1} as they can always compute the next scheduled time. {@code Cron} based schedules are good example + * of schedules that may return {@code -1}, for example, when the schedule only points to times that are all + * before the given time (in which case, there is no next scheduled time for the given time). + * + * Example: + * + * cron 0 0 0 * 1 ? 2013 (only points to days in January 2013) + * + * time 2015-01-01 12:00:00 (this time is in 2015) + * + */ + long nextScheduledTimeAfter(long startTime, long time); + + interface Parser { String type(); diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleModule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleModule.java index 13b755e0258..f8863b91a23 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleModule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleModule.java @@ -7,10 +7,13 @@ package org.elasticsearch.watcher.trigger.schedule; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.watcher.trigger.TriggerEngine; -import org.elasticsearch.watcher.trigger.schedule.quartz.QuartzScheduleTriggerEngine; +import org.elasticsearch.watcher.trigger.schedule.engine.*; import java.util.HashMap; +import java.util.Locale; import java.util.Map; /** @@ -30,8 +33,10 @@ public class ScheduleModule extends AbstractModule { registerScheduleParser(YearlySchedule.TYPE, YearlySchedule.Parser.class); } - public static Class triggerEngineType() { - return QuartzScheduleTriggerEngine.class; + public static Class triggerEngineType(Settings nodeSettings) { + Engine engine = Engine.resolve(nodeSettings); + Loggers.getLogger(ScheduleModule.class, nodeSettings).info("using [{}] schedule trigger engine", engine.name().toLowerCase(Locale.ROOT)); + return engine.engineType(); } public void registerScheduleParser(String parserType, Class parserClass) { @@ -50,4 +55,89 @@ public class ScheduleModule extends AbstractModule { bind(ScheduleRegistry.class).asEagerSingleton(); } + public static Settings additionalSettings(Settings nodeSettings) { + Engine engine = Engine.resolve(nodeSettings); + return engine.additionalSettings(nodeSettings); + } + + public enum Engine { + + SCHEDULER() { + @Override + protected Class engineType() { + return SchedulerScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return SchedulerScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }, + + HASHWHEEL() { + @Override + protected Class engineType() { + return HashWheelScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return HashWheelScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }, + + QUARTZ() { + @Override + protected Class engineType() { + return QuartzScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return QuartzScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }, + + TIMER() { + @Override + protected Class engineType() { + return TimerTickerScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return TimerTickerScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }, + + SIMPLE() { + @Override + protected Class engineType() { + return SimpleTickerScheduleTriggerEngine.class; + } + + @Override + protected Settings additionalSettings(Settings nodeSettings) { + return SimpleTickerScheduleTriggerEngine.additionalSettings(nodeSettings); + } + }; + + protected abstract Class engineType(); + + protected abstract Settings additionalSettings(Settings nodeSettings); + + public static Engine resolve(Settings settings) { + String engine = settings.getComponentSettings(ScheduleModule.class).get("engine", "scheduler"); + switch (engine.toLowerCase(Locale.ROOT)) { + case "quartz" : return QUARTZ; + case "timer" : return TIMER; + case "simple" : return SIMPLE; + case "hashwheel" : return HASHWHEEL; + case "scheduler" : return SCHEDULER; + default: + return SCHEDULER; + } + } + } + } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTrigger.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTrigger.java index a6631a28dca..9d36c1fbc35 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTrigger.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTrigger.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.watcher.trigger.schedule; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.watcher.trigger.Trigger; diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEngine.java index 26d1ceb4b0d..b4720d9be44 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEngine.java @@ -6,8 +6,11 @@ package org.elasticsearch.watcher.trigger.schedule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.trigger.AbstractTriggerEngine; +import java.io.IOException; + /** * */ @@ -15,12 +18,26 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine jobs) { + logger.debug("starting schedule engine..."); + + this.timer = new HashedWheelTimer(EsExecutors.daemonThreadFactory("trigger_engine_scheduler"), tickDuration.millis(), TimeUnit.MILLISECONDS, ticksPerWheel); + + // calibrate with round clock + while (clock.millis() % 1000 > 15) { + } + timer.start(); + + long starTime = clock.millis(); + List schedules = new ArrayList<>(); + for (Job job : jobs) { + if (job.trigger() instanceof ScheduleTrigger) { + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.add(new ActiveSchedule(job.name(), trigger.schedule(), starTime)); + } + } + this.schedules = new Schedules(schedules); + logger.debug("schedule engine started at [{}]", DateTime.now()); + } + + @Override + public void stop() { + logger.debug("stopping schedule engine..."); + timer.stop(); + executor.getQueue().clear(); + logger.debug("schedule engine stopped"); + } + + @Override + public void add(Job job) { + assert job.trigger() instanceof ScheduleTrigger; + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + ActiveSchedule schedule = new ActiveSchedule(job.name(), trigger.schedule(), clock.millis()); + schedules = schedules.add(schedule); + } + + @Override + public boolean remove(String jobName) { + Schedules newSchedules = schedules.remove(jobName); + if (newSchedules == null) { + return false; + } + schedules = newSchedules; + return true; + } + + void notifyListeners(String name, long triggeredTime, long scheduledTime) { + logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime)); + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + for (Listener listener : listeners) { + executor.execute(new ListenerRunnable(listener, name, event)); + } + } + + class ActiveSchedule implements TimerTask { + + private final String name; + private final Schedule schedule; + private final long startTime; + + private volatile Timeout timeout; + private volatile long scheduledTime; + + public ActiveSchedule(String name, Schedule schedule, long startTime) { + this.name = name; + this.schedule = schedule; + this.startTime = startTime; + // we don't want the schedule to trigger on the start time itself, so we compute + // the next scheduled time by simply computing the schedule time on the startTime + 1 + this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1); + long delay = scheduledTime > 0 ? scheduledTime - clock.millis() : -1; + if (delay > 0) { + timeout = timer.newTimeout(this, delay, TimeUnit.MILLISECONDS); + } else { + timeout = null; + } + } + + @Override + public void run(Timeout timeout) { + long triggeredTime = clock.millis(); + notifyListeners(name, triggeredTime, scheduledTime); + scheduledTime = schedule.nextScheduledTimeAfter(startTime, triggeredTime); + long delay = scheduledTime > 0 ? scheduledTime - triggeredTime : -1; + if (delay > 0) { + this.timeout = timer.newTimeout(this, delay, TimeUnit.MILLISECONDS); + } + } + + public void cancel() { + if (timeout != null) { + timeout.cancel(); + } + } + } + + static class ListenerRunnable implements Runnable { + + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; + + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; + } + + @Override + public void run() { + listener.triggered(jobName, event); + } + } + + static class Schedules { + + private final ActiveSchedule[] schedules; + private final ImmutableMap scheduleByName; + + Schedules(Collection schedules) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + this.schedules = new ActiveSchedule[schedules.size()]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + builder.put(schedule.name, schedule); + this.schedules[i++] = schedule; + } + this.scheduleByName = builder.build(); + } + + public Schedules(ActiveSchedule[] schedules, ImmutableMap scheduleByName) { + this.schedules = schedules; + this.scheduleByName = scheduleByName; + } + + public long getNextScheduledTime() { + long time = -1; + for (ActiveSchedule schedule : schedules) { + if (time < 0) { + time = schedule.scheduledTime; + } else { + time = Math.min(time, schedule.scheduledTime); + } + } + return time; + } + + public Schedules add(ActiveSchedule schedule) { + boolean replacing = scheduleByName.containsKey(schedule.name); + if (!replacing) { + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length + 1]; + System.arraycopy(schedules, 0, newSchedules, 0, schedules.length); + newSchedules[schedules.length] = schedule; + ImmutableMap newScheduleByName = ImmutableMap.builder() + .putAll(scheduleByName) + .put(schedule.name, schedule) + .build(); + return new Schedules(newSchedules, newScheduleByName); + } + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length]; + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < schedules.length; i++) { + ActiveSchedule sched = schedules[i].name.equals(schedule.name) ? schedule : schedules[i]; + schedules[i] = sched; + builder.put(sched.name, sched); + } + return new Schedules(newSchedules, builder.build()); + } + + public Schedules remove(String name) { + if (!scheduleByName.containsKey(name)) { + return null; + } + ImmutableMap.Builder builder = ImmutableMap.builder(); + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length - 1]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + if (!schedule.name.equals(name)) { + newSchedules[i++] = schedule; + builder.put(schedule.name, schedule); + } else { + schedule.cancel(); + } + } + return new Schedules(newSchedules, builder.build()); + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java similarity index 74% rename from src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleTriggerEngine.java rename to src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java index bf83f33408d..2fc882e2a07 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java @@ -3,17 +3,18 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.watcher.trigger.schedule.quartz; +package org.elasticsearch.watcher.trigger.schedule.engine; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTimeZone; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.watcher.WatcherPlugin; import org.elasticsearch.watcher.WatcherSettingsException; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.trigger.TriggerException; import org.elasticsearch.watcher.trigger.schedule.*; @@ -21,31 +22,39 @@ import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.quartz.simpl.SimpleJobFactory; -import java.io.IOException; import java.util.*; -import static org.elasticsearch.watcher.trigger.schedule.quartz.WatcherQuartzJob.jobDetail; - /** * */ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { - private final ScheduleRegistry scheduleRegistry; + public static final String THREAD_POOL_NAME = "watcher_scheduler"; - // Not happy about it, but otherwise we're stuck with Quartz's SimpleThreadPool - private volatile static ThreadPool threadPool; + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // scheduler TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(availableProcessors) + .queueSize(1000) + .build(); + } private final Clock clock; private final DateTimeZone defaultTimeZone; + private final EsThreadPoolExecutor executor; private volatile org.quartz.Scheduler scheduler; @Inject public QuartzScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, ThreadPool threadPool, Clock clock) { - super(settings); - this.scheduleRegistry = scheduleRegistry; - QuartzScheduleTriggerEngine.threadPool = threadPool; + super(settings, scheduleRegistry); + this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); this.clock = clock; String timeZoneStr = componentSettings.get("time_zone", "UTC"); try { @@ -77,7 +86,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { for (Job job : jobs) { if (job.trigger() instanceof ScheduleTrigger) { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - quartzJobs.put(jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock)); + quartzJobs.put(WatcherQuartzJob.jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock)); } } scheduler.scheduleJobs(quartzJobs, false); @@ -95,6 +104,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { logger.info("Stopping scheduler..."); scheduler.shutdown(true); this.scheduler = null; + executor.getQueue().clear(); logger.info("Stopped scheduler"); } } catch (org.quartz.SchedulerException se){ @@ -108,7 +118,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); try { logger.trace("scheduling [{}] with schedule [{}]", job.name(), trigger.schedule()); - scheduler.scheduleJob(jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock), true); + scheduler.scheduleJob(WatcherQuartzJob.jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock), true); } catch (org.quartz.SchedulerException se) { logger.error("failed to schedule job",se); throw new TriggerException("failed to schedule job", se); @@ -128,9 +138,9 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { static Set createTrigger(Schedule schedule, DateTimeZone timeZone, Clock clock) { HashSet triggers = new HashSet<>(); if (schedule instanceof CronnableSchedule) { - for (String cron : ((CronnableSchedule) schedule).crons()) { + for (Cron cron : ((CronnableSchedule) schedule).crons()) { triggers.add(TriggerBuilder.newTrigger() - .withSchedule(CronScheduleBuilder.cronSchedule(cron).inTimeZone(timeZone.toTimeZone())) + .withSchedule(CronScheduleBuilder.cronSchedule(cron.expression()).inTimeZone(timeZone.toTimeZone())) .startAt(clock.now().toDate()) .build()); } @@ -146,38 +156,36 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { return triggers; } - - - @Override - public ScheduleTrigger parseTrigger(String context, XContentParser parser) throws IOException { - Schedule schedule = scheduleRegistry.parse(context, parser); - return new ScheduleTrigger(schedule); - } - - @Override - public ScheduleTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException { - return ScheduleTriggerEvent.parse(context, parser); - } - void notifyListeners(String name, JobExecutionContext ctx) { ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(ctx.getFireTime()), new DateTime(ctx.getScheduledFireTime())); for (Listener listener : listeners) { - listener.triggered(name, event); + executor.execute(new ListenerRunnable(listener, name, event)); } } - // This Quartz thread pool will always accept. On this thread we will only index a watch record and add it to the work queue - public static final class WatcherQuartzThreadPool implements org.quartz.spi.ThreadPool { + static class ListenerRunnable implements Runnable { - private final EsThreadPoolExecutor executor; + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; - public WatcherQuartzThreadPool() { - this.executor = (EsThreadPoolExecutor) threadPool.executor(WatcherPlugin.SCHEDULER_THREAD_POOL_NAME); + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; } + @Override + public void run() { + listener.triggered(jobName, event); + } + } + + public static final class WatcherQuartzThreadPool implements org.quartz.spi.ThreadPool { + @Override public boolean runInThread(Runnable runnable) { - executor.execute(runnable); + runnable.run(); return true; } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java new file mode 100644 index 00000000000..2599fc1f440 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java @@ -0,0 +1,250 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.trigger.schedule.engine; + +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; +import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.schedule.*; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { + + public static final String THREAD_POOL_NAME = "watcher_scheduler"; + + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // scheduler TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(availableProcessors) + .queueSize(1000) + .build(); + } + + private final Clock clock; + private volatile Schedules schedules; + private ScheduledExecutorService scheduler; + private EsThreadPoolExecutor executor; + + @Inject + public SchedulerScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) { + super(settings, scheduleRegistry); + this.clock = clock; + this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); + } + + @Override + public void start(Collection jobs) { + logger.debug("starting schedule engine..."); + this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler")); + long starTime = clock.millis(); + List schedules = new ArrayList<>(); + for (Job job : jobs) { + if (job.trigger() instanceof ScheduleTrigger) { + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.add(new ActiveSchedule(job.name(), trigger.schedule(), starTime)); + } + } + this.schedules = new Schedules(schedules); + logger.debug("schedule engine started at [{}]", DateTime.now()); + } + + @Override + public void stop() { + logger.debug("stopping schedule engine..."); + scheduler.shutdownNow(); + try { + scheduler.awaitTermination(4, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + executor.getQueue().clear(); + logger.debug("schedule engine stopped"); + } + + @Override + public void add(Job job) { + assert job.trigger() instanceof ScheduleTrigger; + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + ActiveSchedule schedule = new ActiveSchedule(job.name(), trigger.schedule(), clock.millis()); + schedules = schedules.add(schedule); + } + + @Override + public boolean remove(String jobName) { + Schedules newSchedules = schedules.remove(jobName); + if (newSchedules == null) { + return false; + } + schedules = newSchedules; + return true; + } + + void notifyListeners(String name, long triggeredTime, long scheduledTime) { + logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime)); + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + for (Listener listener : listeners) { + executor.execute(new ListenerRunnable(listener, name, event)); + } + } + + class ActiveSchedule implements Runnable { + + private final String name; + private final Schedule schedule; + private final long startTime; + + private volatile ScheduledFuture future; + private volatile long scheduledTime; + + public ActiveSchedule(String name, Schedule schedule, long startTime) { + this.name = name; + this.schedule = schedule; + this.startTime = startTime; + // we don't want the schedule to trigger on the start time itself, so we compute + // the next scheduled time by simply computing the schedule time on the startTime + 1 + this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1); + long delay = scheduledTime > 0 ? scheduledTime - clock.millis() : -1; + if (delay > 0) { + future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } else { + future = null; + } + } + + @Override + public void run() { + long triggeredTime = clock.millis(); + notifyListeners(name, triggeredTime, scheduledTime); + scheduledTime = schedule.nextScheduledTimeAfter(startTime, triggeredTime); + long delay = scheduledTime > 0 ? scheduledTime - triggeredTime : -1; + if (delay > 0) { + future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } + } + + public void cancel() { + if (future != null) { + future.cancel(true); + } + } + } + + static class ListenerRunnable implements Runnable { + + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; + + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; + } + + @Override + public void run() { + listener.triggered(jobName, event); + } + } + + static class Schedules { + + private final ActiveSchedule[] schedules; + private final ImmutableMap scheduleByName; + + Schedules(Collection schedules) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + this.schedules = new ActiveSchedule[schedules.size()]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + builder.put(schedule.name, schedule); + this.schedules[i++] = schedule; + } + this.scheduleByName = builder.build(); + } + + public Schedules(ActiveSchedule[] schedules, ImmutableMap scheduleByName) { + this.schedules = schedules; + this.scheduleByName = scheduleByName; + } + + public long getNextScheduledTime() { + long time = -1; + for (ActiveSchedule schedule : schedules) { + if (time < 0) { + time = schedule.scheduledTime; + } else { + time = Math.min(time, schedule.scheduledTime); + } + } + return time; + } + + public Schedules add(ActiveSchedule schedule) { + boolean replacing = scheduleByName.containsKey(schedule.name); + if (!replacing) { + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length + 1]; + System.arraycopy(schedules, 0, newSchedules, 0, schedules.length); + newSchedules[schedules.length] = schedule; + ImmutableMap newScheduleByName = ImmutableMap.builder() + .putAll(scheduleByName) + .put(schedule.name, schedule) + .build(); + return new Schedules(newSchedules, newScheduleByName); + } + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length]; + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < schedules.length; i++) { + ActiveSchedule sched = schedules[i].name.equals(schedule.name) ? schedule : schedules[i]; + schedules[i] = sched; + builder.put(sched.name, sched); + } + return new Schedules(newSchedules, builder.build()); + } + + public Schedules remove(String name) { + if (!scheduleByName.containsKey(name)) { + return null; + } + ImmutableMap.Builder builder = ImmutableMap.builder(); + ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length - 1]; + int i = 0; + for (ActiveSchedule schedule : schedules) { + if (!schedule.name.equals(name)) { + newSchedules[i++] = schedule; + builder.put(schedule.name, schedule); + } else { + schedule.cancel(); + } + } + return new Schedules(newSchedules, builder.build()); + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java new file mode 100644 index 00000000000..b182a117b85 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java @@ -0,0 +1,198 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.trigger.schedule.engine; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; +import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.schedule.*; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +/** + * + */ +public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine { + + public static final String THREAD_POOL_NAME = "watcher_scheduler"; + + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // scheduler TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(availableProcessors) + .queueSize(1000) + .build(); + } + + private final Clock clock; + + private volatile Map schedules; + private Ticker ticker; + private EsThreadPoolExecutor executor; + + @Inject + public SimpleTickerScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) { + super(settings, scheduleRegistry); + this.schedules = new ConcurrentHashMap<>(); + this.clock = clock; + this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); + } + + @Override + public void start(Collection jobs) { + long starTime = clock.millis(); + Map schedules = new ConcurrentHashMap<>(); + for (Job job : jobs) { + if (job.trigger() instanceof ScheduleTrigger) { + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), starTime)); + } + } + this.schedules = schedules; + this.ticker = new Ticker(); + } + + @Override + public void stop() { + ticker.close(); + executor.getQueue().clear(); + } + + @Override + public void add(Job job) { + assert job.trigger() instanceof ScheduleTrigger; + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), clock.millis())); + } + + @Override + public boolean remove(String jobName) { + return schedules.remove(jobName) != null; + } + + void checkJobs() { + long triggeredTime = clock.millis(); + for (ActiveSchedule schedule : schedules.values()) { + long scheduledTime = schedule.check(triggeredTime); + if (scheduledTime > 0) { + notifyListeners(schedule.name, triggeredTime, scheduledTime); + } + } + } + + void notifyListeners(String name, long triggeredTime, long scheduledTime) { + logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime)); + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + for (Listener listener : listeners) { + executor.execute(new ListenerRunnable(listener, name, event)); + } + } + + static class ActiveSchedule { + + private final String name; + private final Schedule schedule; + private final long startTime; + + private volatile long scheduledTime; + + public ActiveSchedule(String name, Schedule schedule, long startTime) { + this.name = name; + this.schedule = schedule; + this.startTime = startTime; + // we don't want the schedule to trigger on the start time itself, so we compute + // the next scheduled time by simply computing the schedule time on the startTime + 1 + this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1); + } + + /** + * Checks whether the given time is the same or after the scheduled time of this schedule. If so, the scheduled time is + * returned a new scheduled time is computed and set. Otherwise (the given time is before the scheduled time), {@code -1} + * is returned. + */ + public long check(long time) { + if (time < scheduledTime) { + return -1; + } + long prevScheduledTime = scheduledTime == 0 ? time : scheduledTime; + scheduledTime = schedule.nextScheduledTimeAfter(startTime, scheduledTime); + return prevScheduledTime; + } + } + + static class ListenerRunnable implements Runnable { + + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; + + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; + } + + @Override + public void run() { + listener.triggered(jobName, event); + } + } + + class Ticker extends Thread { + + private volatile boolean active = true; + private final CountDownLatch closeLatch = new CountDownLatch(1); + + public Ticker() { + super("ticker-schedule-trigger-engine"); + setDaemon(true); + start(); + } + + @Override + public void run() { + + // calibrate with round clock + while (clock.millis() % 1000 > 15) { + } + while (active) { + logger.trace("checking jobs [{}]", DateTime.now()); + checkJobs(); + try { + sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + closeLatch.countDown(); + } + + public void close() { + active = false; + try { + closeLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java new file mode 100644 index 00000000000..439fe585029 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java @@ -0,0 +1,168 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.trigger.schedule.engine; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; +import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.schedule.*; + +import java.util.Collection; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + */ +public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine { + + public static final String THREAD_POOL_NAME = "watcher_scheduler"; + + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // scheduler TP is already configured in the node settings + // no need for additional settings + return ImmutableSettings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) + .size(availableProcessors) + .queueSize(1000) + .build(); + } + + private final Clock clock; + + private volatile Map schedules; + private Timer timer; + private TimerTask ticker; + private EsThreadPoolExecutor executor; + + @Inject + public TimerTickerScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) { + super(settings, scheduleRegistry); + this.schedules = new ConcurrentHashMap<>(); + this.clock = clock; + this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); + } + + @Override + public void start(Collection jobs) { + long starTime = clock.millis(); + Map schedules = new ConcurrentHashMap<>(); + for (Job job : jobs) { + if (job.trigger() instanceof ScheduleTrigger) { + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), starTime)); + } + } + this.schedules = schedules; + this.ticker = new TimerTask() { + @Override + public void run() { + checkJobs(); + } + }; + this.timer = new Timer("ticker-schedule-trigger-engine", true); + this.timer.scheduleAtFixedRate(ticker, clock.millis() % 1000 , 10); + } + + @Override + public void stop() { + ticker.cancel(); + timer.cancel(); + executor.getQueue().clear(); + } + + @Override + public void add(Job job) { + assert job.trigger() instanceof ScheduleTrigger; + ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); + schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), clock.millis())); + } + + @Override + public boolean remove(String jobName) { + return schedules.remove(jobName) != null; + } + + void checkJobs() { + long triggeredTime = clock.millis(); + for (ActiveSchedule schedule : schedules.values()) { + long scheduledTime = schedule.check(triggeredTime); + if (scheduledTime > 0) { + notifyListeners(schedule.name, triggeredTime, scheduledTime); + } + } + } + + void notifyListeners(String name, long triggeredTime, long scheduledTime) { + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + for (Listener listener : listeners) { + executor.execute(new ListenerRunnable(listener, name, event)); + } + } + + static class ActiveSchedule { + + private final String name; + private final Schedule schedule; + private final long startTime; + + private volatile long scheduledTime; + + public ActiveSchedule(String name, Schedule schedule, long startTime) { + this.name = name; + this.schedule = schedule; + this.startTime = startTime; + // we don't want the schedule to trigger on the start time itself, so we compute + // the next scheduled time by simply computing the schedule time on the startTime + 1 + this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1); + } + + /** + * Checks whether the given time is the same or after the scheduled time of this schedule. If so, the scheduled time is + * returned a new scheduled time is computed and set. Otherwise (the given time is before the scheduled time), {@code -1} + * is returned. + */ + public long check(long time) { + if (time < scheduledTime) { + return -1; + } + long prevScheduledTime = scheduledTime == 0 ? time : scheduledTime; + scheduledTime = schedule.nextScheduledTimeAfter(startTime, scheduledTime); + return prevScheduledTime; + } + } + + static class ListenerRunnable implements Runnable { + + private final Listener listener; + private final String jobName; + private final ScheduleTriggerEvent event; + + public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + this.listener = listener; + this.jobName = jobName; + this.event = event; + } + + @Override + public void run() { + listener.triggered(jobName, event); + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/WatcherQuartzJob.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/WatcherQuartzJob.java similarity index 95% rename from src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/WatcherQuartzJob.java rename to src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/WatcherQuartzJob.java index 7039d556b58..cb334cc5e7e 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/quartz/WatcherQuartzJob.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/WatcherQuartzJob.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.watcher.trigger.schedule.quartz; +package org.elasticsearch.watcher.trigger.schedule.engine; import org.quartz.*; diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/support/YearTimes.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/support/YearTimes.java index 0afe03327ff..c1e6b6c6604 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/support/YearTimes.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/support/YearTimes.java @@ -72,7 +72,8 @@ public class YearTimes implements Times { String daysStr = Ints.join(",", this.days); daysStr = daysStr.replace("32", "L"); String monthsStr = Joiner.on(",").join(months); - crons.add("0 " + minsStr + " " + hrsStr + " " + daysStr + " " + monthsStr + " ?"); + String expression = "0 " + minsStr + " " + hrsStr + " " + daysStr + " " + monthsStr + " ?"; + crons.add(expression); } return crons; } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/tool/CronEvalTool.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/tool/CronEvalTool.java index 50e81074546..00913f50671 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/tool/CronEvalTool.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/tool/CronEvalTool.java @@ -9,13 +9,12 @@ import org.elasticsearch.common.cli.CliTool; import org.elasticsearch.common.cli.CliToolConfig; import org.elasticsearch.common.cli.Terminal; import org.elasticsearch.common.cli.commons.CommandLine; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.joda.time.format.DateTimeFormat; +import org.elasticsearch.common.joda.time.format.DateTimeFormatter; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.quartz.CronExpression; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; +import org.elasticsearch.watcher.trigger.schedule.Cron; import static org.elasticsearch.common.cli.CliToolConfig.Builder.cmd; import static org.elasticsearch.common.cli.CliToolConfig.Builder.option; @@ -50,7 +49,7 @@ public class CronEvalTool extends CliTool { .options(option("c", "count").hasArg(false).required(false)) .build(); - private static final SimpleDateFormat format = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss", Locale.ROOT); + private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("EEE, d MMM yyyy HH:mm:ss"); final String expression; final int count; @@ -79,20 +78,27 @@ public class CronEvalTool extends CliTool { // when invalid, a parse expression will be thrown with a descriptive error message // the cli infra handles such exceptions and hows the exceptions' message - CronExpression.validateExpression(expression); + Cron.validate(expression); terminal.println("Valid!"); - Date date = new Date(); + DateTime date = DateTime.now(); - terminal.println("Now is [" + format.format(date) + "]"); + terminal.println("Now is [" + formatter.print(date) + "]"); terminal.println("Here are the next " + count + " times this cron expression will trigger:"); - CronExpression cron = new CronExpression(expression); + Cron cron = new Cron(expression); + long time = date.getMillis(); for (int i = 0; i < count; i++) { - date = cron.getNextValidTimeAfter(date); - terminal.println((i+1) + ".\t" + format.format(date)); + long prevTime = time; + time = cron.getNextValidTimeAfter(time); + if (time < 0) { + terminal.printError((i + 1) + ".\t Could not compute future times since [" + formatter.print(prevTime) + "] " + + "(perhaps the cron expression only points to times in the past?)"); + return ExitStatus.OK; + } + terminal.println((i+1) + ".\t" + formatter.print(time)); } return ExitStatus.OK; } diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index 20cfb37f75c..27bce5201aa 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.watcher.trigger.ScheduleTriggerEngineMock; import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.trigger.schedule.Schedule; +import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.watcher.trigger.schedule.Schedules; import org.elasticsearch.watcher.watch.Watch; @@ -86,17 +87,20 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg boolean shieldEnabled = enableShield(); + final ScheduleModule.Engine scheduleEngine = randomFrom(ScheduleModule.Engine.values()); + @Override protected Settings nodeSettings(int nodeOrdinal) { - ImmutableSettings.Builder builder = ImmutableSettings.builder() + return ImmutableSettings.builder() .put(super.nodeSettings(nodeOrdinal)) .put("scroll.size", randomIntBetween(1, 100)) .put("plugin.types", (timeWarped() ? TimeWarpedWatcherPlugin.class.getName() : WatcherPlugin.class.getName()) + "," + (shieldEnabled ? ShieldPlugin.class.getName() + "," : "") + licensePluginClass().getName()) - .put(ShieldSettings.settings(shieldEnabled)); - return builder.build(); + .put(ShieldSettings.settings(shieldEnabled)) + .put("watcher.trigger.schedule.engine", scheduleEngine().name().toLowerCase(Locale.ROOT)) + .build(); } /** @@ -116,6 +120,13 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg return shieldEnabled; } + /** + * @return The schedule trigger engine that will be used for the nodes. + */ + protected ScheduleModule.Engine scheduleEngine() { + return scheduleEngine; + } + /** * Override and returns {@code false} to force running without shield */ @@ -600,6 +611,8 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg static class ShieldSettings { + static boolean auditLogsEnabled = SystemPropertyUtil.getBoolean("tests.audit_logs", false); + public static final String IP_FILTER = "allow: all\n"; public static final String USERS = @@ -640,7 +653,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg .put("shield.authc.realms.esusers.files.users_roles", writeFile(folder, "users_roles", USER_ROLES)) .put("shield.authz.store.files.roles", writeFile(folder, "roles.yml", ROLES)) .put("shield.transport.n2n.ip_filter.file", writeFile(folder, "ip_filter.yml", IP_FILTER)) - .put("shield.audit.enabled", true) + .put("shield.audit.enabled", auditLogsEnabled) .build(); } diff --git a/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java b/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java index 21f7e0cf083..241be14a928 100644 --- a/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java +++ b/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java @@ -58,7 +58,7 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin { if (module instanceof TriggerModule) { // replacing scheduler module so we'll // have control on when it fires a job - modules.add(new MockTriggerModule()); + modules.add(new MockTriggerModule(settings)); } else if (module instanceof ClockModule) { // replacing the clock module so we'll be able @@ -79,6 +79,10 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin { public static class MockTriggerModule extends TriggerModule { + public MockTriggerModule(Settings settings) { + super(settings); + } + @Override protected void registerStandardEngines() { registerEngine(ScheduleTriggerEngineMock.class); diff --git a/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineBenchmark.java b/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineBenchmark.java new file mode 100644 index 00000000000..32f7d861395 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineBenchmark.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.test.bench; + +import org.elasticsearch.common.cli.Terminal; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.clock.SystemClock; +import org.elasticsearch.watcher.trigger.Trigger; +import org.elasticsearch.watcher.trigger.TriggerEngine; +import org.elasticsearch.watcher.trigger.TriggerEvent; +import org.elasticsearch.watcher.trigger.schedule.*; +import org.elasticsearch.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * + */ +public class ScheduleEngineBenchmark { + + static final ESLogger logger = Loggers.getLogger(ScheduleEngineBenchmark.class); + + public static void main(String[] args) throws Exception { + Settings settings = ImmutableSettings.builder() + .put(SchedulerScheduleTriggerEngine.additionalSettings(ImmutableSettings.EMPTY)) + .put("name", "test") + .build(); + ThreadPool threadPool = new ThreadPool(settings, null); + + ScheduleRegistry scheduleRegistry = new ScheduleRegistry(Collections.emptyMap()); + + SchedulerScheduleTriggerEngine scheduler = new SchedulerScheduleTriggerEngine(ImmutableSettings.EMPTY, SystemClock.INSTANCE, scheduleRegistry, threadPool); + + List jobs = new ArrayList<>(10000); + for (int i = 0; i < 10000; i++) { + jobs.add(new SimpleJob("job_" + i, new CronSchedule("0/3 * * * * ?"))); + } + scheduler.start(jobs); + + scheduler.register(new TriggerEngine.Listener() { + @Override + public void triggered(String jobName, TriggerEvent event) { + ScheduleTriggerEvent e = (ScheduleTriggerEvent) event; + logger.info("triggered [{}] at fire_time [{}], scheduled time [{}], now [{}]", jobName, event.triggeredTime(), e.scheduledTime(), SystemClock.INSTANCE.now()); + } + }); + + Terminal.DEFAULT.readText("press enter to quit..."); + scheduler.stop(); + } + + static class SimpleJob implements TriggerEngine.Job { + + private final String name; + private final ScheduleTrigger trigger; + + public SimpleJob(String name, Schedule schedule) { + this.name = name; + this.trigger = new ScheduleTrigger(schedule); + } + + @Override + public String name() { + return name; + } + + @Override + public Trigger trigger() { + return trigger; + } + } +} diff --git a/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java b/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java index e0e22cb9640..d04eeb712c6 100644 --- a/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java +++ b/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java @@ -226,7 +226,7 @@ public class WatcherBenchmark { if (module instanceof TriggerModule) { // replacing scheduler module so we'll // have control on when it fires a job - modules.add(new MockTriggerModule()); + modules.add(new MockTriggerModule(settings)); } else { modules.add(module); @@ -237,6 +237,10 @@ public class WatcherBenchmark { public static class MockTriggerModule extends TriggerModule { + public MockTriggerModule(Settings settings) { + super(settings); + } + @Override protected void registerStandardEngines() { registerEngine(ScheduleTriggerEngineMock.class); diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java index 6540cc79948..7d52439235f 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.watcher.test.integration; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -13,6 +14,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.client.WatcherClient; @@ -23,6 +25,7 @@ import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule; +import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.watcher.trigger.schedule.Schedules; import org.elasticsearch.watcher.watch.WatchStore; import org.junit.Test; @@ -47,8 +50,14 @@ import static org.hamcrest.Matchers.*; /** */ +@TestLogging("watcher.trigger.schedule:TRACE") public class BasicWatcherTests extends AbstractWatcherIntegrationTests { + @Override + protected ScheduleModule.Engine scheduleEngine() { + return ScheduleModule.Engine.HASHWHEEL; + } + @Test public void testIndexWatch() throws Exception { WatcherClient watcherClient = watcherClient(); @@ -239,7 +248,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { assertThat(count, equalTo(findNumberOfPerformedActions("_name"))); } - @Test + @Test //@Repeat(iterations = 10) public void testConditionSearchWithSource() throws Exception { String variable = randomFrom("ctx.execution_time", "ctx.trigger.scheduled_time", "ctx.trigger.triggered_time"); SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery( @@ -251,7 +260,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { testConditionSearch(newInputSearchRequest("events").source(searchSourceBuilder)); } - @Test + @Test @Repeat(iterations = 10) public void testConditionSearchWithIndexedTemplate() throws Exception { String variable = randomFrom("ctx.execution_time", "ctx.trigger.scheduled_time", "ctx.trigger.triggered_time"); SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery( @@ -298,6 +307,8 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { timeWarp().scheduler().trigger("_name1"); timeWarp().scheduler().trigger("_name2"); refresh(); + } else { + Thread.sleep(5000); } assertWatchWithMinimumPerformedActionsCount("_name1", 1); @@ -325,10 +336,13 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { String watchName = "_name"; assertAcked(prepareCreate("events").addMapping("event", "_timestamp", "enabled=true", "level", "type=string")); + watcherClient().preparePutWatch(watchName) .setSource(createWatchSource(interval("5s"), request, "return ctx.payload.hits.total >= 3")) .get(); + logger.info("created watch [{}] at [{}]", watchName, DateTime.now()); + client().prepareIndex("events", "event") .setCreate(true) .setSource("level", "a") @@ -337,7 +351,9 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { .setCreate(true) .setSource("level", "a") .get(); + refresh(); + if (timeWarped()) { timeWarp().clock().fastForwardSeconds(5); timeWarp().scheduler().trigger(watchName); diff --git a/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java b/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java index be286d16cb7..e92fc43f9aa 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java @@ -33,13 +33,11 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { private final ESLogger logger; private final ConcurrentMap jobs = new ConcurrentHashMap<>(); private final Clock clock; - private final ScheduleRegistry scheduleRegistry; @Inject public ScheduleTriggerEngineMock(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { - super(settings); + super(settings, scheduleRegistry); this.logger = Loggers.getLogger(ScheduleTriggerEngineMock.class, settings); - this.scheduleRegistry = scheduleRegistry; this.clock = clock; } diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/CronScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/CronScheduleTests.java index b662c7e2f0f..5bdd33acfb8 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/CronScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/CronScheduleTests.java @@ -34,7 +34,7 @@ public class CronScheduleTests extends ScheduleTestCase { parser.nextToken(); CronSchedule schedule = new CronSchedule.Parser().parse(parser); assertThat(schedule.crons(), arrayWithSize(1)); - assertThat(schedule.crons()[0], is("0 0/5 * * * ?")); + assertThat(schedule.crons()[0].expression(), is("0 0/5 * * * ?")); } @Test @@ -48,10 +48,11 @@ public class CronScheduleTests extends ScheduleTestCase { XContentParser parser = JsonXContent.jsonXContent.createParser(bytes); parser.nextToken(); CronSchedule schedule = new CronSchedule.Parser().parse(parser); - assertThat(schedule.crons(), arrayWithSize(3)); - assertThat(schedule.crons(), hasItemInArray("0 0/1 * * * ?")); - assertThat(schedule.crons(), hasItemInArray("0 0/2 * * * ?")); - assertThat(schedule.crons(), hasItemInArray("0 0/3 * * * ?")); + String[] crons = expressions(schedule); + assertThat(crons, arrayWithSize(3)); + assertThat(crons, hasItemInArray("0 0/1 * * * ?")); + assertThat(crons, hasItemInArray("0 0/2 * * * ?")); + assertThat(crons, hasItemInArray("0 0/3 * * * ?")); } @Test diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/DailyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/DailyScheduleTests.java index b34bf605ce5..ced6b924be0 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/DailyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/DailyScheduleTests.java @@ -6,12 +6,12 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.trigger.schedule.support.DayTimes; import org.junit.Test; @@ -26,7 +26,7 @@ public class DailyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { DailySchedule schedule = new DailySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule.crons()); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 0 * * ?")); } @@ -35,7 +35,7 @@ public class DailyScheduleTests extends ScheduleTestCase { public void test_SingleTime() throws Exception { DayTimes time = validDayTime(); DailySchedule schedule = new DailySchedule(time); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 " + Ints.join(",", time.minute()) + " " + Ints.join(",", time.hour()) + " * * ?")); } @@ -57,7 +57,7 @@ public class DailyScheduleTests extends ScheduleTestCase { public void test_MultipleTimes() throws Exception { DayTimes[] times = validDayTimes(); DailySchedule schedule = new DailySchedule(times); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(times.length)); for (DayTimes time : times) { assertThat(crons, hasItemInArray("0 " + Ints.join(",", time.minute()) + " " + Ints.join(",", time.hour()) + " * * ?")); diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/HourlyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/HourlyScheduleTests.java index 3047b98a829..4a872496f09 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/HourlyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/HourlyScheduleTests.java @@ -6,13 +6,13 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Collections2; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.junit.Test; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -26,7 +26,7 @@ public class HourlyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { HourlySchedule schedule = new HourlySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 * * * ?")); } @@ -35,7 +35,7 @@ public class HourlyScheduleTests extends ScheduleTestCase { public void test_SingleMinute() throws Exception { int minute = validMinute(); HourlySchedule schedule = new HourlySchedule(minute); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 " + minute + " * * * ?")); } @@ -50,7 +50,7 @@ public class HourlyScheduleTests extends ScheduleTestCase { int[] minutes = validMinutes(); String minutesStr = Ints.join(",", minutes); HourlySchedule schedule = new HourlySchedule(minutes); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 " + minutesStr + " * * * ?")); } diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/MonthlyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/MonthlyScheduleTests.java index 6610877f9ea..4ed2aba45f8 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/MonthlyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/MonthlyScheduleTests.java @@ -6,12 +6,12 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.trigger.schedule.support.DayTimes; import org.elasticsearch.watcher.trigger.schedule.support.MonthTimes; import org.junit.Test; @@ -27,7 +27,7 @@ public class MonthlyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { MonthlySchedule schedule = new MonthlySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 0 1 * ?")); } @@ -36,7 +36,7 @@ public class MonthlyScheduleTests extends ScheduleTestCase { public void test_SingleTime() throws Exception { MonthTimes time = validMonthTime(); MonthlySchedule schedule = new MonthlySchedule(time); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(time.times().length)); for (DayTimes dayTimes : time.times()) { String minStr = Ints.join(",", dayTimes.minute()); @@ -51,7 +51,7 @@ public class MonthlyScheduleTests extends ScheduleTestCase { public void test_MultipleTimes() throws Exception { MonthTimes[] times = validMonthTimes(); MonthlySchedule schedule = new MonthlySchedule(times); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); int count = 0; for (int i = 0; i < times.length; i++) { count += times[i].times().length; diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTestCase.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTestCase.java index b805c1e6c79..5c26b37b496 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTestCase.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTestCase.java @@ -23,6 +23,18 @@ import static org.elasticsearch.watcher.trigger.schedule.Schedules.*; */ public abstract class ScheduleTestCase extends ElasticsearchTestCase { + protected static String[] expressions(CronnableSchedule schedule) { + return expressions(schedule.crons); + } + + protected static String[] expressions(Cron[] crons) { + String[] expressions = new String[crons.length]; + for (int i = 0; i < expressions.length; i++) { + expressions[i] = crons[i].expression(); + } + return expressions; + } + protected static MonthlySchedule randomMonthlySchedule() { switch (randomIntBetween(1, 4)) { case 1: return monthly().build(); @@ -145,10 +157,13 @@ public abstract class ScheduleTestCase extends ElasticsearchTestCase { } protected static int[] randomDaysOfMonth() { + if (rarely()) { + return new int[] { 32 }; + } int count = randomIntBetween(1, 5); Set days = new HashSet<>(); for (int i = 0; i < count; i++) { - days.add(randomIntBetween(1, 32)); + days.add(randomIntBetween(1, 31)); } return Ints.toArray(days); } diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/WeeklyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/WeeklyScheduleTests.java index 091ccefdfc8..c9a24ea4dbe 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/WeeklyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/WeeklyScheduleTests.java @@ -6,13 +6,13 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.base.Joiner; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.trigger.schedule.support.DayOfWeek; import org.elasticsearch.watcher.trigger.schedule.support.DayTimes; import org.elasticsearch.watcher.trigger.schedule.support.WeekTimes; @@ -29,7 +29,7 @@ public class WeeklyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { WeeklySchedule schedule = new WeeklySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 0 ? * MON")); } @@ -38,7 +38,7 @@ public class WeeklyScheduleTests extends ScheduleTestCase { public void test_SingleTime() throws Exception { WeekTimes time = validWeekTime(); WeeklySchedule schedule = new WeeklySchedule(time); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(time.times().length)); for (DayTimes dayTimes : time.times()) { assertThat(crons, hasItemInArray("0 " + Ints.join(",", dayTimes.minute()) + " " + Ints.join(",", dayTimes.hour()) + " ? * " + Joiner.on(",").join(time.days()))); @@ -49,7 +49,7 @@ public class WeeklyScheduleTests extends ScheduleTestCase { public void test_MultipleTimes() throws Exception { WeekTimes[] times = validWeekTimes(); WeeklySchedule schedule = new WeeklySchedule(times); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); int count = 0; for (int i = 0; i < times.length; i++) { count += times[i].times().length; diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/YearlyScheduleTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/YearlyScheduleTests.java index 93f8b0b139f..924e911c639 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/YearlyScheduleTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/YearlyScheduleTests.java @@ -6,13 +6,13 @@ package org.elasticsearch.watcher.trigger.schedule; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.base.Joiner; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.primitives.Ints; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.trigger.schedule.support.DayTimes; import org.elasticsearch.watcher.trigger.schedule.support.YearTimes; import org.junit.Test; @@ -28,16 +28,16 @@ public class YearlyScheduleTests extends ScheduleTestCase { @Test public void test_Default() throws Exception { YearlySchedule schedule = new YearlySchedule(); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(1)); assertThat(crons, arrayContaining("0 0 0 1 JAN ?")); } - @Test @Repeat(iterations = 20) + @Test @Repeat(iterations = 120) public void test_SingleTime() throws Exception { YearTimes time = validYearTime(); YearlySchedule schedule = new YearlySchedule(time); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); assertThat(crons, arrayWithSize(time.times().length)); for (DayTimes dayTimes : time.times()) { String minStr = Ints.join(",", dayTimes.minute()); @@ -45,7 +45,9 @@ public class YearlyScheduleTests extends ScheduleTestCase { String dayStr = Ints.join(",", time.days()); dayStr = dayStr.replace("32", "L"); String monthStr = Joiner.on(",").join(time.months()); - assertThat(crons, hasItemInArray("0 " + minStr + " " + hrStr + " " + dayStr + " " + monthStr + " ?")); + String expression = "0 " + minStr + " " + hrStr + " " + dayStr + " " + monthStr + " ?"; + logger.info("expression: " + expression); + assertThat(crons, hasItemInArray(expression)); } } @@ -53,7 +55,7 @@ public class YearlyScheduleTests extends ScheduleTestCase { public void test_MultipleTimes() throws Exception { YearTimes[] times = validYearTimes(); YearlySchedule schedule = new YearlySchedule(times); - String[] crons = schedule.crons(); + String[] crons = expressions(schedule); int count = 0; for (int i = 0; i < times.length; i++) { count += times[i].times().length; diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java index 746b396c2dd..3f11f5cf754 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.schedule.Schedule; import org.elasticsearch.watcher.trigger.schedule.ScheduleRegistry; import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; +import org.elasticsearch.watcher.trigger.schedule.engine.QuartzScheduleTriggerEngine; import org.elasticsearch.watcher.trigger.schedule.support.DayOfWeek; import org.elasticsearch.watcher.trigger.schedule.support.WeekTimes; import org.junit.After; diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java index 7fcf0597c90..193b27c2eb0 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.watcher.actions.ActionFactory; import org.elasticsearch.watcher.actions.ActionRegistry; @@ -78,7 +77,6 @@ import org.elasticsearch.watcher.trigger.schedule.support.*; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -323,11 +321,8 @@ public class WatchTests extends ElasticsearchTestCase { static class ParseOnlyScheduleTriggerEngine extends ScheduleTriggerEngine { - private final ScheduleRegistry registry; - public ParseOnlyScheduleTriggerEngine(Settings settings, ScheduleRegistry registry) { - super(settings); - this.registry = registry; + super(settings, registry); } @Override @@ -350,16 +345,5 @@ public class WatchTests extends ElasticsearchTestCase { public boolean remove(String jobName) { return false; } - - @Override - public ScheduleTrigger parseTrigger(String context, XContentParser parser) throws IOException { - Schedule schedule = registry.parse(context, parser); - return new ScheduleTrigger(schedule); - } - - @Override - public ScheduleTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException { - return null; - } } }