From 0c7c2f521c88293b909483235aa1c0c0b9444801 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Tue, 21 Mar 2017 10:27:41 +0100 Subject: [PATCH] Watcher: Remove scheduler based trigger engine (elastic/x-pack-elasticsearch#724) The scheduler based trigger engine is not enabled by default as the ticker based trigger engine is used. As we dont use it in production, this commit removes this specific implementation. It also removes some uneeded abstractions like AbstractTriggerEngine, TriggerEngine.Listener and TriggerEngine.Job Original commit: elastic/x-pack-elasticsearch@b17a2e9d623235b1523dcf2501ae5b5ebc3692d1 --- .../elasticsearch/xpack/watcher/Watcher.java | 38 +-- ...er.java => AsyncTriggerEventConsumer.java} | 16 +- ...ner.java => SyncTriggerEventConsumer.java} | 15 +- .../trigger/AbstractTriggerEngine.java | 27 -- .../xpack/watcher/trigger/TriggerEngine.java | 23 +- .../xpack/watcher/trigger/TriggerService.java | 31 +-- .../trigger/manual/ManualTriggerEngine.java | 9 +- .../schedule/ScheduleTriggerEngine.java | 16 +- .../SchedulerScheduleTriggerEngine.java | 77 ------ .../engine/TickerScheduleTriggerEngine.java | 11 +- .../xpack/watcher/watch/Watch.java | 7 +- .../AbstractWatcherIntegrationTestCase.java | 6 - .../xpack/watcher/test/TimeWarpedWatcher.java | 8 +- .../bench/ScheduleEngineTriggerBenchmark.java | 107 +++----- .../WatcherExecutorServiceBenchmark.java | 2 - .../trigger/ScheduleTriggerEngineMock.java | 18 +- .../engine/BaseTriggerEngineTestCase.java | 243 ----------------- .../engine/SchedulerScheduleEngineTests.java | 26 -- .../engine/TickerScheduleEngineTests.java | 244 +++++++++++++++++- .../xpack/watcher/watch/WatchTests.java | 8 +- 20 files changed, 365 insertions(+), 567 deletions(-) rename plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/{AsyncTriggerListener.java => AsyncTriggerEventConsumer.java} (73%) rename plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/{SyncTriggerListener.java => SyncTriggerEventConsumer.java} (73%) delete mode 100644 plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/AbstractTriggerEngine.java delete mode 100644 plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java delete mode 100644 plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/BaseTriggerEngineTestCase.java delete mode 100644 plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleEngineTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 74bcada4ae0..510534abc62 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -80,7 +80,7 @@ import org.elasticsearch.xpack.watcher.condition.ConditionFactory; import org.elasticsearch.xpack.watcher.condition.ConditionRegistry; import org.elasticsearch.xpack.watcher.condition.NeverCondition; import org.elasticsearch.xpack.watcher.condition.ScriptCondition; -import org.elasticsearch.xpack.watcher.execution.AsyncTriggerListener; +import org.elasticsearch.xpack.watcher.execution.AsyncTriggerEventConsumer; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.execution.InternalWatchExecutor; import org.elasticsearch.xpack.watcher.execution.TriggeredWatch; @@ -135,6 +135,7 @@ import org.elasticsearch.xpack.watcher.transport.actions.service.WatcherServiceA import org.elasticsearch.xpack.watcher.transport.actions.stats.TransportWatcherStatsAction; import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsAction; import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.CronSchedule; @@ -146,7 +147,6 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; import org.elasticsearch.xpack.watcher.trigger.schedule.WeeklySchedule; import org.elasticsearch.xpack.watcher.trigger.schedule.YearlySchedule; -import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.watch.Watch; import org.joda.time.DateTime; @@ -162,12 +162,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.emptyList; -import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState; public class Watcher implements ActionPlugin, ScriptPlugin { @@ -177,19 +177,6 @@ public class Watcher implements ActionPlugin, ScriptPlugin { Setting.boolSetting("xpack.watcher.encrypt_sensitive_data", false, Setting.Property.NodeScope); public static final Setting MAX_STOP_TIMEOUT_SETTING = Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope); - public static final Setting TRIGGER_SCHEDULE_ENGINE_SETTING = - new Setting<>("xpack.watcher.trigger.schedule.engine", "ticker", s -> { - switch (s) { - case "ticker": - case "scheduler": - return s; - default: - throw new IllegalArgumentException("Can't parse [xpack.watcher.trigger.schedule.engine] must be one of [ticker, " + - "scheduler], was [" + s + "]"); - } - - }, Setting.Property.NodeScope); - private static final ScriptContext.Plugin SCRIPT_PLUGIN = new ScriptContext.Plugin("xpack", "watch"); public static final ScriptContext SCRIPT_CONTEXT = SCRIPT_PLUGIN::getKey; @@ -288,7 +275,7 @@ public class Watcher implements ActionPlugin, ScriptPlugin { final ScheduleRegistry scheduleRegistry = new ScheduleRegistry(scheduleParsers); TriggerEngine manualTriggerEngine = new ManualTriggerEngine(); - TriggerEngine configuredTriggerEngine = getTriggerEngine(clock, scheduleRegistry); + final TriggerEngine configuredTriggerEngine = getTriggerEngine(clock, scheduleRegistry); final Set triggerEngines = new HashSet<>(); triggerEngines.add(manualTriggerEngine); @@ -306,7 +293,7 @@ public class Watcher implements ActionPlugin, ScriptPlugin { final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor, clock, threadPool, watchParser, watcherClientProxy); - final TriggerEngine.Listener triggerEngineListener = getTriggerEngineListener(executionService); + final Consumer> triggerEngineListener = getTriggerEngineListener(executionService); triggerService.register(triggerEngineListener); final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = new WatcherIndexTemplateRegistry(settings, @@ -324,23 +311,15 @@ public class Watcher implements ActionPlugin, ScriptPlugin { } protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) { - String engine = TRIGGER_SCHEDULE_ENGINE_SETTING.get(settings); - switch (engine) { - case "scheduler": - return new SchedulerScheduleTriggerEngine(settings, scheduleRegistry, clock); - case "ticker": - return new TickerScheduleTriggerEngine(settings, scheduleRegistry, clock); - default: // should never happen, as the setting is already parsing for scheduler/ticker - throw illegalState("schedule engine must be either set to [scheduler] or [ticker], but was []", engine); - } + return new TickerScheduleTriggerEngine(settings, scheduleRegistry, clock); } protected WatchExecutor getWatchExecutor(ThreadPool threadPool) { return new InternalWatchExecutor(threadPool); } - protected TriggerEngine.Listener getTriggerEngineListener(ExecutionService executionService) { - return new AsyncTriggerListener(settings, executionService); + protected Consumer> getTriggerEngineListener(ExecutionService executionService) { + return new AsyncTriggerEventConsumer(settings, executionService); } private T getService(Class serviceClass, Collection services) { @@ -374,7 +353,6 @@ public class Watcher implements ActionPlugin, ScriptPlugin { for (TemplateConfig templateConfig : WatcherIndexTemplateRegistry.TEMPLATE_CONFIGS) { settings.add(templateConfig.getSetting()); } - settings.add(TRIGGER_SCHEDULE_ENGINE_SETTING); settings.add(INDEX_WATCHER_TEMPLATE_VERSION_SETTING); settings.add(MAX_STOP_TIMEOUT_SETTING); settings.add(ExecutionService.DEFAULT_THROTTLE_PERIOD_SETTING); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerListener.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerEventConsumer.java similarity index 73% rename from plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerListener.java rename to plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerEventConsumer.java index 3cb6f1988a9..7e637185600 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerListener.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/AsyncTriggerEventConsumer.java @@ -10,33 +10,33 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; +import java.util.function.Consumer; + import static java.util.stream.StreamSupport.stream; -public class AsyncTriggerListener implements TriggerEngine.Listener { +public class AsyncTriggerEventConsumer implements Consumer> { private final Logger logger; private final ExecutionService executionService; - public AsyncTriggerListener(Settings settings, ExecutionService executionService) { - this.logger = Loggers.getLogger(SyncTriggerListener.class, settings); + public AsyncTriggerEventConsumer(Settings settings, ExecutionService executionService) { + this.logger = Loggers.getLogger(SyncTriggerEventConsumer.class, settings); this.executionService = executionService; } @Override - public void triggered(Iterable events) { + public void accept(Iterable events) { try { executionService.processEventsAsync(events); } catch (Exception e) { logger.error( (Supplier) () -> new ParameterizedMessage( "failed to process triggered events [{}]", - (Object) stream(events.spliterator(), false).toArray(size -> new TriggerEvent[size])), + (Object) stream(events.spliterator(), false).toArray(size -> + new TriggerEvent[size])), e); } - } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerListener.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerEventConsumer.java similarity index 73% rename from plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerListener.java rename to plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerEventConsumer.java index 8e92038c5b2..91af3247457 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerListener.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/SyncTriggerEventConsumer.java @@ -10,32 +10,33 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; +import java.util.function.Consumer; + import static java.util.stream.StreamSupport.stream; -public class SyncTriggerListener implements TriggerEngine.Listener { +public class SyncTriggerEventConsumer implements Consumer> { private final ExecutionService executionService; private final Logger logger; - public SyncTriggerListener(Settings settings, ExecutionService executionService) { - this.logger = Loggers.getLogger(SyncTriggerListener.class, settings); + public SyncTriggerEventConsumer(Settings settings, ExecutionService executionService) { + this.logger = Loggers.getLogger(SyncTriggerEventConsumer.class, settings); this.executionService = executionService; } @Override - public void triggered(Iterable events) { + public void accept(Iterable events) { try { executionService.processEventsSync(events); } catch (Exception e) { logger.error( (Supplier) () -> new ParameterizedMessage( "failed to process triggered events [{}]", - (Object) stream(events.spliterator(), false).toArray(size -> new TriggerEvent[size])), + (Object) stream(events.spliterator(), false).toArray(size -> + new TriggerEvent[size])), e); } } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/AbstractTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/AbstractTriggerEngine.java deleted file mode 100644 index 876009c16a1..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/AbstractTriggerEngine.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.trigger; - -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Settings; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -public abstract class AbstractTriggerEngine extends AbstractComponent implements - TriggerEngine { - - protected final List listeners = new CopyOnWriteArrayList<>(); - - public AbstractTriggerEngine(Settings settings) { - super(settings); - } - - @Override - public void register(Listener listener) { - listeners.add(listener); - } -} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerEngine.java index b8d6bf042b2..e949370c5c7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerEngine.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerEngine.java @@ -7,10 +7,12 @@ package org.elasticsearch.xpack.watcher.trigger; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.watcher.watch.Watch; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.function.Consumer; public interface TriggerEngine { @@ -20,13 +22,13 @@ public interface TriggerEngine { * It's the responsibility of the trigger engine implementation to select the appropriate jobs * from the given list of jobs */ - void start(Collection jobs); + void start(Collection jobs); void stop(); - void register(Listener listener); + void register(Consumer> consumer); - void add(Job job); + void add(Watch job); /** * Removes the job associated with the given name from this trigger engine. @@ -41,19 +43,4 @@ public interface TriggerEngine { T parseTrigger(String context, XContentParser parser) throws IOException; E parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException; - - interface Listener { - - void triggered(Iterable events); - - } - - interface Job { - - String id(); - - Trigger trigger(); - } - - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java index 01a326da208..c6ff233e037 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java @@ -18,22 +18,22 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument; public class TriggerService extends AbstractComponent { - private final Listeners listeners; + private final GroupedConsumer consumer = new GroupedConsumer(); private final Map engines; public TriggerService(Settings settings, Set engines) { super(settings); - listeners = new Listeners(); Map builder = new HashMap<>(); for (TriggerEngine engine : engines) { builder.put(engine.type(), engine); - engine.register(listeners); + engine.register(consumer); } this.engines = unmodifiableMap(builder); } @@ -54,10 +54,10 @@ public class TriggerService extends AbstractComponent { * Adds the given job to the trigger service. If there is already a registered job in this service with the * same job ID, the newly added job will replace the old job (the old job will not be triggered anymore) * - * @param job The new job + * @param watch The new watch */ - public void add(TriggerEngine.Job job) { - engines.get(job.trigger().type()).add(job); + public void add(Watch watch) { + engines.get(watch.trigger().type()).add(watch); } /** @@ -75,8 +75,8 @@ public class TriggerService extends AbstractComponent { return false; } - public void register(TriggerEngine.Listener listener) { - listeners.add(listener); + public void register(Consumer> consumer) { + this.consumer.add(consumer); } public TriggerEvent simulateEvent(String type, String jobId, Map data) { @@ -149,20 +149,17 @@ public class TriggerService extends AbstractComponent { return engine.parseTriggerEvent(this, watchId, context, parser); } - static class Listeners implements TriggerEngine.Listener { + static class GroupedConsumer implements java.util.function.Consumer> { - private List listeners = new CopyOnWriteArrayList<>(); + private List>> consumers = new CopyOnWriteArrayList<>(); - public void add(TriggerEngine.Listener listener) { - listeners.add(listener); + public void add(Consumer> consumer) { + consumers.add(consumer); } @Override - public void triggered(Iterable events) { - for (TriggerEngine.Listener listener : listeners) { - listener.triggered(events); - } + public void accept(Iterable events) { + consumers.forEach(c -> c.accept(events)); } } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/manual/ManualTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/manual/ManualTriggerEngine.java index aeef9139882..9f98737ee51 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/manual/ManualTriggerEngine.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/manual/ManualTriggerEngine.java @@ -8,11 +8,14 @@ package org.elasticsearch.xpack.watcher.trigger.manual; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.TriggerService; +import org.elasticsearch.xpack.watcher.watch.Watch; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.function.Consumer; import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument; @@ -30,7 +33,7 @@ public class ManualTriggerEngine implements TriggerEngine jobs) { + public void start(Collection jobs) { } @Override @@ -38,11 +41,11 @@ public class ManualTriggerEngine implements TriggerEngine> consumer) { } @Override - public void add(Job job) { + public void add(Watch job) { } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/ScheduleTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/ScheduleTriggerEngine.java index 0dd01099646..4b96c8617fc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/ScheduleTriggerEngine.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/ScheduleTriggerEngine.java @@ -6,24 +6,30 @@ package org.elasticsearch.xpack.watcher.trigger.schedule; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils; -import org.elasticsearch.xpack.watcher.trigger.AbstractTriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.joda.time.DateTime; import java.io.IOException; import java.time.Clock; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument; import static org.joda.time.DateTimeZone.UTC; -public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine { +public abstract class ScheduleTriggerEngine extends AbstractComponent implements TriggerEngine { public static final String TYPE = ScheduleTrigger.TYPE; + protected final List>> consumers = new CopyOnWriteArrayList<>(); protected final ScheduleRegistry scheduleRegistry; protected final Clock clock; @@ -38,6 +44,12 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine> consumer) { + consumers.add(consumer); + } + + @Override public ScheduleTriggerEvent simulateEvent(String jobId, @Nullable Map data, TriggerService service) { DateTime now = new DateTime(clock.millis(), UTC); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java deleted file mode 100644 index 862244bb7cb..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.trigger.schedule.engine; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.scheduler.SchedulerEngine; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; -import org.joda.time.DateTime; - -import java.time.Clock; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import static org.joda.time.DateTimeZone.UTC; - -public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { - - private final SchedulerEngine schedulerEngine; - - public SchedulerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { - super(settings, scheduleRegistry, clock); - this.schedulerEngine = new SchedulerEngine(clock); - this.schedulerEngine.register(event -> - notifyListeners(event.getJobName(), event.getTriggeredTime(), event.getScheduledTime())); - } - - @Override - public void start(Collection jobs) { - logger.debug("starting schedule engine..."); - final List schedulerJobs = new ArrayList<>(); - jobs.stream() - .filter(job -> job.trigger() instanceof ScheduleTrigger) - .forEach(job -> { - ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - schedulerJobs.add(new SchedulerEngine.Job(job.id(), trigger.getSchedule())); - }); - schedulerEngine.start(schedulerJobs); - logger.debug("schedule engine started at [{}]", new DateTime(clock.millis(), UTC)); - } - - @Override - public void stop() { - logger.debug("stopping schedule engine..."); - schedulerEngine.stop(); - logger.debug("schedule engine stopped"); - } - - @Override - public void add(Job job) { - assert job.trigger() instanceof ScheduleTrigger; - ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - schedulerEngine.add(new SchedulerEngine.Job(job.id(), trigger.getSchedule())); - } - - @Override - public boolean remove(String jobId) { - return schedulerEngine.remove(jobId); - } - - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { - logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime, UTC), - new DateTime(scheduledTime, UTC)); - final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime, UTC), - new DateTime(scheduledTime, UTC)); - for (Listener listener : listeners) { - listener.triggered(Collections.singletonList(event)); - } - } -} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java index 1525e13a773..99237bd0556 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.elasticsearch.xpack.watcher.watch.Watch; import org.joda.time.DateTime; import java.time.Clock; @@ -38,10 +39,10 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { } @Override - public void start(Collection jobs) { + public void start(Collection jobs) { long starTime = clock.millis(); Map schedules = new ConcurrentHashMap<>(); - for (Job job : jobs) { + for (Watch job : jobs) { if (job.trigger() instanceof ScheduleTrigger) { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), starTime)); @@ -57,7 +58,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { } @Override - public void add(Job job) { + public void add(Watch job) { assert job.trigger() instanceof ScheduleTrigger; ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), clock.millis())); @@ -90,9 +91,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { } protected void notifyListeners(List events) { - for (Listener listener : listeners) { - listener.triggered(events); - } + consumers.forEach(consumer -> consumer.accept(events)); } static class ActiveSchedule { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java index eaafd7be5b1..b3d1f429c7d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.common.secret.Secret; @@ -36,7 +35,6 @@ import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.xpack.watcher.support.xcontent.WatcherXContentParser; import org.elasticsearch.xpack.watcher.transform.ExecutableTransform; import org.elasticsearch.xpack.watcher.trigger.Trigger; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.joda.time.DateTime; @@ -50,11 +48,10 @@ import java.util.regex.Pattern; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; -import static org.elasticsearch.common.xcontent.XContentHelper.createParser; import static org.elasticsearch.xpack.watcher.support.Exceptions.ioException; import static org.joda.time.DateTimeZone.UTC; -public class Watch implements TriggerEngine.Job, ToXContentObject { +public class Watch implements ToXContentObject { public static final String ALL_ACTIONS_ID = "_all"; public static final String INCLUDE_STATUS_KEY = "include_status"; @@ -87,12 +84,10 @@ public class Watch implements TriggerEngine.Job, ToXContentObject { this.status = status; } - @Override public String id() { return id; } - @Override public Trigger trigger() { return trigger; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 19457d3697f..8c411962cbd 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -121,20 +121,16 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase private static Boolean securityEnabled; - private static String scheduleEngineName; - @Override protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException { if (securityEnabled == null) { securityEnabled = enableSecurity(); } - scheduleEngineName = randomFrom("ticker", "scheduler"); return super.buildTestCluster(scope, seed); } @Override protected Settings nodeSettings(int nodeOrdinal) { - logger.info("using schedule engine [{}]", scheduleEngineName); return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) //TODO: for now lets isolate watcher tests from monitoring (randomize this later) @@ -144,7 +140,6 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase .put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100)) .put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100)) .put(SecuritySettings.settings(securityEnabled)) - .put("xpack.watcher.trigger.schedule.engine", scheduleEngineName) .put("script.inline", "true") // Disable native ML autodetect_process as the c++ controller won't be available .put(MachineLearning.AUTODETECT_PROCESS.getKey(), false) @@ -257,7 +252,6 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase @AfterClass public static void _cleanupClass() { securityEnabled = null; - scheduleEngineName = null; } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/TimeWarpedWatcher.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/TimeWarpedWatcher.java index 6fce1d9cdce..95b290e71f2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/TimeWarpedWatcher.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/TimeWarpedWatcher.java @@ -10,15 +10,17 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.execution.ExecutionService; -import org.elasticsearch.xpack.watcher.execution.SyncTriggerListener; +import org.elasticsearch.xpack.watcher.execution.SyncTriggerEventConsumer; import org.elasticsearch.xpack.watcher.execution.WatchExecutor; import org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerEngineMock; import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; import java.time.Clock; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.function.Consumer; import java.util.stream.Stream; public class TimeWarpedWatcher extends Watcher { @@ -40,8 +42,8 @@ public class TimeWarpedWatcher extends Watcher { } @Override - protected TriggerEngine.Listener getTriggerEngineListener(ExecutionService executionService) { - return new SyncTriggerListener(settings, executionService); + protected Consumer> getTriggerEngineListener(ExecutionService executionService) { + return new SyncTriggerEventConsumer(settings, executionService); } public static class SameThreadExecutor implements WatchExecutor { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java index aff9da5a106..0fc6d6fd034 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java @@ -5,22 +5,24 @@ */ package org.elasticsearch.xpack.watcher.test.bench; -import org.elasticsearch.common.Randomness; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; +import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; +import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; -import org.elasticsearch.xpack.watcher.trigger.schedule.engine.BaseTriggerEngineTestCase; -import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine; +import org.elasticsearch.xpack.watcher.watch.Watch; import java.time.Clock; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,6 +34,8 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interva @SuppressForbidden(reason = "benchmark") public class ScheduleEngineTriggerBenchmark { + private static final Logger logger = ESLoggerFactory.getLogger(ScheduleEngineTriggerBenchmark.class); + public static void main(String[] args) throws Exception { int numWatches = 1000; int interval = 2; @@ -55,75 +59,52 @@ public class ScheduleEngineTriggerBenchmark { Settings settings = Settings.builder() .put("name", "test") .build(); - List jobs = new ArrayList<>(numWatches); + List watches = new ArrayList<>(numWatches); for (int i = 0; i < numWatches; i++) { - jobs.add(new BaseTriggerEngineTestCase.SimpleJob("job_" + i, interval(interval + "s"))); + watches.add(new Watch("job_" + i, new ScheduleTrigger(interval(interval + "s")), new ExecutableNoneInput(logger), + AlwaysCondition.INSTANCE, null, null, Collections.emptyList(), null, null)); } ScheduleRegistry scheduleRegistry = new ScheduleRegistry(emptySet()); - List impls = new ArrayList<>(Arrays.asList(new String[]{"schedule", "ticker"})); - Randomness.shuffle(impls); List results = new ArrayList<>(); - for (String impl : impls) { - System.gc(); - System.out.println("====================================="); - System.out.println("===> Testing [" + impl + "] scheduler"); - System.out.println("====================================="); - final AtomicBoolean running = new AtomicBoolean(false); - final AtomicInteger total = new AtomicInteger(); - final MeanMetric triggerMetric = new MeanMetric(); - final MeanMetric tooEarlyMetric = new MeanMetric(); + System.gc(); + System.out.println("====================================="); + System.out.println("===> Testing scheduler"); + System.out.println("====================================="); + final AtomicBoolean running = new AtomicBoolean(false); + final AtomicInteger total = new AtomicInteger(); + final MeanMetric triggerMetric = new MeanMetric(); + final MeanMetric tooEarlyMetric = new MeanMetric(); - final ScheduleTriggerEngine scheduler; - switch (impl) { - case "schedule": - scheduler = new SchedulerScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, Clock.systemUTC()) { - - @Override - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { - if (running.get()) { - measure(total, triggerMetric, tooEarlyMetric, triggeredTime, scheduledTime); - } - } - }; - break; - case "ticker": - scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, Clock.systemUTC()) { - - @Override - protected void notifyListeners(List events) { - if (running.get()) { - for (TriggerEvent event : events) { - ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event; - measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(), - scheduleTriggerEvent.scheduledTime().getMillis()); - } - } - } - }; - break; - default: - throw new IllegalArgumentException("impl [" + impl + "] doesn't exist"); + final ScheduleTriggerEngine scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, Clock.systemUTC()) { + @Override + protected void notifyListeners(List events) { + if (running.get()) { + for (TriggerEvent event : events) { + ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event; + measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(), + scheduleTriggerEvent.scheduledTime().getMillis()); + } + } } - scheduler.start(jobs); - System.out.println("Added [" + numWatches + "] jobs"); - running.set(true); - Thread.sleep(benchTime); - running.set(false); - scheduler.stop(); - System.out.println("done, triggered [" + total.get() + "] times, delayed triggered [" + triggerMetric.count() + - "] times, avg [" + triggerMetric.mean() + "] ms"); - results.add(new Stats(impl, total.get(), triggerMetric.count(), triggerMetric.mean(), tooEarlyMetric.count(), - tooEarlyMetric.mean())); - } + }; + scheduler.start(watches); + System.out.println("Added [" + numWatches + "] jobs"); + running.set(true); + Thread.sleep(benchTime); + running.set(false); + scheduler.stop(); + System.out.println("done, triggered [" + total.get() + "] times, delayed triggered [" + triggerMetric.count() + + "] times, avg [" + triggerMetric.mean() + "] ms"); + results.add(new Stats(total.get(), triggerMetric.count(), triggerMetric.mean(), tooEarlyMetric.count(), tooEarlyMetric.mean())); System.out.println(" Name | # triggered | # delayed | avg delay | # too early triggered | avg too early delay"); System.out.println("--------------- | ----------- | --------- | --------- | --------------------- | ------------------ "); for (Stats stats : results) { System.out.printf( Locale.ENGLISH, - "%15s | %11d | %9d | %9d | %21d | %18d\n", - stats.implementation, stats.numberOfTimesTriggered, stats.numberOfTimesDelayed, stats.avgDelayTime, + "%11d | %9d | %9d | %21d | %18d\n", + stats.numberOfTimesTriggered, stats.numberOfTimesDelayed, stats.avgDelayTime, stats.numberOfEarlyTriggered, stats.avgEarlyDelayTime ); } @@ -143,16 +124,14 @@ public class ScheduleEngineTriggerBenchmark { static class Stats { - final String implementation; final int numberOfTimesTriggered; final long numberOfTimesDelayed; final long avgDelayTime; final long numberOfEarlyTriggered; final long avgEarlyDelayTime; - Stats(String implementation, int numberOfTimesTriggered, long numberOfTimesDelayed, double avgDelayTime, + Stats(int numberOfTimesTriggered, long numberOfTimesDelayed, double avgDelayTime, long numberOfEarlyTriggered, double avgEarlyDelayTime) { - this.implementation = implementation; this.numberOfTimesTriggered = numberOfTimesTriggered; this.numberOfTimesDelayed = numberOfTimesDelayed; this.avgDelayTime = Math.round(avgDelayTime); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherExecutorServiceBenchmark.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherExecutorServiceBenchmark.java index fa079d51a3a..27844ad1e78 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherExecutorServiceBenchmark.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherExecutorServiceBenchmark.java @@ -224,8 +224,6 @@ public class WatcherExecutorServiceBenchmark { protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) { return new ScheduleTriggerEngineMock(settings, scheduleRegistry, clock); } - - } } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java index 3ed958e6c1c..0cf3e5b3257 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java @@ -15,12 +15,13 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.elasticsearch.xpack.watcher.watch.Watch; import org.joda.time.DateTime; import java.io.IOException; import java.time.Clock; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,12 +32,11 @@ import java.util.concurrent.ConcurrentMap; public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { private final Logger logger; - private final ConcurrentMap jobs = new ConcurrentHashMap<>(); + private final ConcurrentMap watches = new ConcurrentHashMap<>(); public ScheduleTriggerEngineMock(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { super(settings, scheduleRegistry, clock); this.logger = Loggers.getLogger(ScheduleTriggerEngineMock.class, settings); - } @Override @@ -51,7 +51,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { } @Override - public void start(Collection jobs) { + public void start(Collection jobs) { } @Override @@ -59,13 +59,13 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { } @Override - public void add(Job job) { - jobs.put(job.id(), job); + public void add(Watch watch) { + watches.put(watch.id(), watch); } @Override public boolean remove(String jobId) { - return jobs.remove(jobId) != null; + return watches.remove(jobId) != null; } public void trigger(String jobName) { @@ -81,9 +81,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { DateTime now = new DateTime(clock.millis()); logger.debug("firing [{}] at [{}]", jobName, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now); - for (Listener listener : listeners) { - listener.triggered(Arrays.asList(event)); - } + consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event))); if (interval != null) { if (clock instanceof ClockMock) { ((ClockMock) clock).fastForward(interval); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/BaseTriggerEngineTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/BaseTriggerEngineTestCase.java deleted file mode 100644 index 6904ec1a63f..00000000000 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/BaseTriggerEngineTestCase.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.trigger.schedule.engine; - -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.support.clock.ClockMock; -import org.elasticsearch.xpack.watcher.trigger.Trigger; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; -import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; -import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; -import org.elasticsearch.xpack.watcher.trigger.schedule.support.DayOfWeek; -import org.elasticsearch.xpack.watcher.trigger.schedule.support.WeekTimes; -import org.joda.time.DateTime; -import org.junit.After; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily; -import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; -import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly; -import static org.hamcrest.Matchers.is; -import static org.joda.time.DateTimeZone.UTC; - -public abstract class BaseTriggerEngineTestCase extends ESTestCase { - - private TriggerEngine engine; - protected ClockMock clock = ClockMock.frozen(); - - @Before - public void init() throws Exception { - engine = createEngine(); - } - - protected abstract TriggerEngine createEngine(); - - /** - * Dependending on the trigger engine used, we may need to advance the clock, because the implementation might use the clock - * in order to check for new jobs being executed - */ - protected abstract void advanceClockIfNeeded(DateTime newCurrentDateTime); - - @After - public void cleanup() throws Exception { - engine.stop(); - } - - public void testStart() throws Exception { - int count = randomIntBetween(2, 5); - final CountDownLatch firstLatch = new CountDownLatch(count); - final CountDownLatch secondLatch = new CountDownLatch(count); - List jobs = new ArrayList<>(); - for (int i = 0; i < count; i++) { - jobs.add(new SimpleJob(String.valueOf(i), interval("1s"))); - } - final BitSet bits = new BitSet(count); - - engine.register(events -> { - for (TriggerEvent event : events) { - int index = Integer.parseInt(event.jobName()); - if (!bits.get(index)) { - logger.info("job [{}] first fire", index); - bits.set(index); - firstLatch.countDown(); - } else { - logger.info("job [{}] second fire", index); - secondLatch.countDown(); - } - } - }); - - engine.start(jobs); - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!firstLatch.await(3 * count, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!secondLatch.await(3 * count, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - engine.stop(); - assertThat(bits.cardinality(), is(count)); - } - - public void testAddHourly() throws Exception { - final String name = "job_name"; - final CountDownLatch latch = new CountDownLatch(1); - engine.start(Collections.emptySet()); - engine.register(events -> { - for (TriggerEvent event : events) { - assertThat(event.jobName(), is(name)); - logger.info("triggered job on [{}]", clock); - } - latch.countDown(); - }); - - int randomMinute = randomIntBetween(0, 59); - DateTime testNowTime = new DateTime(clock.millis(), UTC).withMinuteOfHour(randomMinute).withSecondOfMinute(59); - DateTime scheduledTime = testNowTime.plusSeconds(2); - logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime); - - clock.setTime(testNowTime); - engine.add(new SimpleJob(name, daily().at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build())); - advanceClockIfNeeded(scheduledTime); - - if (!latch.await(5, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - } - - public void testAddDaily() throws Exception { - final String name = "job_name"; - final CountDownLatch latch = new CountDownLatch(1); - engine.start(Collections.emptySet()); - - engine.register(events -> { - for (TriggerEvent event : events) { - assertThat(event.jobName(), is(name)); - logger.info("triggered job on [{}]", new DateTime(clock.millis(), UTC)); - latch.countDown(); - } - }); - - int randomHour = randomIntBetween(0, 23); - int randomMinute = randomIntBetween(0, 59); - - DateTime testNowTime = new DateTime(clock.millis(), UTC).withHourOfDay(randomHour) - .withMinuteOfHour(randomMinute).withSecondOfMinute(59); - DateTime scheduledTime = testNowTime.plusSeconds(2); - logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime); - - clock.setTime(testNowTime); - engine.add(new SimpleJob(name, daily().at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build())); - advanceClockIfNeeded(scheduledTime); - - if (!latch.await(5, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - } - - public void testAddWeekly() throws Exception { - final String name = "job_name"; - final CountDownLatch latch = new CountDownLatch(1); - engine.start(Collections.emptySet()); - engine.register(events -> { - for (TriggerEvent event : events) { - assertThat(event.jobName(), is(name)); - logger.info("triggered job"); - } - latch.countDown(); - }); - - int randomHour = randomIntBetween(0, 23); - int randomMinute = randomIntBetween(0, 59); - int randomDay = randomIntBetween(1, 7); - - DateTime testNowTime = new DateTime(clock.millis(), UTC).withDayOfWeek(randomDay).withHourOfDay(randomHour) - .withMinuteOfHour(randomMinute).withSecondOfMinute(59); - DateTime scheduledTime = testNowTime.plusSeconds(2); - - logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime); - clock.setTime(testNowTime); - - // fun part here (aka WTF): DayOfWeek with Joda is MON-SUN, starting at 1 - // DayOfWeek with Watcher is SUN-SAT, starting at 1 - int watcherDay = (scheduledTime.getDayOfWeek() % 7) + 1; - engine.add(new SimpleJob(name, weekly().time(WeekTimes.builder() - .on(DayOfWeek.resolve(watcherDay)) - .at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build()).build())); - advanceClockIfNeeded(scheduledTime); - - if (!latch.await(5, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - } - - public void testAddSameJobSeveralTimesAndExecutedOnce() throws InterruptedException { - engine.start(Collections.emptySet()); - - final CountDownLatch firstLatch = new CountDownLatch(1); - final CountDownLatch secondLatch = new CountDownLatch(1); - AtomicInteger counter = new AtomicInteger(0); - engine.register(events -> { - events.forEach(event -> { - if (counter.getAndIncrement() == 0) { - firstLatch.countDown(); - } else { - secondLatch.countDown(); - } - }); - }); - - int times = scaledRandomIntBetween(3, 30); - for (int i = 0; i < times; i++) { - engine.add(new SimpleJob("_id", interval("1s"))); - } - - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!firstLatch.await(3, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!secondLatch.await(3, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - - // ensure job was only called twice independent from its name - assertThat(counter.get(), is(2)); - } - - public static class SimpleJob implements TriggerEngine.Job { - - private final String name; - private final ScheduleTrigger trigger; - - public SimpleJob(String name, Schedule schedule) { - this.name = name; - this.trigger = new ScheduleTrigger(schedule); - } - - @Override - public String id() { - return name; - } - - @Override - public Trigger trigger() { - return trigger; - } - } -} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleEngineTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleEngineTests.java deleted file mode 100644 index 8a344b88d96..00000000000 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/SchedulerScheduleEngineTests.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.trigger.schedule.engine; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; -import org.joda.time.DateTime; - -import static org.mockito.Mockito.mock; - -public class SchedulerScheduleEngineTests extends BaseTriggerEngineTestCase { - - protected TriggerEngine createEngine() { - return new SchedulerScheduleTriggerEngine(Settings.EMPTY, mock(ScheduleRegistry.class), clock); - } - - // the scheduler configures hard when to schedule, and does not check the clock - @Override - protected void advanceClockIfNeeded(DateTime newCurrentDateTime) { - } - -} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java index 869f907cbe6..aef7c4dd6e2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java @@ -5,22 +5,254 @@ */ package org.elasticsearch.xpack.watcher.trigger.schedule.engine; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.support.clock.ClockMock; +import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; +import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput; import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; +import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; +import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; +import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; +import org.elasticsearch.xpack.watcher.trigger.schedule.support.DayOfWeek; +import org.elasticsearch.xpack.watcher.trigger.schedule.support.WeekTimes; +import org.elasticsearch.xpack.watcher.watch.Watch; import org.joda.time.DateTime; +import org.junit.After; +import org.junit.Before; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly; +import static org.hamcrest.Matchers.is; +import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.mock; -public class TickerScheduleEngineTests extends BaseTriggerEngineTestCase { +public class TickerScheduleEngineTests extends ESTestCase { - @Override - protected TriggerEngine createEngine() { - return new TickerScheduleTriggerEngine(Settings.EMPTY, mock(ScheduleRegistry.class), clock); + private TriggerEngine engine; + protected ClockMock clock = ClockMock.frozen(); + + @Before + public void init() throws Exception { + engine = createEngine(); } - @Override - protected void advanceClockIfNeeded(DateTime newCurrentDateTime) { + private TriggerEngine createEngine() { + return new TickerScheduleTriggerEngine(Settings.EMPTY, + mock(ScheduleRegistry.class), clock); + } + + private void advanceClockIfNeeded(DateTime newCurrentDateTime) { clock.setTime(newCurrentDateTime); } + + @After + public void cleanup() throws Exception { + engine.stop(); + } + + public void testStart() throws Exception { + int count = randomIntBetween(2, 5); + final CountDownLatch firstLatch = new CountDownLatch(count); + final CountDownLatch secondLatch = new CountDownLatch(count); + List watches = new ArrayList<>(); + for (int i = 0; i < count; i++) { + watches.add(createWatch(String.valueOf(i), interval("1s"))); + } + final BitSet bits = new BitSet(count); + + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + for (TriggerEvent event : events) { + int index = Integer.parseInt(event.jobName()); + if (!bits.get(index)) { + logger.info("job [{}] first fire", index); + bits.set(index); + firstLatch.countDown(); + } else { + logger.info("job [{}] second fire", index); + secondLatch.countDown(); + } + } + } + }); + + engine.start(watches); + advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); + if (!firstLatch.await(3 * count, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + + advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); + if (!secondLatch.await(3 * count, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + engine.stop(); + assertThat(bits.cardinality(), is(count)); + } + + public void testAddHourly() throws Exception { + final String name = "job_name"; + final CountDownLatch latch = new CountDownLatch(1); + engine.start(Collections.emptySet()); + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + for (TriggerEvent event : events) { + assertThat(event.jobName(), is(name)); + logger.info("triggered job on [{}]", clock); + } + latch.countDown(); + } + }); + + int randomMinute = randomIntBetween(0, 59); + DateTime testNowTime = new DateTime(clock.millis(), UTC).withMinuteOfHour(randomMinute) + .withSecondOfMinute(59); + DateTime scheduledTime = testNowTime.plusSeconds(2); + logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, + scheduledTime); + + clock.setTime(testNowTime); + engine.add(createWatch(name, daily().at(scheduledTime.getHourOfDay(), + scheduledTime.getMinuteOfHour()).build())); + advanceClockIfNeeded(scheduledTime); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + } + + public void testAddDaily() throws Exception { + final String name = "job_name"; + final CountDownLatch latch = new CountDownLatch(1); + engine.start(Collections.emptySet()); + + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + for (TriggerEvent event : events) { + assertThat(event.jobName(), is(name)); + logger.info("triggered job on [{}]", new DateTime(clock.millis(), UTC)); + latch.countDown(); + } + } + }); + + int randomHour = randomIntBetween(0, 23); + int randomMinute = randomIntBetween(0, 59); + + DateTime testNowTime = new DateTime(clock.millis(), UTC).withHourOfDay(randomHour) + .withMinuteOfHour(randomMinute).withSecondOfMinute(59); + DateTime scheduledTime = testNowTime.plusSeconds(2); + logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, + scheduledTime); + + clock.setTime(testNowTime); + engine.add(createWatch(name, daily().at(scheduledTime.getHourOfDay(), + scheduledTime.getMinuteOfHour()).build())); + advanceClockIfNeeded(scheduledTime); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + } + + public void testAddWeekly() throws Exception { + final String name = "job_name"; + final CountDownLatch latch = new CountDownLatch(1); + engine.start(Collections.emptySet()); + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + for (TriggerEvent event : events) { + assertThat(event.jobName(), is(name)); + logger.info("triggered job"); + } + latch.countDown(); + } + }); + + int randomHour = randomIntBetween(0, 23); + int randomMinute = randomIntBetween(0, 59); + int randomDay = randomIntBetween(1, 7); + + DateTime testNowTime = new DateTime(clock.millis(), UTC).withDayOfWeek(randomDay) + .withHourOfDay(randomHour).withMinuteOfHour(randomMinute).withSecondOfMinute(59); + DateTime scheduledTime = testNowTime.plusSeconds(2); + + logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, + scheduledTime); + clock.setTime(testNowTime); + + // fun part here (aka WTF): DayOfWeek with Joda is MON-SUN, starting at 1 + // DayOfWeek with Watcher is SUN-SAT, starting at 1 + int watcherDay = (scheduledTime.getDayOfWeek() % 7) + 1; + engine.add(createWatch(name, weekly().time(WeekTimes.builder() + .on(DayOfWeek.resolve(watcherDay)) + .at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build()) + .build())); + advanceClockIfNeeded(scheduledTime); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + } + + public void testAddSameJobSeveralTimesAndExecutedOnce() throws InterruptedException { + engine.start(Collections.emptySet()); + + final CountDownLatch firstLatch = new CountDownLatch(1); + final CountDownLatch secondLatch = new CountDownLatch(1); + AtomicInteger counter = new AtomicInteger(0); + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + events.forEach(event -> { + if (counter.getAndIncrement() == 0) { + firstLatch.countDown(); + } else { + secondLatch.countDown(); + } + }); + } + }); + + int times = scaledRandomIntBetween(3, 30); + for (int i = 0; i < times; i++) { + engine.add(createWatch("_id", interval("1s"))); + } + + advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); + if (!firstLatch.await(3, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + + advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); + if (!secondLatch.await(3, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + + // ensure job was only called twice independent from its name + assertThat(counter.get(), is(2)); + } + + private Watch createWatch(String name, Schedule schedule) { + return new Watch(name, new ScheduleTrigger(schedule), new ExecutableNoneInput(logger), + AlwaysCondition.INSTANCE, null, null, + Collections.emptyList(), null, null); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java index d0f97c429b3..4b35078fe80 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java @@ -532,7 +532,7 @@ public class WatchTests extends ESTestCase { } @Override - public void start(Collection jobs) { + public void start(Collection jobs) { } @Override @@ -540,11 +540,7 @@ public class WatchTests extends ESTestCase { } @Override - public void register(Listener listener) { - } - - @Override - public void add(Job job) { + public void add(Watch watch) { } @Override