diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 74bcada4ae0..510534abc62 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -80,7 +80,7 @@ import org.elasticsearch.xpack.watcher.condition.ConditionFactory; import org.elasticsearch.xpack.watcher.condition.ConditionRegistry; import org.elasticsearch.xpack.watcher.condition.NeverCondition; import org.elasticsearch.xpack.watcher.condition.ScriptCondition; -import org.elasticsearch.xpack.watcher.execution.AsyncTriggerListener; +import org.elasticsearch.xpack.watcher.execution.AsyncTriggerEventConsumer; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.execution.InternalWatchExecutor; import org.elasticsearch.xpack.watcher.execution.TriggeredWatch; @@ -135,6 +135,7 @@ import org.elasticsearch.xpack.watcher.transport.actions.service.WatcherServiceA import org.elasticsearch.xpack.watcher.transport.actions.stats.TransportWatcherStatsAction; import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsAction; import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.CronSchedule; @@ -146,7 +147,6 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; import org.elasticsearch.xpack.watcher.trigger.schedule.WeeklySchedule; import org.elasticsearch.xpack.watcher.trigger.schedule.YearlySchedule; -import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.watch.Watch; import org.joda.time.DateTime; @@ -162,12 +162,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.emptyList; -import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState; public class Watcher implements ActionPlugin, ScriptPlugin { @@ -177,19 +177,6 @@ public class Watcher implements ActionPlugin, ScriptPlugin { Setting.boolSetting("xpack.watcher.encrypt_sensitive_data", false, Setting.Property.NodeScope); public static final Setting MAX_STOP_TIMEOUT_SETTING = Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope); - public static final Setting TRIGGER_SCHEDULE_ENGINE_SETTING = - new Setting<>("xpack.watcher.trigger.schedule.engine", "ticker", s -> { - switch (s) { - case "ticker": - case "scheduler": - return s; - default: - throw new IllegalArgumentException("Can't parse [xpack.watcher.trigger.schedule.engine] must be one of [ticker, " + - "scheduler], was [" + s + "]"); - } - - }, Setting.Property.NodeScope); - private static final ScriptContext.Plugin SCRIPT_PLUGIN = new ScriptContext.Plugin("xpack", "watch"); public static final ScriptContext SCRIPT_CONTEXT = SCRIPT_PLUGIN::getKey; @@ -288,7 +275,7 @@ public class Watcher implements ActionPlugin, ScriptPlugin { final ScheduleRegistry scheduleRegistry = new ScheduleRegistry(scheduleParsers); TriggerEngine manualTriggerEngine = new ManualTriggerEngine(); - TriggerEngine configuredTriggerEngine = getTriggerEngine(clock, scheduleRegistry); + final TriggerEngine configuredTriggerEngine = getTriggerEngine(clock, scheduleRegistry); final Set triggerEngines = new HashSet<>(); triggerEngines.add(manualTriggerEngine); @@ -306,7 +293,7 @@ public class Watcher implements ActionPlugin, ScriptPlugin { final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor, clock, threadPool, watchParser, watcherClientProxy); - final TriggerEngine.Listener triggerEngineListener = getTriggerEngineListener(executionService); + final Consumer> triggerEngineListener = getTriggerEngineListener(executionService); triggerService.register(triggerEngineListener); final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = new WatcherIndexTemplateRegistry(settings, @@ -324,23 +311,15 @@ public class Watcher implements ActionPlugin, ScriptPlugin { } protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) { - String engine = TRIGGER_SCHEDULE_ENGINE_SETTING.get(settings); - switch (engine) { - case "scheduler": - return new SchedulerScheduleTriggerEngine(settings, scheduleRegistry, clock); - case "ticker": - return new TickerScheduleTriggerEngine(settings, scheduleRegistry, clock); - default: // should never happen, as the setting is already parsing for scheduler/ticker - throw illegalState("schedule engine must be either set to [scheduler] or [ticker], but was []", engine); - } + return new TickerScheduleTriggerEngine(settings, scheduleRegistry, clock); } protected WatchExecutor getWatchExecutor(ThreadPool threadPool) { return new InternalWatchExecutor(threadPool); } - protected TriggerEngine.Listener getTriggerEngineListener(ExecutionService executionService) { - return new AsyncTriggerListener(settings, executionService); + protected Consumer> getTriggerEngineListener(ExecutionService executionService) { + return new AsyncTriggerEventConsumer(settings, executionService); } private T getService(Class serviceClass, Collection services) { @@ -374,7 +353,6 @@ public class Watcher implements ActionPlugin, ScriptPlugin { for (TemplateConfig templateConfig : WatcherIndexTemplateRegistry.TEMPLATE_CONFIGS) { settings.add(templateConfig.getSetting()); } - settings.add(TRIGGER_SCHEDULE_ENGINE_SETTING); settings.add(INDEX_WATCHER_TEMPLATE_VERSION_SETTING); settings.add(MAX_STOP_TIMEOUT_SETTING); settings.add(ExecutionService.DEFAULT_THROTTLE_PERIOD_SETTING); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerListener.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerEventConsumer.java similarity index 73% rename from plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerListener.java rename to plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerEventConsumer.java index 3cb6f1988a9..7e637185600 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerListener.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerEventConsumer.java @@ -10,33 +10,33 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; +import java.util.function.Consumer; + import static java.util.stream.StreamSupport.stream; -public class AsyncTriggerListener implements TriggerEngine.Listener { +public class AsyncTriggerEventConsumer implements Consumer> { private final Logger logger; private final ExecutionService executionService; - public AsyncTriggerListener(Settings settings, ExecutionService executionService) { - this.logger = Loggers.getLogger(SyncTriggerListener.class, settings); + public AsyncTriggerEventConsumer(Settings settings, ExecutionService executionService) { + this.logger = Loggers.getLogger(SyncTriggerEventConsumer.class, settings); this.executionService = executionService; } @Override - public void triggered(Iterable events) { + public void accept(Iterable events) { try { executionService.processEventsAsync(events); } catch (Exception e) { logger.error( (Supplier) () -> new ParameterizedMessage( "failed to process triggered events [{}]", - (Object) stream(events.spliterator(), false).toArray(size -> new TriggerEvent[size])), + (Object) stream(events.spliterator(), false).toArray(size -> + new TriggerEvent[size])), e); } - } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerListener.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerEventConsumer.java similarity index 73% rename from plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerListener.java rename to plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerEventConsumer.java index 8e92038c5b2..91af3247457 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerListener.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerEventConsumer.java @@ -10,32 +10,33 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; +import java.util.function.Consumer; + import static java.util.stream.StreamSupport.stream; -public class SyncTriggerListener implements TriggerEngine.Listener { +public class SyncTriggerEventConsumer implements Consumer> { private final ExecutionService executionService; private final Logger logger; - public SyncTriggerListener(Settings settings, ExecutionService executionService) { - this.logger = Loggers.getLogger(SyncTriggerListener.class, settings); + public SyncTriggerEventConsumer(Settings settings, ExecutionService executionService) { + this.logger = Loggers.getLogger(SyncTriggerEventConsumer.class, settings); this.executionService = executionService; } @Override - public void triggered(Iterable events) { + public void accept(Iterable events) { try { executionService.processEventsSync(events); } catch (Exception e) { logger.error( (Supplier) () -> new ParameterizedMessage( "failed to process triggered events [{}]", - (Object) stream(events.spliterator(), false).toArray(size -> new TriggerEvent[size])), + (Object) stream(events.spliterator(), false).toArray(size -> + new TriggerEvent[size])), e); } } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/AbstractTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/AbstractTriggerEngine.java deleted file mode 100644 index 876009c16a1..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/AbstractTriggerEngine.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.trigger; - -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Settings; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -public abstract class AbstractTriggerEngine extends AbstractComponent implements - TriggerEngine { - - protected final List listeners = new CopyOnWriteArrayList<>(); - - public AbstractTriggerEngine(Settings settings) { - super(settings); - } - - @Override - public void register(Listener listener) { - listeners.add(listener); - } -} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerEngine.java index b8d6bf042b2..e949370c5c7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerEngine.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerEngine.java @@ -7,10 +7,12 @@ package org.elasticsearch.xpack.watcher.trigger; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.watcher.watch.Watch; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.function.Consumer; public interface TriggerEngine { @@ -20,13 +22,13 @@ public interface TriggerEngine { * It's the responsibility of the trigger engine implementation to select the appropriate jobs * from the given list of jobs */ - void start(Collection jobs); + void start(Collection jobs); void stop(); - void register(Listener listener); + void register(Consumer> consumer); - void add(Job job); + void add(Watch job); /** * Removes the job associated with the given name from this trigger engine. @@ -41,19 +43,4 @@ public interface TriggerEngine { T parseTrigger(String context, XContentParser parser) throws IOException; E parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException; - - interface Listener { - - void triggered(Iterable events); - - } - - interface Job { - - String id(); - - Trigger trigger(); - } - - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java index 01a326da208..c6ff233e037 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java @@ -18,22 +18,22 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument; public class TriggerService extends AbstractComponent { - private final Listeners listeners; + private final GroupedConsumer consumer = new GroupedConsumer(); private final Map engines; public TriggerService(Settings settings, Set engines) { super(settings); - listeners = new Listeners(); Map builder = new HashMap<>(); for (TriggerEngine engine : engines) { builder.put(engine.type(), engine); - engine.register(listeners); + engine.register(consumer); } this.engines = unmodifiableMap(builder); } @@ -54,10 +54,10 @@ public class TriggerService extends AbstractComponent { * Adds the given job to the trigger service. If there is already a registered job in this service with the * same job ID, the newly added job will replace the old job (the old job will not be triggered anymore) * - * @param job The new job + * @param watch The new watch */ - public void add(TriggerEngine.Job job) { - engines.get(job.trigger().type()).add(job); + public void add(Watch watch) { + engines.get(watch.trigger().type()).add(watch); } /** @@ -75,8 +75,8 @@ public class TriggerService extends AbstractComponent { return false; } - public void register(TriggerEngine.Listener listener) { - listeners.add(listener); + public void register(Consumer> consumer) { + this.consumer.add(consumer); } public TriggerEvent simulateEvent(String type, String jobId, Map data) { @@ -149,20 +149,17 @@ public class TriggerService extends AbstractComponent { return engine.parseTriggerEvent(this, watchId, context, parser); } - static class Listeners implements TriggerEngine.Listener { + static class GroupedConsumer implements java.util.function.Consumer> { - private List listeners = new CopyOnWriteArrayList<>(); + private List>> consumers = new CopyOnWriteArrayList<>(); - public void add(TriggerEngine.Listener listener) { - listeners.add(listener); + public void add(Consumer> consumer) { + consumers.add(consumer); } @Override - public void triggered(Iterable events) { - for (TriggerEngine.Listener listener : listeners) { - listener.triggered(events); - } + public void accept(Iterable events) { + consumers.forEach(c -> c.accept(events)); } } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/manual/ManualTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/manual/ManualTriggerEngine.java index aeef9139882..9f98737ee51 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/manual/ManualTriggerEngine.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/manual/ManualTriggerEngine.java @@ -8,11 +8,14 @@ package org.elasticsearch.xpack.watcher.trigger.manual; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.TriggerService; +import org.elasticsearch.xpack.watcher.watch.Watch; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.function.Consumer; import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument; @@ -30,7 +33,7 @@ public class ManualTriggerEngine implements TriggerEngine jobs) { + public void start(Collection jobs) { } @Override @@ -38,11 +41,11 @@ public class ManualTriggerEngine implements TriggerEngine> consumer) { } @Override - public void add(Job job) { + public void add(Watch job) { } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/ScheduleTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/ScheduleTriggerEngine.java index 0dd01099646..4b96c8617fc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/ScheduleTriggerEngine.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/ScheduleTriggerEngine.java @@ -6,24 +6,30 @@ package org.elasticsearch.xpack.watcher.trigger.schedule; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils; -import org.elasticsearch.xpack.watcher.trigger.AbstractTriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.joda.time.DateTime; import java.io.IOException; import java.time.Clock; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument; import static org.joda.time.DateTimeZone.UTC; -public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine { +public abstract class ScheduleTriggerEngine extends AbstractComponent implements TriggerEngine { public static final String TYPE = ScheduleTrigger.TYPE; + protected final List>> consumers = new CopyOnWriteArrayList<>(); protected final ScheduleRegistry scheduleRegistry; protected final Clock clock; @@ -38,6 +44,12 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine> consumer) { + consumers.add(consumer); + } + + @Override public ScheduleTriggerEvent simulateEvent(String jobId, @Nullable Map data, TriggerService service) { DateTime now = new DateTime(clock.millis(), UTC); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java deleted file mode 100644 index 862244bb7cb..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.trigger.schedule.engine; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.scheduler.SchedulerEngine; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; -import org.joda.time.DateTime; - -import java.time.Clock; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import static org.joda.time.DateTimeZone.UTC; - -public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { - - private final SchedulerEngine schedulerEngine; - - public SchedulerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { - super(settings, scheduleRegistry, clock); - this.schedulerEngine = new SchedulerEngine(clock); - this.schedulerEngine.register(event -> - notifyListeners(event.getJobName(), event.getTriggeredTime(), event.getScheduledTime())); - } - - @Override - public void start(Collection jobs) { - logger.debug("starting schedule engine..."); - final List schedulerJobs = new ArrayList<>(); - jobs.stream() - .filter(job -> job.trigger() instanceof ScheduleTrigger) - .forEach(job -> { - ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - schedulerJobs.add(new SchedulerEngine.Job(job.id(), trigger.getSchedule())); - }); - schedulerEngine.start(schedulerJobs); - logger.debug("schedule engine started at [{}]", new DateTime(clock.millis(), UTC)); - } - - @Override - public void stop() { - logger.debug("stopping schedule engine..."); - schedulerEngine.stop(); - logger.debug("schedule engine stopped"); - } - - @Override - public void add(Job job) { - assert job.trigger() instanceof ScheduleTrigger; - ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - schedulerEngine.add(new SchedulerEngine.Job(job.id(), trigger.getSchedule())); - } - - @Override - public boolean remove(String jobId) { - return schedulerEngine.remove(jobId); - } - - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { - logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime, UTC), - new DateTime(scheduledTime, UTC)); - final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime, UTC), - new DateTime(scheduledTime, UTC)); - for (Listener listener : listeners) { - listener.triggered(Collections.singletonList(event)); - } - } -} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java index 1525e13a773..99237bd0556 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.elasticsearch.xpack.watcher.watch.Watch; import org.joda.time.DateTime; import java.time.Clock; @@ -38,10 +39,10 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { } @Override - public void start(Collection jobs) { + public void start(Collection jobs) { long starTime = clock.millis(); Map schedules = new ConcurrentHashMap<>(); - for (Job job : jobs) { + for (Watch job : jobs) { if (job.trigger() instanceof ScheduleTrigger) { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), starTime)); @@ -57,7 +58,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { } @Override - public void add(Job job) { + public void add(Watch job) { assert job.trigger() instanceof ScheduleTrigger; ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), clock.millis())); @@ -90,9 +91,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { } protected void notifyListeners(List events) { - for (Listener listener : listeners) { - listener.triggered(events); - } + consumers.forEach(consumer -> consumer.accept(events)); } static class ActiveSchedule { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java index eaafd7be5b1..b3d1f429c7d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.common.secret.Secret; @@ -36,7 +35,6 @@ import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.xpack.watcher.support.xcontent.WatcherXContentParser; import org.elasticsearch.xpack.watcher.transform.ExecutableTransform; import org.elasticsearch.xpack.watcher.trigger.Trigger; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.joda.time.DateTime; @@ -50,11 +48,10 @@ import java.util.regex.Pattern; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; -import static org.elasticsearch.common.xcontent.XContentHelper.createParser; import static org.elasticsearch.xpack.watcher.support.Exceptions.ioException; import static org.joda.time.DateTimeZone.UTC; -public class Watch implements TriggerEngine.Job, ToXContentObject { +public class Watch implements ToXContentObject { public static final String ALL_ACTIONS_ID = "_all"; public static final String INCLUDE_STATUS_KEY = "include_status"; @@ -87,12 +84,10 @@ public class Watch implements TriggerEngine.Job, ToXContentObject { this.status = status; } - @Override public String id() { return id; } - @Override public Trigger trigger() { return trigger; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 19457d3697f..8c411962cbd 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -121,20 +121,16 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase private static Boolean securityEnabled; - private static String scheduleEngineName; - @Override protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException { if (securityEnabled == null) { securityEnabled = enableSecurity(); } - scheduleEngineName = randomFrom("ticker", "scheduler"); return super.buildTestCluster(scope, seed); } @Override protected Settings nodeSettings(int nodeOrdinal) { - logger.info("using schedule engine [{}]", scheduleEngineName); return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) //TODO: for now lets isolate watcher tests from monitoring (randomize this later) @@ -144,7 +140,6 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase .put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100)) .put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100)) .put(SecuritySettings.settings(securityEnabled)) - .put("xpack.watcher.trigger.schedule.engine", scheduleEngineName) .put("script.inline", "true") // Disable native ML autodetect_process as the c++ controller won't be available .put(MachineLearning.AUTODETECT_PROCESS.getKey(), false) @@ -257,7 +252,6 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase @AfterClass public static void _cleanupClass() { securityEnabled = null; - scheduleEngineName = null; } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/TimeWarpedWatcher.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/TimeWarpedWatcher.java index 6fce1d9cdce..95b290e71f2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/TimeWarpedWatcher.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/TimeWarpedWatcher.java @@ -10,15 +10,17 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.execution.ExecutionService; -import org.elasticsearch.xpack.watcher.execution.SyncTriggerListener; +import org.elasticsearch.xpack.watcher.execution.SyncTriggerEventConsumer; import org.elasticsearch.xpack.watcher.execution.WatchExecutor; import org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerEngineMock; import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; import java.time.Clock; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.function.Consumer; import java.util.stream.Stream; public class TimeWarpedWatcher extends Watcher { @@ -40,8 +42,8 @@ public class TimeWarpedWatcher extends Watcher { } @Override - protected TriggerEngine.Listener getTriggerEngineListener(ExecutionService executionService) { - return new SyncTriggerListener(settings, executionService); + protected Consumer> getTriggerEngineListener(ExecutionService executionService) { + return new SyncTriggerEventConsumer(settings, executionService); } public static class SameThreadExecutor implements WatchExecutor { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java index aff9da5a106..0fc6d6fd034 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java @@ -5,22 +5,24 @@ */ package org.elasticsearch.xpack.watcher.test.bench; -import org.elasticsearch.common.Randomness; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; +import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; +import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; -import org.elasticsearch.xpack.watcher.trigger.schedule.engine.BaseTriggerEngineTestCase; -import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine; +import org.elasticsearch.xpack.watcher.watch.Watch; import java.time.Clock; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,6 +34,8 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interva @SuppressForbidden(reason = "benchmark") public class ScheduleEngineTriggerBenchmark { + private static final Logger logger = ESLoggerFactory.getLogger(ScheduleEngineTriggerBenchmark.class); + public static void main(String[] args) throws Exception { int numWatches = 1000; int interval = 2; @@ -55,75 +59,52 @@ public class ScheduleEngineTriggerBenchmark { Settings settings = Settings.builder() .put("name", "test") .build(); - List jobs = new ArrayList<>(numWatches); + List watches = new ArrayList<>(numWatches); for (int i = 0; i < numWatches; i++) { - jobs.add(new BaseTriggerEngineTestCase.SimpleJob("job_" + i, interval(interval + "s"))); + watches.add(new Watch("job_" + i, new ScheduleTrigger(interval(interval + "s")), new ExecutableNoneInput(logger), + AlwaysCondition.INSTANCE, null, null, Collections.emptyList(), null, null)); } ScheduleRegistry scheduleRegistry = new ScheduleRegistry(emptySet()); - List impls = new ArrayList<>(Arrays.asList(new String[]{"schedule", "ticker"})); - Randomness.shuffle(impls); List results = new ArrayList<>(); - for (String impl : impls) { - System.gc(); - System.out.println("====================================="); - System.out.println("===> Testing [" + impl + "] scheduler"); - System.out.println("====================================="); - final AtomicBoolean running = new AtomicBoolean(false); - final AtomicInteger total = new AtomicInteger(); - final MeanMetric triggerMetric = new MeanMetric(); - final MeanMetric tooEarlyMetric = new MeanMetric(); + System.gc(); + System.out.println("====================================="); + System.out.println("===> Testing scheduler"); + System.out.println("====================================="); + final AtomicBoolean running = new AtomicBoolean(false); + final AtomicInteger total = new AtomicInteger(); + final MeanMetric triggerMetric = new MeanMetric(); + final MeanMetric tooEarlyMetric = new MeanMetric(); - final ScheduleTriggerEngine scheduler; - switch (impl) { - case "schedule": - scheduler = new SchedulerScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, Clock.systemUTC()) { - - @Override - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { - if (running.get()) { - measure(total, triggerMetric, tooEarlyMetric, triggeredTime, scheduledTime); - } - } - }; - break; - case "ticker": - scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, Clock.systemUTC()) { - - @Override - protected void notifyListeners(List events) { - if (running.get()) { - for (TriggerEvent event : events) { - ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event; - measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(), - scheduleTriggerEvent.scheduledTime().getMillis()); - } - } - } - }; - break; - default: - throw new IllegalArgumentException("impl [" + impl + "] doesn't exist"); + final ScheduleTriggerEngine scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, Clock.systemUTC()) { + @Override + protected void notifyListeners(List events) { + if (running.get()) { + for (TriggerEvent event : events) { + ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event; + measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(), + scheduleTriggerEvent.scheduledTime().getMillis()); + } + } } - scheduler.start(jobs); - System.out.println("Added [" + numWatches + "] jobs"); - running.set(true); - Thread.sleep(benchTime); - running.set(false); - scheduler.stop(); - System.out.println("done, triggered [" + total.get() + "] times, delayed triggered [" + triggerMetric.count() + - "] times, avg [" + triggerMetric.mean() + "] ms"); - results.add(new Stats(impl, total.get(), triggerMetric.count(), triggerMetric.mean(), tooEarlyMetric.count(), - tooEarlyMetric.mean())); - } + }; + scheduler.start(watches); + System.out.println("Added [" + numWatches + "] jobs"); + running.set(true); + Thread.sleep(benchTime); + running.set(false); + scheduler.stop(); + System.out.println("done, triggered [" + total.get() + "] times, delayed triggered [" + triggerMetric.count() + + "] times, avg [" + triggerMetric.mean() + "] ms"); + results.add(new Stats(total.get(), triggerMetric.count(), triggerMetric.mean(), tooEarlyMetric.count(), tooEarlyMetric.mean())); System.out.println(" Name | # triggered | # delayed | avg delay | # too early triggered | avg too early delay"); System.out.println("--------------- | ----------- | --------- | --------- | --------------------- | ------------------ "); for (Stats stats : results) { System.out.printf( Locale.ENGLISH, - "%15s | %11d | %9d | %9d | %21d | %18d\n", - stats.implementation, stats.numberOfTimesTriggered, stats.numberOfTimesDelayed, stats.avgDelayTime, + "%11d | %9d | %9d | %21d | %18d\n", + stats.numberOfTimesTriggered, stats.numberOfTimesDelayed, stats.avgDelayTime, stats.numberOfEarlyTriggered, stats.avgEarlyDelayTime ); } @@ -143,16 +124,14 @@ public class ScheduleEngineTriggerBenchmark { static class Stats { - final String implementation; final int numberOfTimesTriggered; final long numberOfTimesDelayed; final long avgDelayTime; final long numberOfEarlyTriggered; final long avgEarlyDelayTime; - Stats(String implementation, int numberOfTimesTriggered, long numberOfTimesDelayed, double avgDelayTime, + Stats(int numberOfTimesTriggered, long numberOfTimesDelayed, double avgDelayTime, long numberOfEarlyTriggered, double avgEarlyDelayTime) { - this.implementation = implementation; this.numberOfTimesTriggered = numberOfTimesTriggered; this.numberOfTimesDelayed = numberOfTimesDelayed; this.avgDelayTime = Math.round(avgDelayTime); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherExecutorServiceBenchmark.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherExecutorServiceBenchmark.java index fa079d51a3a..27844ad1e78 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherExecutorServiceBenchmark.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherExecutorServiceBenchmark.java @@ -224,8 +224,6 @@ public class WatcherExecutorServiceBenchmark { protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) { return new ScheduleTriggerEngineMock(settings, scheduleRegistry, clock); } - - } } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java index 3ed958e6c1c..0cf3e5b3257 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java @@ -15,12 +15,13 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.elasticsearch.xpack.watcher.watch.Watch; import org.joda.time.DateTime; import java.io.IOException; import java.time.Clock; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,12 +32,11 @@ import java.util.concurrent.ConcurrentMap; public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { private final Logger logger; - private final ConcurrentMap jobs = new ConcurrentHashMap<>(); + private final ConcurrentMap watches = new ConcurrentHashMap<>(); public ScheduleTriggerEngineMock(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { super(settings, scheduleRegistry, clock); this.logger = Loggers.getLogger(ScheduleTriggerEngineMock.class, settings); - } @Override @@ -51,7 +51,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { } @Override - public void start(Collection jobs) { + public void start(Collection jobs) { } @Override @@ -59,13 +59,13 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { } @Override - public void add(Job job) { - jobs.put(job.id(), job); + public void add(Watch watch) { + watches.put(watch.id(), watch); } @Override public boolean remove(String jobId) { - return jobs.remove(jobId) != null; + return watches.remove(jobId) != null; } public void trigger(String jobName) { @@ -81,9 +81,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { DateTime now = new DateTime(clock.millis()); logger.debug("firing [{}] at [{}]", jobName, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now); - for (Listener listener : listeners) { - listener.triggered(Arrays.asList(event)); - } + consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event))); if (interval != null) { if (clock instanceof ClockMock) { ((ClockMock) clock).fastForward(interval); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/BaseTriggerEngineTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/BaseTriggerEngineTestCase.java deleted file mode 100644 index 6904ec1a63f..00000000000 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/BaseTriggerEngineTestCase.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.trigger.schedule.engine; - -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.support.clock.ClockMock; -import org.elasticsearch.xpack.watcher.trigger.Trigger; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; -import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; -import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; -import org.elasticsearch.xpack.watcher.trigger.schedule.support.DayOfWeek; -import org.elasticsearch.xpack.watcher.trigger.schedule.support.WeekTimes; -import org.joda.time.DateTime; -import org.junit.After; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily; -import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; -import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly; -import static org.hamcrest.Matchers.is; -import static org.joda.time.DateTimeZone.UTC; - -public abstract class BaseTriggerEngineTestCase extends ESTestCase { - - private TriggerEngine engine; - protected ClockMock clock = ClockMock.frozen(); - - @Before - public void init() throws Exception { - engine = createEngine(); - } - - protected abstract TriggerEngine createEngine(); - - /** - * Dependending on the trigger engine used, we may need to advance the clock, because the implementation might use the clock - * in order to check for new jobs being executed - */ - protected abstract void advanceClockIfNeeded(DateTime newCurrentDateTime); - - @After - public void cleanup() throws Exception { - engine.stop(); - } - - public void testStart() throws Exception { - int count = randomIntBetween(2, 5); - final CountDownLatch firstLatch = new CountDownLatch(count); - final CountDownLatch secondLatch = new CountDownLatch(count); - List jobs = new ArrayList<>(); - for (int i = 0; i < count; i++) { - jobs.add(new SimpleJob(String.valueOf(i), interval("1s"))); - } - final BitSet bits = new BitSet(count); - - engine.register(events -> { - for (TriggerEvent event : events) { - int index = Integer.parseInt(event.jobName()); - if (!bits.get(index)) { - logger.info("job [{}] first fire", index); - bits.set(index); - firstLatch.countDown(); - } else { - logger.info("job [{}] second fire", index); - secondLatch.countDown(); - } - } - }); - - engine.start(jobs); - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!firstLatch.await(3 * count, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!secondLatch.await(3 * count, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - engine.stop(); - assertThat(bits.cardinality(), is(count)); - } - - public void testAddHourly() throws Exception { - final String name = "job_name"; - final CountDownLatch latch = new CountDownLatch(1); - engine.start(Collections.emptySet()); - engine.register(events -> { - for (TriggerEvent event : events) { - assertThat(event.jobName(), is(name)); - logger.info("triggered job on [{}]", clock); - } - latch.countDown(); - }); - - int randomMinute = randomIntBetween(0, 59); - DateTime testNowTime = new DateTime(clock.millis(), UTC).withMinuteOfHour(randomMinute).withSecondOfMinute(59); - DateTime scheduledTime = testNowTime.plusSeconds(2); - logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime); - - clock.setTime(testNowTime); - engine.add(new SimpleJob(name, daily().at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build())); - advanceClockIfNeeded(scheduledTime); - - if (!latch.await(5, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - } - - public void testAddDaily() throws Exception { - final String name = "job_name"; - final CountDownLatch latch = new CountDownLatch(1); - engine.start(Collections.emptySet()); - - engine.register(events -> { - for (TriggerEvent event : events) { - assertThat(event.jobName(), is(name)); - logger.info("triggered job on [{}]", new DateTime(clock.millis(), UTC)); - latch.countDown(); - } - }); - - int randomHour = randomIntBetween(0, 23); - int randomMinute = randomIntBetween(0, 59); - - DateTime testNowTime = new DateTime(clock.millis(), UTC).withHourOfDay(randomHour) - .withMinuteOfHour(randomMinute).withSecondOfMinute(59); - DateTime scheduledTime = testNowTime.plusSeconds(2); - logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime); - - clock.setTime(testNowTime); - engine.add(new SimpleJob(name, daily().at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build())); - advanceClockIfNeeded(scheduledTime); - - if (!latch.await(5, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - } - - public void testAddWeekly() throws Exception { - final String name = "job_name"; - final CountDownLatch latch = new CountDownLatch(1); - engine.start(Collections.emptySet()); - engine.register(events -> { - for (TriggerEvent event : events) { - assertThat(event.jobName(), is(name)); - logger.info("triggered job"); - } - latch.countDown(); - }); - - int randomHour = randomIntBetween(0, 23); - int randomMinute = randomIntBetween(0, 59); - int randomDay = randomIntBetween(1, 7); - - DateTime testNowTime = new DateTime(clock.millis(), UTC).withDayOfWeek(randomDay).withHourOfDay(randomHour) - .withMinuteOfHour(randomMinute).withSecondOfMinute(59); - DateTime scheduledTime = testNowTime.plusSeconds(2); - - logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime); - clock.setTime(testNowTime); - - // fun part here (aka WTF): DayOfWeek with Joda is MON-SUN, starting at 1 - // DayOfWeek with Watcher is SUN-SAT, starting at 1 - int watcherDay = (scheduledTime.getDayOfWeek() % 7) + 1; - engine.add(new SimpleJob(name, weekly().time(WeekTimes.builder() - .on(DayOfWeek.resolve(watcherDay)) - .at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build()).build())); - advanceClockIfNeeded(scheduledTime); - - if (!latch.await(5, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - } - - public void testAddSameJobSeveralTimesAndExecutedOnce() throws InterruptedException { - engine.start(Collections.emptySet()); - - final CountDownLatch firstLatch = new CountDownLatch(1); - final CountDownLatch secondLatch = new CountDownLatch(1); - AtomicInteger counter = new AtomicInteger(0); - engine.register(events -> { - events.forEach(event -> { - if (counter.getAndIncrement() == 0) { - firstLatch.countDown(); - } else { - secondLatch.countDown(); - } - }); - }); - - int times = scaledRandomIntBetween(3, 30); - for (int i = 0; i < times; i++) { - engine.add(new SimpleJob("_id", interval("1s"))); - } - - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!firstLatch.await(3, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!secondLatch.await(3, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - - // ensure job was only called twice independent from its name - assertThat(counter.get(), is(2)); - } - - public static class SimpleJob implements TriggerEngine.Job { - - private final String name; - private final ScheduleTrigger trigger; - - public SimpleJob(String name, Schedule schedule) { - this.name = name; - this.trigger = new ScheduleTrigger(schedule); - } - - @Override - public String id() { - return name; - } - - @Override - public Trigger trigger() { - return trigger; - } - } -} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleEngineTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleEngineTests.java deleted file mode 100644 index 8a344b88d96..00000000000 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleEngineTests.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.trigger.schedule.engine; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; -import org.joda.time.DateTime; - -import static org.mockito.Mockito.mock; - -public class SchedulerScheduleEngineTests extends BaseTriggerEngineTestCase { - - protected TriggerEngine createEngine() { - return new SchedulerScheduleTriggerEngine(Settings.EMPTY, mock(ScheduleRegistry.class), clock); - } - - // the scheduler configures hard when to schedule, and does not check the clock - @Override - protected void advanceClockIfNeeded(DateTime newCurrentDateTime) { - } - -} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java index 869f907cbe6..aef7c4dd6e2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java @@ -5,22 +5,254 @@ */ package org.elasticsearch.xpack.watcher.trigger.schedule.engine; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.support.clock.ClockMock; +import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; +import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput; import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; +import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; +import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; +import org.elasticsearch.xpack.watcher.trigger.schedule.support.DayOfWeek; +import org.elasticsearch.xpack.watcher.trigger.schedule.support.WeekTimes; +import org.elasticsearch.xpack.watcher.watch.Watch; import org.joda.time.DateTime; +import org.junit.After; +import org.junit.Before; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly; +import static org.hamcrest.Matchers.is; +import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.mock; -public class TickerScheduleEngineTests extends BaseTriggerEngineTestCase { +public class TickerScheduleEngineTests extends ESTestCase { - @Override - protected TriggerEngine createEngine() { - return new TickerScheduleTriggerEngine(Settings.EMPTY, mock(ScheduleRegistry.class), clock); + private TriggerEngine engine; + protected ClockMock clock = ClockMock.frozen(); + + @Before + public void init() throws Exception { + engine = createEngine(); } - @Override - protected void advanceClockIfNeeded(DateTime newCurrentDateTime) { + private TriggerEngine createEngine() { + return new TickerScheduleTriggerEngine(Settings.EMPTY, + mock(ScheduleRegistry.class), clock); + } + + private void advanceClockIfNeeded(DateTime newCurrentDateTime) { clock.setTime(newCurrentDateTime); } + + @After + public void cleanup() throws Exception { + engine.stop(); + } + + public void testStart() throws Exception { + int count = randomIntBetween(2, 5); + final CountDownLatch firstLatch = new CountDownLatch(count); + final CountDownLatch secondLatch = new CountDownLatch(count); + List watches = new ArrayList<>(); + for (int i = 0; i < count; i++) { + watches.add(createWatch(String.valueOf(i), interval("1s"))); + } + final BitSet bits = new BitSet(count); + + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + for (TriggerEvent event : events) { + int index = Integer.parseInt(event.jobName()); + if (!bits.get(index)) { + logger.info("job [{}] first fire", index); + bits.set(index); + firstLatch.countDown(); + } else { + logger.info("job [{}] second fire", index); + secondLatch.countDown(); + } + } + } + }); + + engine.start(watches); + advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); + if (!firstLatch.await(3 * count, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + + advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); + if (!secondLatch.await(3 * count, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + engine.stop(); + assertThat(bits.cardinality(), is(count)); + } + + public void testAddHourly() throws Exception { + final String name = "job_name"; + final CountDownLatch latch = new CountDownLatch(1); + engine.start(Collections.emptySet()); + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + for (TriggerEvent event : events) { + assertThat(event.jobName(), is(name)); + logger.info("triggered job on [{}]", clock); + } + latch.countDown(); + } + }); + + int randomMinute = randomIntBetween(0, 59); + DateTime testNowTime = new DateTime(clock.millis(), UTC).withMinuteOfHour(randomMinute) + .withSecondOfMinute(59); + DateTime scheduledTime = testNowTime.plusSeconds(2); + logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, + scheduledTime); + + clock.setTime(testNowTime); + engine.add(createWatch(name, daily().at(scheduledTime.getHourOfDay(), + scheduledTime.getMinuteOfHour()).build())); + advanceClockIfNeeded(scheduledTime); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + } + + public void testAddDaily() throws Exception { + final String name = "job_name"; + final CountDownLatch latch = new CountDownLatch(1); + engine.start(Collections.emptySet()); + + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + for (TriggerEvent event : events) { + assertThat(event.jobName(), is(name)); + logger.info("triggered job on [{}]", new DateTime(clock.millis(), UTC)); + latch.countDown(); + } + } + }); + + int randomHour = randomIntBetween(0, 23); + int randomMinute = randomIntBetween(0, 59); + + DateTime testNowTime = new DateTime(clock.millis(), UTC).withHourOfDay(randomHour) + .withMinuteOfHour(randomMinute).withSecondOfMinute(59); + DateTime scheduledTime = testNowTime.plusSeconds(2); + logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, + scheduledTime); + + clock.setTime(testNowTime); + engine.add(createWatch(name, daily().at(scheduledTime.getHourOfDay(), + scheduledTime.getMinuteOfHour()).build())); + advanceClockIfNeeded(scheduledTime); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + } + + public void testAddWeekly() throws Exception { + final String name = "job_name"; + final CountDownLatch latch = new CountDownLatch(1); + engine.start(Collections.emptySet()); + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + for (TriggerEvent event : events) { + assertThat(event.jobName(), is(name)); + logger.info("triggered job"); + } + latch.countDown(); + } + }); + + int randomHour = randomIntBetween(0, 23); + int randomMinute = randomIntBetween(0, 59); + int randomDay = randomIntBetween(1, 7); + + DateTime testNowTime = new DateTime(clock.millis(), UTC).withDayOfWeek(randomDay) + .withHourOfDay(randomHour).withMinuteOfHour(randomMinute).withSecondOfMinute(59); + DateTime scheduledTime = testNowTime.plusSeconds(2); + + logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, + scheduledTime); + clock.setTime(testNowTime); + + // fun part here (aka WTF): DayOfWeek with Joda is MON-SUN, starting at 1 + // DayOfWeek with Watcher is SUN-SAT, starting at 1 + int watcherDay = (scheduledTime.getDayOfWeek() % 7) + 1; + engine.add(createWatch(name, weekly().time(WeekTimes.builder() + .on(DayOfWeek.resolve(watcherDay)) + .at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build()) + .build())); + advanceClockIfNeeded(scheduledTime); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + } + + public void testAddSameJobSeveralTimesAndExecutedOnce() throws InterruptedException { + engine.start(Collections.emptySet()); + + final CountDownLatch firstLatch = new CountDownLatch(1); + final CountDownLatch secondLatch = new CountDownLatch(1); + AtomicInteger counter = new AtomicInteger(0); + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + events.forEach(event -> { + if (counter.getAndIncrement() == 0) { + firstLatch.countDown(); + } else { + secondLatch.countDown(); + } + }); + } + }); + + int times = scaledRandomIntBetween(3, 30); + for (int i = 0; i < times; i++) { + engine.add(createWatch("_id", interval("1s"))); + } + + advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); + if (!firstLatch.await(3, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + + advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); + if (!secondLatch.await(3, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + + // ensure job was only called twice independent from its name + assertThat(counter.get(), is(2)); + } + + private Watch createWatch(String name, Schedule schedule) { + return new Watch(name, new ScheduleTrigger(schedule), new ExecutableNoneInput(logger), + AlwaysCondition.INSTANCE, null, null, + Collections.emptyList(), null, null); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java index d0f97c429b3..4b35078fe80 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java @@ -532,7 +532,7 @@ public class WatchTests extends ESTestCase { } @Override - public void start(Collection jobs) { + public void start(Collection jobs) { } @Override @@ -540,11 +540,7 @@ public class WatchTests extends ESTestCase { } @Override - public void register(Listener listener) { - } - - @Override - public void add(Job job) { + public void add(Watch watch) { } @Override