Adds initial schedule engine implementation

- `TimerScheduleTriggerEngine` - a single threaded Java `Timer`based scheduler. "Ticks" every second and checks all the registered schedules.

- `SimpleTickerScheduleTriggerEngine` - a single threaded scheduler. "Ticks" every second and checks all the registered schedules

- `SchedulerScheduleTriggerEngine` - a single threaded engine based on Java's schedule executor service. Here, every job is added as a scheduled task to the executor and each job is managing its own execution times.

- `HashWheelScheduleTriggerEngine` - a single threaded engine based on Netty's `HashWheelTimer`. Like with the `scheduler` above, every job is added as a scheduled task to the executor and each job is managing its own execution times.

Also:

- Added an undocumented feature to configure the schedule engine in the settings using `watcher.trigger.schedule.engine` (optional values right now are `quartz`, `simple`, `timer`, `hashwheel` and `scheduler`)
- `Cron` is a fork/copy of quartz `CronExpression`.. a bit cleaned up though.
- `Schedule` now exposes `nextScheduledTimeAfter` to return the next scheduled time after the given one.
- `CronnableSchedule` is now based on `Cron` (this exposed bugs in the schedule tests where we generated invalid cron expression. Now, since `Cronnable` creates the actual cron, validation is in place to make sure only valid expressions are created)
- While at it... refactored how the thread pool settings are set. Removed it from the plugin class, now each module is responsible for the settings of its own TPs. Also, if the thread pools are already configured in node settings we don't configure our default ones. This will enable users to configure the TPs in `elasticsearch.yml`
- Also updated `CronEvalTool` to work with `DateTime` construct (instead of java's `Date`)

Original commit: elastic/x-pack-elasticsearch@40d107c66e
This commit is contained in:
uboness 2015-04-08 15:31:00 +02:00 committed by Martijn van Groningen
parent 3c7b42eb7b
commit 3f26a1b2e0
39 changed files with 2941 additions and 145 deletions

View File

@ -33,7 +33,7 @@ import org.elasticsearch.watcher.watch.WatchModule;
public class WatcherModule extends AbstractModule implements SpawnModules {
private final Settings settings;
protected final Settings settings;
public WatcherModule(Settings settings) {
this.settings = settings;
@ -51,7 +51,7 @@ public class WatcherModule extends AbstractModule implements SpawnModules {
new WatcherClientModule(),
new TransformModule(),
new WatcherRestModule(),
new TriggerModule(),
new TriggerModule(settings),
new WatcherTransportModule(),
new ConditionModule(),
new InputModule(),

View File

@ -6,16 +6,17 @@
package org.elasticsearch.watcher;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.watcher.actions.email.service.InternalEmailService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.watcher.actions.email.service.InternalEmailService;
import org.elasticsearch.watcher.history.HistoryModule;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.watcher.trigger.schedule.ScheduleModule;
import java.util.Collection;
@ -24,7 +25,6 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
public class WatcherPlugin extends AbstractPlugin {
public static final String NAME = "watcher";
public static final String SCHEDULER_THREAD_POOL_NAME = "watcher_scheduler";
private final Settings settings;
private final boolean transportClient;
@ -68,15 +68,12 @@ public class WatcherPlugin extends AbstractPlugin {
if (transportClient) {
return ImmutableSettings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return settingsBuilder()
.put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".type", "fixed")
.put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".size", availableProcessors * 2)
.put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".queue_size", 1000)
.put("threadpool." + NAME + ".type", "fixed")
.put("threadpool." + NAME + ".size", availableProcessors * 5)
.put("threadpool." + NAME + ".queue_size", 1000)
Settings additionalSettings = settingsBuilder()
.put(ScheduleModule.additionalSettings(settings))
.put(HistoryModule.additionalSettings(settings))
.build();
return additionalSettings;
}
}

View File

@ -5,10 +5,14 @@
*/
package org.elasticsearch.watcher.execution;
import org.elasticsearch.watcher.WatcherPlugin;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.WatcherPlugin;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import java.util.concurrent.BlockingQueue;
@ -17,6 +21,22 @@ import java.util.concurrent.BlockingQueue;
*/
public class InternalWatchExecutor implements WatchExecutor {
public static final String THREAD_POOL_NAME = WatcherPlugin.NAME;
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// the TP is already configured in the node settings
// no need for additional settings
return ImmutableSettings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(nodeSettings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(5 * availableProcessors)
.queueSize(1000)
.build();
}
private final ThreadPool threadPool;
@Inject
@ -40,6 +60,6 @@ public class InternalWatchExecutor implements WatchExecutor {
}
private EsThreadPoolExecutor executor() {
return (EsThreadPoolExecutor) threadPool.executor(WatcherPlugin.NAME);
return (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME);
}
}

View File

@ -6,6 +6,8 @@
package org.elasticsearch.watcher.history;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.execution.InternalWatchExecutor;
/**
*/
@ -21,4 +23,8 @@ public class HistoryModule extends AbstractModule {
bind(WatchRecord.Parser.class).asEagerSingleton();
bind(HistoryStore.class).asEagerSingleton();
}
public static Settings additionalSettings(Settings nodeSettings) {
return InternalWatchExecutor.additionalSettings(nodeSettings);
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
/**
*
*/
public abstract class ThreadPoolSettingsBuilder<B extends ThreadPoolSettingsBuilder> {
public static Same same(String name) {
return new Same(name);
}
protected final String name;
private final ImmutableSettings.Builder builder = ImmutableSettings.builder();
protected ThreadPoolSettingsBuilder(String name, String type) {
this.name = name;
put("type", type);
}
public Settings build() {
return builder.build();
}
protected B put(String setting, Object value) {
builder.put("threadpool." + name + "." + setting, value);
return (B) this;
}
protected B put(String setting, int value) {
builder.put("threadpool." + name + "." + setting, value);
return (B) this;
}
public static class Same extends ThreadPoolSettingsBuilder<Same> {
public Same(String name) {
super(name, "same");
}
}
public static class Fixed extends ThreadPoolSettingsBuilder<Fixed> {
public Fixed(String name) {
super(name, "fixed");
}
public Fixed size(int size) {
return put("size", size);
}
public Fixed queueSize(int queueSize) {
return put("queue_size", queueSize);
}
}
}

View File

@ -29,6 +29,12 @@ public interface TriggerEngine<T extends Trigger, E extends TriggerEvent> {
void add(Job job);
/**
* Removes the job associated with the given name from this trigger engine.
*
* @param jobName The name of the job to remove
* @return {@code true} if the job existed and removed, {@code false} otherwise.
*/
boolean remove(String jobName);
T parseTrigger(String context, XContentParser parser) throws IOException;

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.trigger.manual.ManualTriggerEngine;
import org.elasticsearch.watcher.trigger.schedule.ScheduleModule;
@ -21,9 +22,11 @@ import java.util.Set;
*/
public class TriggerModule extends AbstractModule implements SpawnModules {
private final Settings settings;
private final Set<Class<? extends TriggerEngine>> engines = new HashSet<>();
public TriggerModule() {
public TriggerModule(Settings settings) {
this.settings = settings;
registerStandardEngines();
}
@ -32,7 +35,7 @@ public class TriggerModule extends AbstractModule implements SpawnModules {
}
protected void registerStandardEngines() {
registerEngine(ScheduleModule.triggerEngineType());
registerEngine(ScheduleModule.triggerEngineType(settings));
registerEngine(ManualTriggerEngine.class);
}

View File

@ -54,6 +54,12 @@ public class TriggerService extends AbstractComponent {
engines.get(job.trigger().type()).add(job);
}
/**
* Removes the job associated with the given name from this trigger service.
*
* @param jobName The name of the job to remove
* @return {@code true} if the job existed and removed, {@code false} otherwise.
*/
public boolean remove(String jobName) {
for (TriggerEngine engine : engines.values()) {
if (engine.remove(jobName)) {

File diff suppressed because it is too large Load Diff

View File

@ -8,10 +8,8 @@ package org.elasticsearch.watcher.trigger.schedule;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.quartz.CronExpression;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
@ -23,8 +21,7 @@ public class CronSchedule extends CronnableSchedule {
public static final String TYPE = "cron";
public CronSchedule(String... crons) {
super(crons);
validate(crons);
super(validate(crons));
}
@Override
@ -37,14 +34,15 @@ public class CronSchedule extends CronnableSchedule {
return crons.length == 1 ? builder.value(crons[0]) : builder.value(crons);
}
static void validate(String... crons) {
static String[] validate(String... crons) {
for (String cron :crons) {
try {
CronExpression.validateExpression(cron);
} catch (ParseException pe) {
Cron.validate(cron);
} catch (Cron.ParseException pe) {
throw new ValidationException(cron, pe);
}
}
return crons;
}
public static class Parser implements Schedule.Parser<CronSchedule> {
@ -90,7 +88,7 @@ public class CronSchedule extends CronnableSchedule {
private String expression;
public ValidationException(String expression, ParseException cause) {
public ValidationException(String expression, Cron.ParseException cause) {
super("invalid cron expression [" + expression + "]. " + cause.getMessage());
this.expression = expression;
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.trigger.schedule;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
/**
@ -13,14 +14,38 @@ import java.util.Objects;
*/
public abstract class CronnableSchedule implements Schedule {
protected final String[] crons;
private static final Comparator<Cron> CRON_COMPARATOR = new Comparator<Cron>() {
@Override
public int compare(Cron c1, Cron c2) {
return c1.expression().compareTo(c2.expression());
}
};
public CronnableSchedule(String... crons) {
this.crons = crons;
Arrays.sort(crons);
protected final Cron[] crons;
public CronnableSchedule(String... expressions) {
this(crons(expressions));
}
public String[] crons() {
public CronnableSchedule(Cron... crons) {
assert crons.length > 0;
this.crons = crons;
Arrays.sort(crons, CRON_COMPARATOR);
}
@Override
public long nextScheduledTimeAfter(long startTime, long time) {
if (time <= startTime) {
return startTime;
}
long nextTime = Long.MAX_VALUE;
for (Cron cron : crons) {
nextTime = Math.min(nextTime, cron.getNextValidTimeAfter(time));
}
return nextTime;
}
public Cron[] crons() {
return crons;
}
@ -40,4 +65,12 @@ public abstract class CronnableSchedule implements Schedule {
final CronnableSchedule other = (CronnableSchedule) obj;
return Objects.deepEquals(this.crons, other.crons);
}
static Cron[] crons(String... expressions) {
Cron[] crons = new Cron[expressions.length];
for (int i = 0; i < crons.length; i++) {
crons[i] = new Cron(expressions[i]);
}
return crons;
}
}

View File

@ -32,6 +32,17 @@ public class IntervalSchedule implements Schedule {
return TYPE;
}
@Override
public long nextScheduledTimeAfter(long startTime, long time) {
if (time <= startTime) {
return startTime;
}
// advancing the time in 1 ns (we're looking for the time **after**)
time += 1;
long delta = time - startTime;
return startTime + (delta / interval.millis + 1) * interval.millis;
}
public Interval interval() {
return interval;
}
@ -132,10 +143,12 @@ public class IntervalSchedule implements Schedule {
private final long duration;
private final Unit unit;
private final long millis; // computed once
public Interval(long duration, Unit unit) {
this.duration = duration;
this.unit = unit;
this.millis = unit.millis(duration);
}
public long seconds() {

View File

@ -17,7 +17,24 @@ public interface Schedule extends ToXContent {
String type();
public static interface Parser<S extends Schedule> {
/**
* Returns the next scheduled time after the given time, according to this schedule. If the given schedule
* cannot resolve the next scheduled time, then {@code -1} is returned. It really depends on the type of
* schedule to determine when {@code -1} is returned. Some schedules (e.g. IntervalSchedule) will never return
* {@code -1} as they can always compute the next scheduled time. {@code Cron} based schedules are good example
* of schedules that may return {@code -1}, for example, when the schedule only points to times that are all
* before the given time (in which case, there is no next scheduled time for the given time).
*
* Example:
*
* cron 0 0 0 * 1 ? 2013 (only points to days in January 2013)
*
* time 2015-01-01 12:00:00 (this time is in 2015)
*
*/
long nextScheduledTimeAfter(long startTime, long time);
interface Parser<S extends Schedule> {
String type();

View File

@ -7,10 +7,13 @@ package org.elasticsearch.watcher.trigger.schedule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.schedule.quartz.QuartzScheduleTriggerEngine;
import org.elasticsearch.watcher.trigger.schedule.engine.*;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
/**
@ -30,8 +33,10 @@ public class ScheduleModule extends AbstractModule {
registerScheduleParser(YearlySchedule.TYPE, YearlySchedule.Parser.class);
}
public static Class<? extends TriggerEngine> triggerEngineType() {
return QuartzScheduleTriggerEngine.class;
public static Class<? extends TriggerEngine> triggerEngineType(Settings nodeSettings) {
Engine engine = Engine.resolve(nodeSettings);
Loggers.getLogger(ScheduleModule.class, nodeSettings).info("using [{}] schedule trigger engine", engine.name().toLowerCase(Locale.ROOT));
return engine.engineType();
}
public void registerScheduleParser(String parserType, Class<? extends Schedule.Parser> parserClass) {
@ -50,4 +55,89 @@ public class ScheduleModule extends AbstractModule {
bind(ScheduleRegistry.class).asEagerSingleton();
}
public static Settings additionalSettings(Settings nodeSettings) {
Engine engine = Engine.resolve(nodeSettings);
return engine.additionalSettings(nodeSettings);
}
public enum Engine {
SCHEDULER() {
@Override
protected Class<? extends TriggerEngine> engineType() {
return SchedulerScheduleTriggerEngine.class;
}
@Override
protected Settings additionalSettings(Settings nodeSettings) {
return SchedulerScheduleTriggerEngine.additionalSettings(nodeSettings);
}
},
HASHWHEEL() {
@Override
protected Class<? extends TriggerEngine> engineType() {
return HashWheelScheduleTriggerEngine.class;
}
@Override
protected Settings additionalSettings(Settings nodeSettings) {
return HashWheelScheduleTriggerEngine.additionalSettings(nodeSettings);
}
},
QUARTZ() {
@Override
protected Class<? extends TriggerEngine> engineType() {
return QuartzScheduleTriggerEngine.class;
}
@Override
protected Settings additionalSettings(Settings nodeSettings) {
return QuartzScheduleTriggerEngine.additionalSettings(nodeSettings);
}
},
TIMER() {
@Override
protected Class<? extends TriggerEngine> engineType() {
return TimerTickerScheduleTriggerEngine.class;
}
@Override
protected Settings additionalSettings(Settings nodeSettings) {
return TimerTickerScheduleTriggerEngine.additionalSettings(nodeSettings);
}
},
SIMPLE() {
@Override
protected Class<? extends TriggerEngine> engineType() {
return SimpleTickerScheduleTriggerEngine.class;
}
@Override
protected Settings additionalSettings(Settings nodeSettings) {
return SimpleTickerScheduleTriggerEngine.additionalSettings(nodeSettings);
}
};
protected abstract Class<? extends TriggerEngine> engineType();
protected abstract Settings additionalSettings(Settings nodeSettings);
public static Engine resolve(Settings settings) {
String engine = settings.getComponentSettings(ScheduleModule.class).get("engine", "scheduler");
switch (engine.toLowerCase(Locale.ROOT)) {
case "quartz" : return QUARTZ;
case "timer" : return TIMER;
case "simple" : return SIMPLE;
case "hashwheel" : return HASHWHEEL;
case "scheduler" : return SCHEDULER;
default:
return SCHEDULER;
}
}
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.watcher.trigger.schedule;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.watcher.trigger.Trigger;

View File

@ -6,8 +6,11 @@
package org.elasticsearch.watcher.trigger.schedule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.trigger.AbstractTriggerEngine;
import java.io.IOException;
/**
*
*/
@ -15,12 +18,26 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine<Schedu
public static final String TYPE = ScheduleTrigger.TYPE;
public ScheduleTriggerEngine(Settings settings) {
protected final ScheduleRegistry scheduleRegistry;
public ScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry) {
super(settings);
this.scheduleRegistry = scheduleRegistry;
}
@Override
public String type() {
return TYPE;
}
@Override
public ScheduleTrigger parseTrigger(String context, XContentParser parser) throws IOException {
Schedule schedule = scheduleRegistry.parse(context, parser);
return new ScheduleTrigger(schedule);
}
@Override
public ScheduleTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException {
return ScheduleTriggerEvent.parse(context, parser);
}
}

View File

@ -0,0 +1,259 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.netty.util.HashedWheelTimer;
import org.elasticsearch.common.netty.util.Timeout;
import org.elasticsearch.common.netty.util.TimerTask;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class HashWheelScheduleTriggerEngine extends ScheduleTriggerEngine {
public static final String THREAD_POOL_NAME = "watcher_scheduler";
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// scheduler TP is already configured in the node settings
// no need for additional settings
return ImmutableSettings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.queueSize(1000)
.build();
}
private final Clock clock;
private final EsThreadPoolExecutor executor;
private final TimeValue tickDuration;
private final int ticksPerWheel;
private volatile HashedWheelTimer timer;
private volatile Schedules schedules;
@Inject
public HashWheelScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) {
super(settings, scheduleRegistry);
this.clock = clock;
this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME);
this.tickDuration = componentSettings.getAsTime("hashweel.tick_duration", TimeValue.timeValueMillis(500));
this.ticksPerWheel = componentSettings.getAsInt("hashweel.tick_per_wheel", 8196);
}
@Override
public void start(Collection<Job> jobs) {
logger.debug("starting schedule engine...");
this.timer = new HashedWheelTimer(EsExecutors.daemonThreadFactory("trigger_engine_scheduler"), tickDuration.millis(), TimeUnit.MILLISECONDS, ticksPerWheel);
// calibrate with round clock
while (clock.millis() % 1000 > 15) {
}
timer.start();
long starTime = clock.millis();
List<ActiveSchedule> schedules = new ArrayList<>();
for (Job job : jobs) {
if (job.trigger() instanceof ScheduleTrigger) {
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedules.add(new ActiveSchedule(job.name(), trigger.schedule(), starTime));
}
}
this.schedules = new Schedules(schedules);
logger.debug("schedule engine started at [{}]", DateTime.now());
}
@Override
public void stop() {
logger.debug("stopping schedule engine...");
timer.stop();
executor.getQueue().clear();
logger.debug("schedule engine stopped");
}
@Override
public void add(Job job) {
assert job.trigger() instanceof ScheduleTrigger;
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
ActiveSchedule schedule = new ActiveSchedule(job.name(), trigger.schedule(), clock.millis());
schedules = schedules.add(schedule);
}
@Override
public boolean remove(String jobName) {
Schedules newSchedules = schedules.remove(jobName);
if (newSchedules == null) {
return false;
}
schedules = newSchedules;
return true;
}
void notifyListeners(String name, long triggeredTime, long scheduledTime) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
executor.execute(new ListenerRunnable(listener, name, event));
}
}
class ActiveSchedule implements TimerTask {
private final String name;
private final Schedule schedule;
private final long startTime;
private volatile Timeout timeout;
private volatile long scheduledTime;
public ActiveSchedule(String name, Schedule schedule, long startTime) {
this.name = name;
this.schedule = schedule;
this.startTime = startTime;
// we don't want the schedule to trigger on the start time itself, so we compute
// the next scheduled time by simply computing the schedule time on the startTime + 1
this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1);
long delay = scheduledTime > 0 ? scheduledTime - clock.millis() : -1;
if (delay > 0) {
timeout = timer.newTimeout(this, delay, TimeUnit.MILLISECONDS);
} else {
timeout = null;
}
}
@Override
public void run(Timeout timeout) {
long triggeredTime = clock.millis();
notifyListeners(name, triggeredTime, scheduledTime);
scheduledTime = schedule.nextScheduledTimeAfter(startTime, triggeredTime);
long delay = scheduledTime > 0 ? scheduledTime - triggeredTime : -1;
if (delay > 0) {
this.timeout = timer.newTimeout(this, delay, TimeUnit.MILLISECONDS);
}
}
public void cancel() {
if (timeout != null) {
timeout.cancel();
}
}
}
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
}
@Override
public void run() {
listener.triggered(jobName, event);
}
}
static class Schedules {
private final ActiveSchedule[] schedules;
private final ImmutableMap<String, ActiveSchedule> scheduleByName;
Schedules(Collection<ActiveSchedule> schedules) {
ImmutableMap.Builder<String, ActiveSchedule> builder = ImmutableMap.builder();
this.schedules = new ActiveSchedule[schedules.size()];
int i = 0;
for (ActiveSchedule schedule : schedules) {
builder.put(schedule.name, schedule);
this.schedules[i++] = schedule;
}
this.scheduleByName = builder.build();
}
public Schedules(ActiveSchedule[] schedules, ImmutableMap<String, ActiveSchedule> scheduleByName) {
this.schedules = schedules;
this.scheduleByName = scheduleByName;
}
public long getNextScheduledTime() {
long time = -1;
for (ActiveSchedule schedule : schedules) {
if (time < 0) {
time = schedule.scheduledTime;
} else {
time = Math.min(time, schedule.scheduledTime);
}
}
return time;
}
public Schedules add(ActiveSchedule schedule) {
boolean replacing = scheduleByName.containsKey(schedule.name);
if (!replacing) {
ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length + 1];
System.arraycopy(schedules, 0, newSchedules, 0, schedules.length);
newSchedules[schedules.length] = schedule;
ImmutableMap<String, ActiveSchedule> newScheduleByName = ImmutableMap.<String, ActiveSchedule>builder()
.putAll(scheduleByName)
.put(schedule.name, schedule)
.build();
return new Schedules(newSchedules, newScheduleByName);
}
ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length];
ImmutableMap.Builder<String, ActiveSchedule> builder = ImmutableMap.builder();
for (int i = 0; i < schedules.length; i++) {
ActiveSchedule sched = schedules[i].name.equals(schedule.name) ? schedule : schedules[i];
schedules[i] = sched;
builder.put(sched.name, sched);
}
return new Schedules(newSchedules, builder.build());
}
public Schedules remove(String name) {
if (!scheduleByName.containsKey(name)) {
return null;
}
ImmutableMap.Builder<String, ActiveSchedule> builder = ImmutableMap.builder();
ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length - 1];
int i = 0;
for (ActiveSchedule schedule : schedules) {
if (!schedule.name.equals(name)) {
newSchedules[i++] = schedule;
builder.put(schedule.name, schedule);
} else {
schedule.cancel();
}
}
return new Schedules(newSchedules, builder.build());
}
}
}

View File

@ -3,17 +3,18 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.trigger.schedule.quartz;
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.WatcherPlugin;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.TriggerException;
import org.elasticsearch.watcher.trigger.schedule.*;
@ -21,31 +22,39 @@ import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.watcher.trigger.schedule.quartz.WatcherQuartzJob.jobDetail;
/**
*
*/
public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
private final ScheduleRegistry scheduleRegistry;
public static final String THREAD_POOL_NAME = "watcher_scheduler";
// Not happy about it, but otherwise we're stuck with Quartz's SimpleThreadPool
private volatile static ThreadPool threadPool;
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// scheduler TP is already configured in the node settings
// no need for additional settings
return ImmutableSettings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.queueSize(1000)
.build();
}
private final Clock clock;
private final DateTimeZone defaultTimeZone;
private final EsThreadPoolExecutor executor;
private volatile org.quartz.Scheduler scheduler;
@Inject
public QuartzScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, ThreadPool threadPool, Clock clock) {
super(settings);
this.scheduleRegistry = scheduleRegistry;
QuartzScheduleTriggerEngine.threadPool = threadPool;
super(settings, scheduleRegistry);
this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME);
this.clock = clock;
String timeZoneStr = componentSettings.get("time_zone", "UTC");
try {
@ -77,7 +86,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
for (Job job : jobs) {
if (job.trigger() instanceof ScheduleTrigger) {
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
quartzJobs.put(jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock));
quartzJobs.put(WatcherQuartzJob.jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock));
}
}
scheduler.scheduleJobs(quartzJobs, false);
@ -95,6 +104,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
logger.info("Stopping scheduler...");
scheduler.shutdown(true);
this.scheduler = null;
executor.getQueue().clear();
logger.info("Stopped scheduler");
}
} catch (org.quartz.SchedulerException se){
@ -108,7 +118,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
try {
logger.trace("scheduling [{}] with schedule [{}]", job.name(), trigger.schedule());
scheduler.scheduleJob(jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock), true);
scheduler.scheduleJob(WatcherQuartzJob.jobDetail(job.name(), this), createTrigger(trigger.schedule(), defaultTimeZone, clock), true);
} catch (org.quartz.SchedulerException se) {
logger.error("failed to schedule job",se);
throw new TriggerException("failed to schedule job", se);
@ -128,9 +138,9 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
static Set<Trigger> createTrigger(Schedule schedule, DateTimeZone timeZone, Clock clock) {
HashSet<Trigger> triggers = new HashSet<>();
if (schedule instanceof CronnableSchedule) {
for (String cron : ((CronnableSchedule) schedule).crons()) {
for (Cron cron : ((CronnableSchedule) schedule).crons()) {
triggers.add(TriggerBuilder.newTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule(cron).inTimeZone(timeZone.toTimeZone()))
.withSchedule(CronScheduleBuilder.cronSchedule(cron.expression()).inTimeZone(timeZone.toTimeZone()))
.startAt(clock.now().toDate())
.build());
}
@ -146,38 +156,36 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
return triggers;
}
@Override
public ScheduleTrigger parseTrigger(String context, XContentParser parser) throws IOException {
Schedule schedule = scheduleRegistry.parse(context, parser);
return new ScheduleTrigger(schedule);
}
@Override
public ScheduleTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException {
return ScheduleTriggerEvent.parse(context, parser);
}
void notifyListeners(String name, JobExecutionContext ctx) {
ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(ctx.getFireTime()), new DateTime(ctx.getScheduledFireTime()));
for (Listener listener : listeners) {
listener.triggered(name, event);
executor.execute(new ListenerRunnable(listener, name, event));
}
}
// This Quartz thread pool will always accept. On this thread we will only index a watch record and add it to the work queue
public static final class WatcherQuartzThreadPool implements org.quartz.spi.ThreadPool {
static class ListenerRunnable implements Runnable {
private final EsThreadPoolExecutor executor;
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
public WatcherQuartzThreadPool() {
this.executor = (EsThreadPoolExecutor) threadPool.executor(WatcherPlugin.SCHEDULER_THREAD_POOL_NAME);
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
}
@Override
public void run() {
listener.triggered(jobName, event);
}
}
public static final class WatcherQuartzThreadPool implements org.quartz.spi.ThreadPool {
@Override
public boolean runInThread(Runnable runnable) {
executor.execute(runnable);
runnable.run();
return true;
}

View File

@ -0,0 +1,250 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
public static final String THREAD_POOL_NAME = "watcher_scheduler";
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// scheduler TP is already configured in the node settings
// no need for additional settings
return ImmutableSettings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.queueSize(1000)
.build();
}
private final Clock clock;
private volatile Schedules schedules;
private ScheduledExecutorService scheduler;
private EsThreadPoolExecutor executor;
@Inject
public SchedulerScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) {
super(settings, scheduleRegistry);
this.clock = clock;
this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME);
}
@Override
public void start(Collection<Job> jobs) {
logger.debug("starting schedule engine...");
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler"));
long starTime = clock.millis();
List<ActiveSchedule> schedules = new ArrayList<>();
for (Job job : jobs) {
if (job.trigger() instanceof ScheduleTrigger) {
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedules.add(new ActiveSchedule(job.name(), trigger.schedule(), starTime));
}
}
this.schedules = new Schedules(schedules);
logger.debug("schedule engine started at [{}]", DateTime.now());
}
@Override
public void stop() {
logger.debug("stopping schedule engine...");
scheduler.shutdownNow();
try {
scheduler.awaitTermination(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.getQueue().clear();
logger.debug("schedule engine stopped");
}
@Override
public void add(Job job) {
assert job.trigger() instanceof ScheduleTrigger;
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
ActiveSchedule schedule = new ActiveSchedule(job.name(), trigger.schedule(), clock.millis());
schedules = schedules.add(schedule);
}
@Override
public boolean remove(String jobName) {
Schedules newSchedules = schedules.remove(jobName);
if (newSchedules == null) {
return false;
}
schedules = newSchedules;
return true;
}
void notifyListeners(String name, long triggeredTime, long scheduledTime) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
executor.execute(new ListenerRunnable(listener, name, event));
}
}
class ActiveSchedule implements Runnable {
private final String name;
private final Schedule schedule;
private final long startTime;
private volatile ScheduledFuture<?> future;
private volatile long scheduledTime;
public ActiveSchedule(String name, Schedule schedule, long startTime) {
this.name = name;
this.schedule = schedule;
this.startTime = startTime;
// we don't want the schedule to trigger on the start time itself, so we compute
// the next scheduled time by simply computing the schedule time on the startTime + 1
this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1);
long delay = scheduledTime > 0 ? scheduledTime - clock.millis() : -1;
if (delay > 0) {
future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
} else {
future = null;
}
}
@Override
public void run() {
long triggeredTime = clock.millis();
notifyListeners(name, triggeredTime, scheduledTime);
scheduledTime = schedule.nextScheduledTimeAfter(startTime, triggeredTime);
long delay = scheduledTime > 0 ? scheduledTime - triggeredTime : -1;
if (delay > 0) {
future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
}
}
public void cancel() {
if (future != null) {
future.cancel(true);
}
}
}
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
}
@Override
public void run() {
listener.triggered(jobName, event);
}
}
static class Schedules {
private final ActiveSchedule[] schedules;
private final ImmutableMap<String, ActiveSchedule> scheduleByName;
Schedules(Collection<ActiveSchedule> schedules) {
ImmutableMap.Builder<String, ActiveSchedule> builder = ImmutableMap.builder();
this.schedules = new ActiveSchedule[schedules.size()];
int i = 0;
for (ActiveSchedule schedule : schedules) {
builder.put(schedule.name, schedule);
this.schedules[i++] = schedule;
}
this.scheduleByName = builder.build();
}
public Schedules(ActiveSchedule[] schedules, ImmutableMap<String, ActiveSchedule> scheduleByName) {
this.schedules = schedules;
this.scheduleByName = scheduleByName;
}
public long getNextScheduledTime() {
long time = -1;
for (ActiveSchedule schedule : schedules) {
if (time < 0) {
time = schedule.scheduledTime;
} else {
time = Math.min(time, schedule.scheduledTime);
}
}
return time;
}
public Schedules add(ActiveSchedule schedule) {
boolean replacing = scheduleByName.containsKey(schedule.name);
if (!replacing) {
ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length + 1];
System.arraycopy(schedules, 0, newSchedules, 0, schedules.length);
newSchedules[schedules.length] = schedule;
ImmutableMap<String, ActiveSchedule> newScheduleByName = ImmutableMap.<String, ActiveSchedule>builder()
.putAll(scheduleByName)
.put(schedule.name, schedule)
.build();
return new Schedules(newSchedules, newScheduleByName);
}
ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length];
ImmutableMap.Builder<String, ActiveSchedule> builder = ImmutableMap.builder();
for (int i = 0; i < schedules.length; i++) {
ActiveSchedule sched = schedules[i].name.equals(schedule.name) ? schedule : schedules[i];
schedules[i] = sched;
builder.put(sched.name, sched);
}
return new Schedules(newSchedules, builder.build());
}
public Schedules remove(String name) {
if (!scheduleByName.containsKey(name)) {
return null;
}
ImmutableMap.Builder<String, ActiveSchedule> builder = ImmutableMap.builder();
ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length - 1];
int i = 0;
for (ActiveSchedule schedule : schedules) {
if (!schedule.name.equals(name)) {
newSchedules[i++] = schedule;
builder.put(schedule.name, schedule);
} else {
schedule.cancel();
}
}
return new Schedules(newSchedules, builder.build());
}
}
}

View File

@ -0,0 +1,198 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/**
*
*/
public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
public static final String THREAD_POOL_NAME = "watcher_scheduler";
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// scheduler TP is already configured in the node settings
// no need for additional settings
return ImmutableSettings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.queueSize(1000)
.build();
}
private final Clock clock;
private volatile Map<String, ActiveSchedule> schedules;
private Ticker ticker;
private EsThreadPoolExecutor executor;
@Inject
public SimpleTickerScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) {
super(settings, scheduleRegistry);
this.schedules = new ConcurrentHashMap<>();
this.clock = clock;
this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME);
}
@Override
public void start(Collection<Job> jobs) {
long starTime = clock.millis();
Map<String, ActiveSchedule> schedules = new ConcurrentHashMap<>();
for (Job job : jobs) {
if (job.trigger() instanceof ScheduleTrigger) {
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), starTime));
}
}
this.schedules = schedules;
this.ticker = new Ticker();
}
@Override
public void stop() {
ticker.close();
executor.getQueue().clear();
}
@Override
public void add(Job job) {
assert job.trigger() instanceof ScheduleTrigger;
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), clock.millis()));
}
@Override
public boolean remove(String jobName) {
return schedules.remove(jobName) != null;
}
void checkJobs() {
long triggeredTime = clock.millis();
for (ActiveSchedule schedule : schedules.values()) {
long scheduledTime = schedule.check(triggeredTime);
if (scheduledTime > 0) {
notifyListeners(schedule.name, triggeredTime, scheduledTime);
}
}
}
void notifyListeners(String name, long triggeredTime, long scheduledTime) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
executor.execute(new ListenerRunnable(listener, name, event));
}
}
static class ActiveSchedule {
private final String name;
private final Schedule schedule;
private final long startTime;
private volatile long scheduledTime;
public ActiveSchedule(String name, Schedule schedule, long startTime) {
this.name = name;
this.schedule = schedule;
this.startTime = startTime;
// we don't want the schedule to trigger on the start time itself, so we compute
// the next scheduled time by simply computing the schedule time on the startTime + 1
this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1);
}
/**
* Checks whether the given time is the same or after the scheduled time of this schedule. If so, the scheduled time is
* returned a new scheduled time is computed and set. Otherwise (the given time is before the scheduled time), {@code -1}
* is returned.
*/
public long check(long time) {
if (time < scheduledTime) {
return -1;
}
long prevScheduledTime = scheduledTime == 0 ? time : scheduledTime;
scheduledTime = schedule.nextScheduledTimeAfter(startTime, scheduledTime);
return prevScheduledTime;
}
}
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
}
@Override
public void run() {
listener.triggered(jobName, event);
}
}
class Ticker extends Thread {
private volatile boolean active = true;
private final CountDownLatch closeLatch = new CountDownLatch(1);
public Ticker() {
super("ticker-schedule-trigger-engine");
setDaemon(true);
start();
}
@Override
public void run() {
// calibrate with round clock
while (clock.millis() % 1000 > 15) {
}
while (active) {
logger.trace("checking jobs [{}]", DateTime.now());
checkJobs();
try {
sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
closeLatch.countDown();
}
public void close() {
active = false;
try {
closeLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.Collection;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
/**
*
*/
public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
public static final String THREAD_POOL_NAME = "watcher_scheduler";
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// scheduler TP is already configured in the node settings
// no need for additional settings
return ImmutableSettings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.queueSize(1000)
.build();
}
private final Clock clock;
private volatile Map<String, ActiveSchedule> schedules;
private Timer timer;
private TimerTask ticker;
private EsThreadPoolExecutor executor;
@Inject
public TimerTickerScheduleTriggerEngine(Settings settings, Clock clock, ScheduleRegistry scheduleRegistry, ThreadPool threadPool) {
super(settings, scheduleRegistry);
this.schedules = new ConcurrentHashMap<>();
this.clock = clock;
this.executor = (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME);
}
@Override
public void start(Collection<Job> jobs) {
long starTime = clock.millis();
Map<String, ActiveSchedule> schedules = new ConcurrentHashMap<>();
for (Job job : jobs) {
if (job.trigger() instanceof ScheduleTrigger) {
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), starTime));
}
}
this.schedules = schedules;
this.ticker = new TimerTask() {
@Override
public void run() {
checkJobs();
}
};
this.timer = new Timer("ticker-schedule-trigger-engine", true);
this.timer.scheduleAtFixedRate(ticker, clock.millis() % 1000 , 10);
}
@Override
public void stop() {
ticker.cancel();
timer.cancel();
executor.getQueue().clear();
}
@Override
public void add(Job job) {
assert job.trigger() instanceof ScheduleTrigger;
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedules.put(job.name(), new ActiveSchedule(job.name(), trigger.schedule(), clock.millis()));
}
@Override
public boolean remove(String jobName) {
return schedules.remove(jobName) != null;
}
void checkJobs() {
long triggeredTime = clock.millis();
for (ActiveSchedule schedule : schedules.values()) {
long scheduledTime = schedule.check(triggeredTime);
if (scheduledTime > 0) {
notifyListeners(schedule.name, triggeredTime, scheduledTime);
}
}
}
void notifyListeners(String name, long triggeredTime, long scheduledTime) {
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
executor.execute(new ListenerRunnable(listener, name, event));
}
}
static class ActiveSchedule {
private final String name;
private final Schedule schedule;
private final long startTime;
private volatile long scheduledTime;
public ActiveSchedule(String name, Schedule schedule, long startTime) {
this.name = name;
this.schedule = schedule;
this.startTime = startTime;
// we don't want the schedule to trigger on the start time itself, so we compute
// the next scheduled time by simply computing the schedule time on the startTime + 1
this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime + 1);
}
/**
* Checks whether the given time is the same or after the scheduled time of this schedule. If so, the scheduled time is
* returned a new scheduled time is computed and set. Otherwise (the given time is before the scheduled time), {@code -1}
* is returned.
*/
public long check(long time) {
if (time < scheduledTime) {
return -1;
}
long prevScheduledTime = scheduledTime == 0 ? time : scheduledTime;
scheduledTime = schedule.nextScheduledTimeAfter(startTime, scheduledTime);
return prevScheduledTime;
}
}
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
}
@Override
public void run() {
listener.triggered(jobName, event);
}
}
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.trigger.schedule.quartz;
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.quartz.*;

View File

@ -72,7 +72,8 @@ public class YearTimes implements Times {
String daysStr = Ints.join(",", this.days);
daysStr = daysStr.replace("32", "L");
String monthsStr = Joiner.on(",").join(months);
crons.add("0 " + minsStr + " " + hrsStr + " " + daysStr + " " + monthsStr + " ?");
String expression = "0 " + minsStr + " " + hrsStr + " " + daysStr + " " + monthsStr + " ?";
crons.add(expression);
}
return crons;
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.common.cli.CliTool;
import org.elasticsearch.common.cli.CliToolConfig;
import org.elasticsearch.common.cli.Terminal;
import org.elasticsearch.common.cli.commons.CommandLine;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.quartz.CronExpression;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.elasticsearch.watcher.trigger.schedule.Cron;
import static org.elasticsearch.common.cli.CliToolConfig.Builder.cmd;
import static org.elasticsearch.common.cli.CliToolConfig.Builder.option;
@ -50,7 +49,7 @@ public class CronEvalTool extends CliTool {
.options(option("c", "count").hasArg(false).required(false))
.build();
private static final SimpleDateFormat format = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss", Locale.ROOT);
private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("EEE, d MMM yyyy HH:mm:ss");
final String expression;
final int count;
@ -79,20 +78,27 @@ public class CronEvalTool extends CliTool {
// when invalid, a parse expression will be thrown with a descriptive error message
// the cli infra handles such exceptions and hows the exceptions' message
CronExpression.validateExpression(expression);
Cron.validate(expression);
terminal.println("Valid!");
Date date = new Date();
DateTime date = DateTime.now();
terminal.println("Now is [" + format.format(date) + "]");
terminal.println("Now is [" + formatter.print(date) + "]");
terminal.println("Here are the next " + count + " times this cron expression will trigger:");
CronExpression cron = new CronExpression(expression);
Cron cron = new Cron(expression);
long time = date.getMillis();
for (int i = 0; i < count; i++) {
date = cron.getNextValidTimeAfter(date);
terminal.println((i+1) + ".\t" + format.format(date));
long prevTime = time;
time = cron.getNextValidTimeAfter(time);
if (time < 0) {
terminal.printError((i + 1) + ".\t Could not compute future times since [" + formatter.print(prevTime) + "] " +
"(perhaps the cron expression only points to times in the past?)");
return ExitStatus.OK;
}
terminal.println((i+1) + ".\t" + formatter.print(time));
}
return ExitStatus.OK;
}

View File

@ -54,6 +54,7 @@ import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.watcher.trigger.ScheduleTriggerEngineMock;
import org.elasticsearch.watcher.trigger.TriggerService;
import org.elasticsearch.watcher.trigger.schedule.Schedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleModule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.Schedules;
import org.elasticsearch.watcher.watch.Watch;
@ -86,17 +87,20 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
boolean shieldEnabled = enableShield();
final ScheduleModule.Engine scheduleEngine = randomFrom(ScheduleModule.Engine.values());
@Override
protected Settings nodeSettings(int nodeOrdinal) {
ImmutableSettings.Builder builder = ImmutableSettings.builder()
return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("scroll.size", randomIntBetween(1, 100))
.put("plugin.types",
(timeWarped() ? TimeWarpedWatcherPlugin.class.getName() : WatcherPlugin.class.getName()) + "," +
(shieldEnabled ? ShieldPlugin.class.getName() + "," : "") +
licensePluginClass().getName())
.put(ShieldSettings.settings(shieldEnabled));
return builder.build();
.put(ShieldSettings.settings(shieldEnabled))
.put("watcher.trigger.schedule.engine", scheduleEngine().name().toLowerCase(Locale.ROOT))
.build();
}
/**
@ -116,6 +120,13 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return shieldEnabled;
}
/**
* @return The schedule trigger engine that will be used for the nodes.
*/
protected ScheduleModule.Engine scheduleEngine() {
return scheduleEngine;
}
/**
* Override and returns {@code false} to force running without shield
*/
@ -600,6 +611,8 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
static class ShieldSettings {
static boolean auditLogsEnabled = SystemPropertyUtil.getBoolean("tests.audit_logs", false);
public static final String IP_FILTER = "allow: all\n";
public static final String USERS =
@ -640,7 +653,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
.put("shield.authc.realms.esusers.files.users_roles", writeFile(folder, "users_roles", USER_ROLES))
.put("shield.authz.store.files.roles", writeFile(folder, "roles.yml", ROLES))
.put("shield.transport.n2n.ip_filter.file", writeFile(folder, "ip_filter.yml", IP_FILTER))
.put("shield.audit.enabled", true)
.put("shield.audit.enabled", auditLogsEnabled)
.build();
}

View File

@ -58,7 +58,7 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin {
if (module instanceof TriggerModule) {
// replacing scheduler module so we'll
// have control on when it fires a job
modules.add(new MockTriggerModule());
modules.add(new MockTriggerModule(settings));
} else if (module instanceof ClockModule) {
// replacing the clock module so we'll be able
@ -79,6 +79,10 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin {
public static class MockTriggerModule extends TriggerModule {
public MockTriggerModule(Settings settings) {
super(settings);
}
@Override
protected void registerStandardEngines() {
registerEngine(ScheduleTriggerEngineMock.class);

View File

@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test.bench;
import org.elasticsearch.common.cli.Terminal;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.*;
import org.elasticsearch.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
*
*/
public class ScheduleEngineBenchmark {
static final ESLogger logger = Loggers.getLogger(ScheduleEngineBenchmark.class);
public static void main(String[] args) throws Exception {
Settings settings = ImmutableSettings.builder()
.put(SchedulerScheduleTriggerEngine.additionalSettings(ImmutableSettings.EMPTY))
.put("name", "test")
.build();
ThreadPool threadPool = new ThreadPool(settings, null);
ScheduleRegistry scheduleRegistry = new ScheduleRegistry(Collections.<String, Schedule.Parser>emptyMap());
SchedulerScheduleTriggerEngine scheduler = new SchedulerScheduleTriggerEngine(ImmutableSettings.EMPTY, SystemClock.INSTANCE, scheduleRegistry, threadPool);
List<TriggerEngine.Job> jobs = new ArrayList<>(10000);
for (int i = 0; i < 10000; i++) {
jobs.add(new SimpleJob("job_" + i, new CronSchedule("0/3 * * * * ?")));
}
scheduler.start(jobs);
scheduler.register(new TriggerEngine.Listener() {
@Override
public void triggered(String jobName, TriggerEvent event) {
ScheduleTriggerEvent e = (ScheduleTriggerEvent) event;
logger.info("triggered [{}] at fire_time [{}], scheduled time [{}], now [{}]", jobName, event.triggeredTime(), e.scheduledTime(), SystemClock.INSTANCE.now());
}
});
Terminal.DEFAULT.readText("press enter to quit...");
scheduler.stop();
}
static class SimpleJob implements TriggerEngine.Job {
private final String name;
private final ScheduleTrigger trigger;
public SimpleJob(String name, Schedule schedule) {
this.name = name;
this.trigger = new ScheduleTrigger(schedule);
}
@Override
public String name() {
return name;
}
@Override
public Trigger trigger() {
return trigger;
}
}
}

View File

@ -226,7 +226,7 @@ public class WatcherBenchmark {
if (module instanceof TriggerModule) {
// replacing scheduler module so we'll
// have control on when it fires a job
modules.add(new MockTriggerModule());
modules.add(new MockTriggerModule(settings));
} else {
modules.add(module);
@ -237,6 +237,10 @@ public class WatcherBenchmark {
public static class MockTriggerModule extends TriggerModule {
public MockTriggerModule(Settings settings) {
super(settings);
}
@Override
protected void registerStandardEngines() {
registerEngine(ScheduleTriggerEngineMock.class);

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.watcher.test.integration;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
@ -13,6 +14,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.client.WatcherClient;
@ -23,6 +25,7 @@ import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleModule;
import org.elasticsearch.watcher.trigger.schedule.Schedules;
import org.elasticsearch.watcher.watch.WatchStore;
import org.junit.Test;
@ -47,8 +50,14 @@ import static org.hamcrest.Matchers.*;
/**
*/
@TestLogging("watcher.trigger.schedule:TRACE")
public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
@Override
protected ScheduleModule.Engine scheduleEngine() {
return ScheduleModule.Engine.HASHWHEEL;
}
@Test
public void testIndexWatch() throws Exception {
WatcherClient watcherClient = watcherClient();
@ -239,7 +248,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
assertThat(count, equalTo(findNumberOfPerformedActions("_name")));
}
@Test
@Test //@Repeat(iterations = 10)
public void testConditionSearchWithSource() throws Exception {
String variable = randomFrom("ctx.execution_time", "ctx.trigger.scheduled_time", "ctx.trigger.triggered_time");
SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery(
@ -251,7 +260,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
testConditionSearch(newInputSearchRequest("events").source(searchSourceBuilder));
}
@Test
@Test @Repeat(iterations = 10)
public void testConditionSearchWithIndexedTemplate() throws Exception {
String variable = randomFrom("ctx.execution_time", "ctx.trigger.scheduled_time", "ctx.trigger.triggered_time");
SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery(
@ -298,6 +307,8 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
timeWarp().scheduler().trigger("_name1");
timeWarp().scheduler().trigger("_name2");
refresh();
} else {
Thread.sleep(5000);
}
assertWatchWithMinimumPerformedActionsCount("_name1", 1);
@ -325,10 +336,13 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
String watchName = "_name";
assertAcked(prepareCreate("events").addMapping("event", "_timestamp", "enabled=true", "level", "type=string"));
watcherClient().preparePutWatch(watchName)
.setSource(createWatchSource(interval("5s"), request, "return ctx.payload.hits.total >= 3"))
.get();
logger.info("created watch [{}] at [{}]", watchName, DateTime.now());
client().prepareIndex("events", "event")
.setCreate(true)
.setSource("level", "a")
@ -337,7 +351,9 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
.setCreate(true)
.setSource("level", "a")
.get();
refresh();
if (timeWarped()) {
timeWarp().clock().fastForwardSeconds(5);
timeWarp().scheduler().trigger(watchName);

View File

@ -33,13 +33,11 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
private final ESLogger logger;
private final ConcurrentMap<String, Job> jobs = new ConcurrentHashMap<>();
private final Clock clock;
private final ScheduleRegistry scheduleRegistry;
@Inject
public ScheduleTriggerEngineMock(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings);
super(settings, scheduleRegistry);
this.logger = Loggers.getLogger(ScheduleTriggerEngineMock.class, settings);
this.scheduleRegistry = scheduleRegistry;
this.clock = clock;
}

View File

@ -34,7 +34,7 @@ public class CronScheduleTests extends ScheduleTestCase {
parser.nextToken();
CronSchedule schedule = new CronSchedule.Parser().parse(parser);
assertThat(schedule.crons(), arrayWithSize(1));
assertThat(schedule.crons()[0], is("0 0/5 * * * ?"));
assertThat(schedule.crons()[0].expression(), is("0 0/5 * * * ?"));
}
@Test
@ -48,10 +48,11 @@ public class CronScheduleTests extends ScheduleTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(bytes);
parser.nextToken();
CronSchedule schedule = new CronSchedule.Parser().parse(parser);
assertThat(schedule.crons(), arrayWithSize(3));
assertThat(schedule.crons(), hasItemInArray("0 0/1 * * * ?"));
assertThat(schedule.crons(), hasItemInArray("0 0/2 * * * ?"));
assertThat(schedule.crons(), hasItemInArray("0 0/3 * * * ?"));
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(3));
assertThat(crons, hasItemInArray("0 0/1 * * * ?"));
assertThat(crons, hasItemInArray("0 0/2 * * * ?"));
assertThat(crons, hasItemInArray("0 0/3 * * * ?"));
}
@Test

View File

@ -6,12 +6,12 @@
package org.elasticsearch.watcher.trigger.schedule;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.primitives.Ints;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.trigger.schedule.support.DayTimes;
import org.junit.Test;
@ -26,7 +26,7 @@ public class DailyScheduleTests extends ScheduleTestCase {
@Test
public void test_Default() throws Exception {
DailySchedule schedule = new DailySchedule();
String[] crons = schedule.crons();
String[] crons = expressions(schedule.crons());
assertThat(crons, arrayWithSize(1));
assertThat(crons, arrayContaining("0 0 0 * * ?"));
}
@ -35,7 +35,7 @@ public class DailyScheduleTests extends ScheduleTestCase {
public void test_SingleTime() throws Exception {
DayTimes time = validDayTime();
DailySchedule schedule = new DailySchedule(time);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(1));
assertThat(crons, arrayContaining("0 " + Ints.join(",", time.minute()) + " " + Ints.join(",", time.hour()) + " * * ?"));
}
@ -57,7 +57,7 @@ public class DailyScheduleTests extends ScheduleTestCase {
public void test_MultipleTimes() throws Exception {
DayTimes[] times = validDayTimes();
DailySchedule schedule = new DailySchedule(times);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(times.length));
for (DayTimes time : times) {
assertThat(crons, hasItemInArray("0 " + Ints.join(",", time.minute()) + " " + Ints.join(",", time.hour()) + " * * ?"));

View File

@ -6,13 +6,13 @@
package org.elasticsearch.watcher.trigger.schedule;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Collections2;
import org.elasticsearch.common.primitives.Ints;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.junit.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -26,7 +26,7 @@ public class HourlyScheduleTests extends ScheduleTestCase {
@Test
public void test_Default() throws Exception {
HourlySchedule schedule = new HourlySchedule();
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(1));
assertThat(crons, arrayContaining("0 0 * * * ?"));
}
@ -35,7 +35,7 @@ public class HourlyScheduleTests extends ScheduleTestCase {
public void test_SingleMinute() throws Exception {
int minute = validMinute();
HourlySchedule schedule = new HourlySchedule(minute);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(1));
assertThat(crons, arrayContaining("0 " + minute + " * * * ?"));
}
@ -50,7 +50,7 @@ public class HourlyScheduleTests extends ScheduleTestCase {
int[] minutes = validMinutes();
String minutesStr = Ints.join(",", minutes);
HourlySchedule schedule = new HourlySchedule(minutes);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(1));
assertThat(crons, arrayContaining("0 " + minutesStr + " * * * ?"));
}

View File

@ -6,12 +6,12 @@
package org.elasticsearch.watcher.trigger.schedule;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.primitives.Ints;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.trigger.schedule.support.DayTimes;
import org.elasticsearch.watcher.trigger.schedule.support.MonthTimes;
import org.junit.Test;
@ -27,7 +27,7 @@ public class MonthlyScheduleTests extends ScheduleTestCase {
@Test
public void test_Default() throws Exception {
MonthlySchedule schedule = new MonthlySchedule();
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(1));
assertThat(crons, arrayContaining("0 0 0 1 * ?"));
}
@ -36,7 +36,7 @@ public class MonthlyScheduleTests extends ScheduleTestCase {
public void test_SingleTime() throws Exception {
MonthTimes time = validMonthTime();
MonthlySchedule schedule = new MonthlySchedule(time);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(time.times().length));
for (DayTimes dayTimes : time.times()) {
String minStr = Ints.join(",", dayTimes.minute());
@ -51,7 +51,7 @@ public class MonthlyScheduleTests extends ScheduleTestCase {
public void test_MultipleTimes() throws Exception {
MonthTimes[] times = validMonthTimes();
MonthlySchedule schedule = new MonthlySchedule(times);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
int count = 0;
for (int i = 0; i < times.length; i++) {
count += times[i].times().length;

View File

@ -23,6 +23,18 @@ import static org.elasticsearch.watcher.trigger.schedule.Schedules.*;
*/
public abstract class ScheduleTestCase extends ElasticsearchTestCase {
protected static String[] expressions(CronnableSchedule schedule) {
return expressions(schedule.crons);
}
protected static String[] expressions(Cron[] crons) {
String[] expressions = new String[crons.length];
for (int i = 0; i < expressions.length; i++) {
expressions[i] = crons[i].expression();
}
return expressions;
}
protected static MonthlySchedule randomMonthlySchedule() {
switch (randomIntBetween(1, 4)) {
case 1: return monthly().build();
@ -145,10 +157,13 @@ public abstract class ScheduleTestCase extends ElasticsearchTestCase {
}
protected static int[] randomDaysOfMonth() {
if (rarely()) {
return new int[] { 32 };
}
int count = randomIntBetween(1, 5);
Set<Integer> days = new HashSet<>();
for (int i = 0; i < count; i++) {
days.add(randomIntBetween(1, 32));
days.add(randomIntBetween(1, 31));
}
return Ints.toArray(days);
}

View File

@ -6,13 +6,13 @@
package org.elasticsearch.watcher.trigger.schedule;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.common.base.Joiner;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.primitives.Ints;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.trigger.schedule.support.DayOfWeek;
import org.elasticsearch.watcher.trigger.schedule.support.DayTimes;
import org.elasticsearch.watcher.trigger.schedule.support.WeekTimes;
@ -29,7 +29,7 @@ public class WeeklyScheduleTests extends ScheduleTestCase {
@Test
public void test_Default() throws Exception {
WeeklySchedule schedule = new WeeklySchedule();
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(1));
assertThat(crons, arrayContaining("0 0 0 ? * MON"));
}
@ -38,7 +38,7 @@ public class WeeklyScheduleTests extends ScheduleTestCase {
public void test_SingleTime() throws Exception {
WeekTimes time = validWeekTime();
WeeklySchedule schedule = new WeeklySchedule(time);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(time.times().length));
for (DayTimes dayTimes : time.times()) {
assertThat(crons, hasItemInArray("0 " + Ints.join(",", dayTimes.minute()) + " " + Ints.join(",", dayTimes.hour()) + " ? * " + Joiner.on(",").join(time.days())));
@ -49,7 +49,7 @@ public class WeeklyScheduleTests extends ScheduleTestCase {
public void test_MultipleTimes() throws Exception {
WeekTimes[] times = validWeekTimes();
WeeklySchedule schedule = new WeeklySchedule(times);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
int count = 0;
for (int i = 0; i < times.length; i++) {
count += times[i].times().length;

View File

@ -6,13 +6,13 @@
package org.elasticsearch.watcher.trigger.schedule;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.common.base.Joiner;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.primitives.Ints;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.trigger.schedule.support.DayTimes;
import org.elasticsearch.watcher.trigger.schedule.support.YearTimes;
import org.junit.Test;
@ -28,16 +28,16 @@ public class YearlyScheduleTests extends ScheduleTestCase {
@Test
public void test_Default() throws Exception {
YearlySchedule schedule = new YearlySchedule();
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(1));
assertThat(crons, arrayContaining("0 0 0 1 JAN ?"));
}
@Test @Repeat(iterations = 20)
@Test @Repeat(iterations = 120)
public void test_SingleTime() throws Exception {
YearTimes time = validYearTime();
YearlySchedule schedule = new YearlySchedule(time);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
assertThat(crons, arrayWithSize(time.times().length));
for (DayTimes dayTimes : time.times()) {
String minStr = Ints.join(",", dayTimes.minute());
@ -45,7 +45,9 @@ public class YearlyScheduleTests extends ScheduleTestCase {
String dayStr = Ints.join(",", time.days());
dayStr = dayStr.replace("32", "L");
String monthStr = Joiner.on(",").join(time.months());
assertThat(crons, hasItemInArray("0 " + minStr + " " + hrStr + " " + dayStr + " " + monthStr + " ?"));
String expression = "0 " + minStr + " " + hrStr + " " + dayStr + " " + monthStr + " ?";
logger.info("expression: " + expression);
assertThat(crons, hasItemInArray(expression));
}
}
@ -53,7 +55,7 @@ public class YearlyScheduleTests extends ScheduleTestCase {
public void test_MultipleTimes() throws Exception {
YearTimes[] times = validYearTimes();
YearlySchedule schedule = new YearlySchedule(times);
String[] crons = schedule.crons();
String[] crons = expressions(schedule);
int count = 0;
for (int i = 0; i < times.length; i++) {
count += times[i].times().length;

View File

@ -20,6 +20,7 @@ import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.Schedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.engine.QuartzScheduleTriggerEngine;
import org.elasticsearch.watcher.trigger.schedule.support.DayOfWeek;
import org.elasticsearch.watcher.trigger.schedule.support.WeekTimes;
import org.junit.After;

View File

@ -16,7 +16,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.actions.ActionFactory;
import org.elasticsearch.watcher.actions.ActionRegistry;
@ -78,7 +77,6 @@ import org.elasticsearch.watcher.trigger.schedule.support.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@ -323,11 +321,8 @@ public class WatchTests extends ElasticsearchTestCase {
static class ParseOnlyScheduleTriggerEngine extends ScheduleTriggerEngine {
private final ScheduleRegistry registry;
public ParseOnlyScheduleTriggerEngine(Settings settings, ScheduleRegistry registry) {
super(settings);
this.registry = registry;
super(settings, registry);
}
@Override
@ -350,16 +345,5 @@ public class WatchTests extends ElasticsearchTestCase {
public boolean remove(String jobName) {
return false;
}
@Override
public ScheduleTrigger parseTrigger(String context, XContentParser parser) throws IOException {
Schedule schedule = registry.parse(context, parser);
return new ScheduleTrigger(schedule);
}
@Override
public ScheduleTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException {
return null;
}
}
}