Watcher: Remove scheduler based trigger engine (elastic/x-pack-elasticsearch#724)

The scheduler based trigger engine is not enabled by default
as the ticker based trigger engine is used. As we dont use it
in production, this commit removes this specific implementation.

It also removes some uneeded abstractions like AbstractTriggerEngine, TriggerEngine.Listener and TriggerEngine.Job

Original commit: elastic/x-pack-elasticsearch@b17a2e9d62
This commit is contained in:
Alexander Reelsen 2017-03-21 10:27:41 +01:00 committed by GitHub
parent 1a7e842c15
commit 0c7c2f521c
20 changed files with 365 additions and 567 deletions

View File

@ -80,7 +80,7 @@ import org.elasticsearch.xpack.watcher.condition.ConditionFactory;
import org.elasticsearch.xpack.watcher.condition.ConditionRegistry;
import org.elasticsearch.xpack.watcher.condition.NeverCondition;
import org.elasticsearch.xpack.watcher.condition.ScriptCondition;
import org.elasticsearch.xpack.watcher.execution.AsyncTriggerListener;
import org.elasticsearch.xpack.watcher.execution.AsyncTriggerEventConsumer;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.execution.InternalWatchExecutor;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatch;
@ -135,6 +135,7 @@ import org.elasticsearch.xpack.watcher.transport.actions.service.WatcherServiceA
import org.elasticsearch.xpack.watcher.transport.actions.stats.TransportWatcherStatsAction;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.CronSchedule;
@ -146,7 +147,6 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.WeeklySchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.YearlySchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
@ -162,12 +162,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class Watcher implements ActionPlugin, ScriptPlugin {
@ -177,19 +177,6 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
Setting.boolSetting("xpack.watcher.encrypt_sensitive_data", false, Setting.Property.NodeScope);
public static final Setting<TimeValue> MAX_STOP_TIMEOUT_SETTING =
Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
public static final Setting<String> TRIGGER_SCHEDULE_ENGINE_SETTING =
new Setting<>("xpack.watcher.trigger.schedule.engine", "ticker", s -> {
switch (s) {
case "ticker":
case "scheduler":
return s;
default:
throw new IllegalArgumentException("Can't parse [xpack.watcher.trigger.schedule.engine] must be one of [ticker, " +
"scheduler], was [" + s + "]");
}
}, Setting.Property.NodeScope);
private static final ScriptContext.Plugin SCRIPT_PLUGIN = new ScriptContext.Plugin("xpack", "watch");
public static final ScriptContext SCRIPT_CONTEXT = SCRIPT_PLUGIN::getKey;
@ -288,7 +275,7 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
final ScheduleRegistry scheduleRegistry = new ScheduleRegistry(scheduleParsers);
TriggerEngine manualTriggerEngine = new ManualTriggerEngine();
TriggerEngine configuredTriggerEngine = getTriggerEngine(clock, scheduleRegistry);
final TriggerEngine configuredTriggerEngine = getTriggerEngine(clock, scheduleRegistry);
final Set<TriggerEngine> triggerEngines = new HashSet<>();
triggerEngines.add(manualTriggerEngine);
@ -306,7 +293,7 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor,
clock, threadPool, watchParser, watcherClientProxy);
final TriggerEngine.Listener triggerEngineListener = getTriggerEngineListener(executionService);
final Consumer<Iterable<TriggerEvent>> triggerEngineListener = getTriggerEngineListener(executionService);
triggerService.register(triggerEngineListener);
final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = new WatcherIndexTemplateRegistry(settings,
@ -324,23 +311,15 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
}
protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) {
String engine = TRIGGER_SCHEDULE_ENGINE_SETTING.get(settings);
switch (engine) {
case "scheduler":
return new SchedulerScheduleTriggerEngine(settings, scheduleRegistry, clock);
case "ticker":
return new TickerScheduleTriggerEngine(settings, scheduleRegistry, clock);
default: // should never happen, as the setting is already parsing for scheduler/ticker
throw illegalState("schedule engine must be either set to [scheduler] or [ticker], but was []", engine);
}
return new TickerScheduleTriggerEngine(settings, scheduleRegistry, clock);
}
protected WatchExecutor getWatchExecutor(ThreadPool threadPool) {
return new InternalWatchExecutor(threadPool);
}
protected TriggerEngine.Listener getTriggerEngineListener(ExecutionService executionService) {
return new AsyncTriggerListener(settings, executionService);
protected Consumer<Iterable<TriggerEvent>> getTriggerEngineListener(ExecutionService executionService) {
return new AsyncTriggerEventConsumer(settings, executionService);
}
private <T> T getService(Class<T> serviceClass, Collection<Object> services) {
@ -374,7 +353,6 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
for (TemplateConfig templateConfig : WatcherIndexTemplateRegistry.TEMPLATE_CONFIGS) {
settings.add(templateConfig.getSetting());
}
settings.add(TRIGGER_SCHEDULE_ENGINE_SETTING);
settings.add(INDEX_WATCHER_TEMPLATE_VERSION_SETTING);
settings.add(MAX_STOP_TIMEOUT_SETTING);
settings.add(ExecutionService.DEFAULT_THROTTLE_PERIOD_SETTING);

View File

@ -10,33 +10,33 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import java.util.function.Consumer;
import static java.util.stream.StreamSupport.stream;
public class AsyncTriggerListener implements TriggerEngine.Listener {
public class AsyncTriggerEventConsumer implements Consumer<Iterable<TriggerEvent>> {
private final Logger logger;
private final ExecutionService executionService;
public AsyncTriggerListener(Settings settings, ExecutionService executionService) {
this.logger = Loggers.getLogger(SyncTriggerListener.class, settings);
public AsyncTriggerEventConsumer(Settings settings, ExecutionService executionService) {
this.logger = Loggers.getLogger(SyncTriggerEventConsumer.class, settings);
this.executionService = executionService;
}
@Override
public void triggered(Iterable<TriggerEvent> events) {
public void accept(Iterable<TriggerEvent> events) {
try {
executionService.processEventsAsync(events);
} catch (Exception e) {
logger.error(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to process triggered events [{}]",
(Object) stream(events.spliterator(), false).toArray(size -> new TriggerEvent[size])),
(Object) stream(events.spliterator(), false).toArray(size ->
new TriggerEvent[size])),
e);
}
}
}

View File

@ -10,32 +10,33 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import java.util.function.Consumer;
import static java.util.stream.StreamSupport.stream;
public class SyncTriggerListener implements TriggerEngine.Listener {
public class SyncTriggerEventConsumer implements Consumer<Iterable<TriggerEvent>> {
private final ExecutionService executionService;
private final Logger logger;
public SyncTriggerListener(Settings settings, ExecutionService executionService) {
this.logger = Loggers.getLogger(SyncTriggerListener.class, settings);
public SyncTriggerEventConsumer(Settings settings, ExecutionService executionService) {
this.logger = Loggers.getLogger(SyncTriggerEventConsumer.class, settings);
this.executionService = executionService;
}
@Override
public void triggered(Iterable<TriggerEvent> events) {
public void accept(Iterable<TriggerEvent> events) {
try {
executionService.processEventsSync(events);
} catch (Exception e) {
logger.error(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to process triggered events [{}]",
(Object) stream(events.spliterator(), false).toArray(size -> new TriggerEvent[size])),
(Object) stream(events.spliterator(), false).toArray(size ->
new TriggerEvent[size])),
e);
}
}
}

View File

@ -1,27 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.trigger;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public abstract class AbstractTriggerEngine<T extends Trigger, E extends TriggerEvent> extends AbstractComponent implements
TriggerEngine<T, E> {
protected final List<Listener> listeners = new CopyOnWriteArrayList<>();
public AbstractTriggerEngine(Settings settings) {
super(settings);
}
@Override
public void register(Listener listener) {
listeners.add(listener);
}
}

View File

@ -7,10 +7,12 @@ package org.elasticsearch.xpack.watcher.trigger;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
public interface TriggerEngine<T extends Trigger, E extends TriggerEvent> {
@ -20,13 +22,13 @@ public interface TriggerEngine<T extends Trigger, E extends TriggerEvent> {
* It's the responsibility of the trigger engine implementation to select the appropriate jobs
* from the given list of jobs
*/
void start(Collection<Job> jobs);
void start(Collection<Watch> jobs);
void stop();
void register(Listener listener);
void register(Consumer<Iterable<TriggerEvent>> consumer);
void add(Job job);
void add(Watch job);
/**
* Removes the job associated with the given name from this trigger engine.
@ -41,19 +43,4 @@ public interface TriggerEngine<T extends Trigger, E extends TriggerEvent> {
T parseTrigger(String context, XContentParser parser) throws IOException;
E parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException;
interface Listener {
void triggered(Iterable<TriggerEvent> events);
}
interface Job {
String id();
Trigger trigger();
}
}

View File

@ -18,22 +18,22 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument;
public class TriggerService extends AbstractComponent {
private final Listeners listeners;
private final GroupedConsumer consumer = new GroupedConsumer();
private final Map<String, TriggerEngine> engines;
public TriggerService(Settings settings, Set<TriggerEngine> engines) {
super(settings);
listeners = new Listeners();
Map<String, TriggerEngine> builder = new HashMap<>();
for (TriggerEngine engine : engines) {
builder.put(engine.type(), engine);
engine.register(listeners);
engine.register(consumer);
}
this.engines = unmodifiableMap(builder);
}
@ -54,10 +54,10 @@ public class TriggerService extends AbstractComponent {
* Adds the given job to the trigger service. If there is already a registered job in this service with the
* same job ID, the newly added job will replace the old job (the old job will not be triggered anymore)
*
* @param job The new job
* @param watch The new watch
*/
public void add(TriggerEngine.Job job) {
engines.get(job.trigger().type()).add(job);
public void add(Watch watch) {
engines.get(watch.trigger().type()).add(watch);
}
/**
@ -75,8 +75,8 @@ public class TriggerService extends AbstractComponent {
return false;
}
public void register(TriggerEngine.Listener listener) {
listeners.add(listener);
public void register(Consumer<Iterable<TriggerEvent>> consumer) {
this.consumer.add(consumer);
}
public TriggerEvent simulateEvent(String type, String jobId, Map<String, Object> data) {
@ -149,20 +149,17 @@ public class TriggerService extends AbstractComponent {
return engine.parseTriggerEvent(this, watchId, context, parser);
}
static class Listeners implements TriggerEngine.Listener {
static class GroupedConsumer implements java.util.function.Consumer<Iterable<TriggerEvent>> {
private List<TriggerEngine.Listener> listeners = new CopyOnWriteArrayList<>();
private List<Consumer<Iterable<TriggerEvent>>> consumers = new CopyOnWriteArrayList<>();
public void add(TriggerEngine.Listener listener) {
listeners.add(listener);
public void add(Consumer<Iterable<TriggerEvent>> consumer) {
consumers.add(consumer);
}
@Override
public void triggered(Iterable<TriggerEvent> events) {
for (TriggerEngine.Listener listener : listeners) {
listener.triggered(events);
}
public void accept(Iterable<TriggerEvent> events) {
consumers.forEach(c -> c.accept(events));
}
}
}

View File

@ -8,11 +8,14 @@ package org.elasticsearch.xpack.watcher.trigger.manual;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument;
@ -30,7 +33,7 @@ public class ManualTriggerEngine implements TriggerEngine<ManualTrigger, ManualT
* from the given list of jobs
*/
@Override
public void start(Collection<Job> jobs) {
public void start(Collection<Watch> jobs) {
}
@Override
@ -38,11 +41,11 @@ public class ManualTriggerEngine implements TriggerEngine<ManualTrigger, ManualT
}
@Override
public void register(Listener listener) {
public void register(Consumer<Iterable<TriggerEvent>> consumer) {
}
@Override
public void add(Job job) {
public void add(Watch job) {
}
@Override

View File

@ -6,24 +6,30 @@
package org.elasticsearch.xpack.watcher.trigger.schedule;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.watcher.trigger.AbstractTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.joda.time.DateTime;
import java.io.IOException;
import java.time.Clock;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument;
import static org.joda.time.DateTimeZone.UTC;
public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine<ScheduleTrigger, ScheduleTriggerEvent> {
public abstract class ScheduleTriggerEngine extends AbstractComponent implements TriggerEngine<ScheduleTrigger, ScheduleTriggerEvent> {
public static final String TYPE = ScheduleTrigger.TYPE;
protected final List<Consumer<Iterable<TriggerEvent>>> consumers = new CopyOnWriteArrayList<>();
protected final ScheduleRegistry scheduleRegistry;
protected final Clock clock;
@ -38,6 +44,12 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine<Schedu
return TYPE;
}
@Override
public void register(Consumer<Iterable<TriggerEvent>> consumer) {
consumers.add(consumer);
}
@Override
public ScheduleTriggerEvent simulateEvent(String jobId, @Nullable Map<String, Object> data, TriggerService service) {
DateTime now = new DateTime(clock.millis(), UTC);

View File

@ -1,77 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.trigger.schedule.engine;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.joda.time.DateTime;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.joda.time.DateTimeZone.UTC;
public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
private final SchedulerEngine schedulerEngine;
public SchedulerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry, clock);
this.schedulerEngine = new SchedulerEngine(clock);
this.schedulerEngine.register(event ->
notifyListeners(event.getJobName(), event.getTriggeredTime(), event.getScheduledTime()));
}
@Override
public void start(Collection<Job> jobs) {
logger.debug("starting schedule engine...");
final List<SchedulerEngine.Job> schedulerJobs = new ArrayList<>();
jobs.stream()
.filter(job -> job.trigger() instanceof ScheduleTrigger)
.forEach(job -> {
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedulerJobs.add(new SchedulerEngine.Job(job.id(), trigger.getSchedule()));
});
schedulerEngine.start(schedulerJobs);
logger.debug("schedule engine started at [{}]", new DateTime(clock.millis(), UTC));
}
@Override
public void stop() {
logger.debug("stopping schedule engine...");
schedulerEngine.stop();
logger.debug("schedule engine stopped");
}
@Override
public void add(Job job) {
assert job.trigger() instanceof ScheduleTrigger;
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedulerEngine.add(new SchedulerEngine.Job(job.id(), trigger.getSchedule()));
}
@Override
public boolean remove(String jobId) {
return schedulerEngine.remove(jobId);
}
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime, UTC),
new DateTime(scheduledTime, UTC));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime, UTC),
new DateTime(scheduledTime, UTC));
for (Listener listener : listeners) {
listener.triggered(Collections.singletonList(event));
}
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import java.time.Clock;
@ -38,10 +39,10 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
@Override
public void start(Collection<Job> jobs) {
public void start(Collection<Watch> jobs) {
long starTime = clock.millis();
Map<String, ActiveSchedule> schedules = new ConcurrentHashMap<>();
for (Job job : jobs) {
for (Watch job : jobs) {
if (job.trigger() instanceof ScheduleTrigger) {
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), starTime));
@ -57,7 +58,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
@Override
public void add(Job job) {
public void add(Watch job) {
assert job.trigger() instanceof ScheduleTrigger;
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), clock.millis()));
@ -90,9 +91,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
protected void notifyListeners(List<TriggerEvent> events) {
for (Listener listener : listeners) {
listener.triggered(events);
}
consumers.forEach(consumer -> consumer.accept(events));
}
static class ActiveSchedule {

View File

@ -17,7 +17,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.common.secret.Secret;
@ -36,7 +35,6 @@ import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherXContentParser;
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.trigger.Trigger;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.joda.time.DateTime;
@ -50,11 +48,10 @@ import java.util.regex.Pattern;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.xcontent.XContentHelper.createParser;
import static org.elasticsearch.xpack.watcher.support.Exceptions.ioException;
import static org.joda.time.DateTimeZone.UTC;
public class Watch implements TriggerEngine.Job, ToXContentObject {
public class Watch implements ToXContentObject {
public static final String ALL_ACTIONS_ID = "_all";
public static final String INCLUDE_STATUS_KEY = "include_status";
@ -87,12 +84,10 @@ public class Watch implements TriggerEngine.Job, ToXContentObject {
this.status = status;
}
@Override
public String id() {
return id;
}
@Override
public Trigger trigger() {
return trigger;
}

View File

@ -121,20 +121,16 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
private static Boolean securityEnabled;
private static String scheduleEngineName;
@Override
protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
if (securityEnabled == null) {
securityEnabled = enableSecurity();
}
scheduleEngineName = randomFrom("ticker", "scheduler");
return super.buildTestCluster(scope, seed);
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
logger.info("using schedule engine [{}]", scheduleEngineName);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
//TODO: for now lets isolate watcher tests from monitoring (randomize this later)
@ -144,7 +140,6 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
.put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100))
.put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100))
.put(SecuritySettings.settings(securityEnabled))
.put("xpack.watcher.trigger.schedule.engine", scheduleEngineName)
.put("script.inline", "true")
// Disable native ML autodetect_process as the c++ controller won't be available
.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false)
@ -257,7 +252,6 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
@AfterClass
public static void _cleanupClass() {
securityEnabled = null;
scheduleEngineName = null;
}
@Override

View File

@ -10,15 +10,17 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.execution.SyncTriggerListener;
import org.elasticsearch.xpack.watcher.execution.SyncTriggerEventConsumer;
import org.elasticsearch.xpack.watcher.execution.WatchExecutor;
import org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerEngineMock;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import java.time.Clock;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class TimeWarpedWatcher extends Watcher {
@ -40,8 +42,8 @@ public class TimeWarpedWatcher extends Watcher {
}
@Override
protected TriggerEngine.Listener getTriggerEngineListener(ExecutionService executionService) {
return new SyncTriggerListener(settings, executionService);
protected Consumer<Iterable<TriggerEvent>> getTriggerEngineListener(ExecutionService executionService) {
return new SyncTriggerEventConsumer(settings, executionService);
}
public static class SameThreadExecutor implements WatchExecutor {

View File

@ -5,22 +5,24 @@
*/
package org.elasticsearch.xpack.watcher.test.bench;
import org.elasticsearch.common.Randomness;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.BaseTriggerEngineTestCase;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
@ -32,6 +34,8 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interva
@SuppressForbidden(reason = "benchmark")
public class ScheduleEngineTriggerBenchmark {
private static final Logger logger = ESLoggerFactory.getLogger(ScheduleEngineTriggerBenchmark.class);
public static void main(String[] args) throws Exception {
int numWatches = 1000;
int interval = 2;
@ -55,75 +59,52 @@ public class ScheduleEngineTriggerBenchmark {
Settings settings = Settings.builder()
.put("name", "test")
.build();
List<TriggerEngine.Job> jobs = new ArrayList<>(numWatches);
List<Watch> watches = new ArrayList<>(numWatches);
for (int i = 0; i < numWatches; i++) {
jobs.add(new BaseTriggerEngineTestCase.SimpleJob("job_" + i, interval(interval + "s")));
watches.add(new Watch("job_" + i, new ScheduleTrigger(interval(interval + "s")), new ExecutableNoneInput(logger),
AlwaysCondition.INSTANCE, null, null, Collections.emptyList(), null, null));
}
ScheduleRegistry scheduleRegistry = new ScheduleRegistry(emptySet());
List<String> impls = new ArrayList<>(Arrays.asList(new String[]{"schedule", "ticker"}));
Randomness.shuffle(impls);
List<Stats> results = new ArrayList<>();
for (String impl : impls) {
System.gc();
System.out.println("=====================================");
System.out.println("===> Testing [" + impl + "] scheduler");
System.out.println("=====================================");
final AtomicBoolean running = new AtomicBoolean(false);
final AtomicInteger total = new AtomicInteger();
final MeanMetric triggerMetric = new MeanMetric();
final MeanMetric tooEarlyMetric = new MeanMetric();
System.gc();
System.out.println("=====================================");
System.out.println("===> Testing scheduler");
System.out.println("=====================================");
final AtomicBoolean running = new AtomicBoolean(false);
final AtomicInteger total = new AtomicInteger();
final MeanMetric triggerMetric = new MeanMetric();
final MeanMetric tooEarlyMetric = new MeanMetric();
final ScheduleTriggerEngine scheduler;
switch (impl) {
case "schedule":
scheduler = new SchedulerScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, Clock.systemUTC()) {
@Override
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
if (running.get()) {
measure(total, triggerMetric, tooEarlyMetric, triggeredTime, scheduledTime);
}
}
};
break;
case "ticker":
scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, Clock.systemUTC()) {
@Override
protected void notifyListeners(List<TriggerEvent> events) {
if (running.get()) {
for (TriggerEvent event : events) {
ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event;
measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(),
scheduleTriggerEvent.scheduledTime().getMillis());
}
}
}
};
break;
default:
throw new IllegalArgumentException("impl [" + impl + "] doesn't exist");
final ScheduleTriggerEngine scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, Clock.systemUTC()) {
@Override
protected void notifyListeners(List<TriggerEvent> events) {
if (running.get()) {
for (TriggerEvent event : events) {
ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event;
measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(),
scheduleTriggerEvent.scheduledTime().getMillis());
}
}
}
scheduler.start(jobs);
System.out.println("Added [" + numWatches + "] jobs");
running.set(true);
Thread.sleep(benchTime);
running.set(false);
scheduler.stop();
System.out.println("done, triggered [" + total.get() + "] times, delayed triggered [" + triggerMetric.count() +
"] times, avg [" + triggerMetric.mean() + "] ms");
results.add(new Stats(impl, total.get(), triggerMetric.count(), triggerMetric.mean(), tooEarlyMetric.count(),
tooEarlyMetric.mean()));
}
};
scheduler.start(watches);
System.out.println("Added [" + numWatches + "] jobs");
running.set(true);
Thread.sleep(benchTime);
running.set(false);
scheduler.stop();
System.out.println("done, triggered [" + total.get() + "] times, delayed triggered [" + triggerMetric.count() +
"] times, avg [" + triggerMetric.mean() + "] ms");
results.add(new Stats(total.get(), triggerMetric.count(), triggerMetric.mean(), tooEarlyMetric.count(), tooEarlyMetric.mean()));
System.out.println(" Name | # triggered | # delayed | avg delay | # too early triggered | avg too early delay");
System.out.println("--------------- | ----------- | --------- | --------- | --------------------- | ------------------ ");
for (Stats stats : results) {
System.out.printf(
Locale.ENGLISH,
"%15s | %11d | %9d | %9d | %21d | %18d\n",
stats.implementation, stats.numberOfTimesTriggered, stats.numberOfTimesDelayed, stats.avgDelayTime,
"%11d | %9d | %9d | %21d | %18d\n",
stats.numberOfTimesTriggered, stats.numberOfTimesDelayed, stats.avgDelayTime,
stats.numberOfEarlyTriggered, stats.avgEarlyDelayTime
);
}
@ -143,16 +124,14 @@ public class ScheduleEngineTriggerBenchmark {
static class Stats {
final String implementation;
final int numberOfTimesTriggered;
final long numberOfTimesDelayed;
final long avgDelayTime;
final long numberOfEarlyTriggered;
final long avgEarlyDelayTime;
Stats(String implementation, int numberOfTimesTriggered, long numberOfTimesDelayed, double avgDelayTime,
Stats(int numberOfTimesTriggered, long numberOfTimesDelayed, double avgDelayTime,
long numberOfEarlyTriggered, double avgEarlyDelayTime) {
this.implementation = implementation;
this.numberOfTimesTriggered = numberOfTimesTriggered;
this.numberOfTimesDelayed = numberOfTimesDelayed;
this.avgDelayTime = Math.round(avgDelayTime);

View File

@ -224,8 +224,6 @@ public class WatcherExecutorServiceBenchmark {
protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) {
return new ScheduleTriggerEngineMock(settings, scheduleRegistry, clock);
}
}
}
}

View File

@ -15,12 +15,13 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import java.io.IOException;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -31,12 +32,11 @@ import java.util.concurrent.ConcurrentMap;
public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
private final Logger logger;
private final ConcurrentMap<String, Job> jobs = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Watch> watches = new ConcurrentHashMap<>();
public ScheduleTriggerEngineMock(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry, clock);
this.logger = Loggers.getLogger(ScheduleTriggerEngineMock.class, settings);
}
@Override
@ -51,7 +51,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
}
@Override
public void start(Collection<Job> jobs) {
public void start(Collection<Watch> jobs) {
}
@Override
@ -59,13 +59,13 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
}
@Override
public void add(Job job) {
jobs.put(job.id(), job);
public void add(Watch watch) {
watches.put(watch.id(), watch);
}
@Override
public boolean remove(String jobId) {
return jobs.remove(jobId) != null;
return watches.remove(jobId) != null;
}
public void trigger(String jobName) {
@ -81,9 +81,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
DateTime now = new DateTime(clock.millis());
logger.debug("firing [{}] at [{}]", jobName, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now);
for (Listener listener : listeners) {
listener.triggered(Arrays.asList(event));
}
consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event)));
if (interval != null) {
if (clock instanceof ClockMock) {
((ClockMock) clock).fastForward(interval);

View File

@ -1,243 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.trigger.schedule.engine;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.trigger.Trigger;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.support.DayOfWeek;
import org.elasticsearch.xpack.watcher.trigger.schedule.support.WeekTimes;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly;
import static org.hamcrest.Matchers.is;
import static org.joda.time.DateTimeZone.UTC;
public abstract class BaseTriggerEngineTestCase extends ESTestCase {
private TriggerEngine engine;
protected ClockMock clock = ClockMock.frozen();
@Before
public void init() throws Exception {
engine = createEngine();
}
protected abstract TriggerEngine createEngine();
/**
* Dependending on the trigger engine used, we may need to advance the clock, because the implementation might use the clock
* in order to check for new jobs being executed
*/
protected abstract void advanceClockIfNeeded(DateTime newCurrentDateTime);
@After
public void cleanup() throws Exception {
engine.stop();
}
public void testStart() throws Exception {
int count = randomIntBetween(2, 5);
final CountDownLatch firstLatch = new CountDownLatch(count);
final CountDownLatch secondLatch = new CountDownLatch(count);
List<TriggerEngine.Job> jobs = new ArrayList<>();
for (int i = 0; i < count; i++) {
jobs.add(new SimpleJob(String.valueOf(i), interval("1s")));
}
final BitSet bits = new BitSet(count);
engine.register(events -> {
for (TriggerEvent event : events) {
int index = Integer.parseInt(event.jobName());
if (!bits.get(index)) {
logger.info("job [{}] first fire", index);
bits.set(index);
firstLatch.countDown();
} else {
logger.info("job [{}] second fire", index);
secondLatch.countDown();
}
}
});
engine.start(jobs);
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!firstLatch.await(3 * count, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!secondLatch.await(3 * count, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
engine.stop();
assertThat(bits.cardinality(), is(count));
}
public void testAddHourly() throws Exception {
final String name = "job_name";
final CountDownLatch latch = new CountDownLatch(1);
engine.start(Collections.emptySet());
engine.register(events -> {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", clock);
}
latch.countDown();
});
int randomMinute = randomIntBetween(0, 59);
DateTime testNowTime = new DateTime(clock.millis(), UTC).withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime scheduledTime = testNowTime.plusSeconds(2);
logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime);
clock.setTime(testNowTime);
engine.add(new SimpleJob(name, daily().at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build()));
advanceClockIfNeeded(scheduledTime);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
}
public void testAddDaily() throws Exception {
final String name = "job_name";
final CountDownLatch latch = new CountDownLatch(1);
engine.start(Collections.emptySet());
engine.register(events -> {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", new DateTime(clock.millis(), UTC));
latch.countDown();
}
});
int randomHour = randomIntBetween(0, 23);
int randomMinute = randomIntBetween(0, 59);
DateTime testNowTime = new DateTime(clock.millis(), UTC).withHourOfDay(randomHour)
.withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime scheduledTime = testNowTime.plusSeconds(2);
logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime);
clock.setTime(testNowTime);
engine.add(new SimpleJob(name, daily().at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build()));
advanceClockIfNeeded(scheduledTime);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
}
public void testAddWeekly() throws Exception {
final String name = "job_name";
final CountDownLatch latch = new CountDownLatch(1);
engine.start(Collections.emptySet());
engine.register(events -> {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job");
}
latch.countDown();
});
int randomHour = randomIntBetween(0, 23);
int randomMinute = randomIntBetween(0, 59);
int randomDay = randomIntBetween(1, 7);
DateTime testNowTime = new DateTime(clock.millis(), UTC).withDayOfWeek(randomDay).withHourOfDay(randomHour)
.withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime scheduledTime = testNowTime.plusSeconds(2);
logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime);
clock.setTime(testNowTime);
// fun part here (aka WTF): DayOfWeek with Joda is MON-SUN, starting at 1
// DayOfWeek with Watcher is SUN-SAT, starting at 1
int watcherDay = (scheduledTime.getDayOfWeek() % 7) + 1;
engine.add(new SimpleJob(name, weekly().time(WeekTimes.builder()
.on(DayOfWeek.resolve(watcherDay))
.at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build()).build()));
advanceClockIfNeeded(scheduledTime);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
}
public void testAddSameJobSeveralTimesAndExecutedOnce() throws InterruptedException {
engine.start(Collections.emptySet());
final CountDownLatch firstLatch = new CountDownLatch(1);
final CountDownLatch secondLatch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger(0);
engine.register(events -> {
events.forEach(event -> {
if (counter.getAndIncrement() == 0) {
firstLatch.countDown();
} else {
secondLatch.countDown();
}
});
});
int times = scaledRandomIntBetween(3, 30);
for (int i = 0; i < times; i++) {
engine.add(new SimpleJob("_id", interval("1s")));
}
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!firstLatch.await(3, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!secondLatch.await(3, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
// ensure job was only called twice independent from its name
assertThat(counter.get(), is(2));
}
public static class SimpleJob implements TriggerEngine.Job {
private final String name;
private final ScheduleTrigger trigger;
public SimpleJob(String name, Schedule schedule) {
this.name = name;
this.trigger = new ScheduleTrigger(schedule);
}
@Override
public String id() {
return name;
}
@Override
public Trigger trigger() {
return trigger;
}
}
}

View File

@ -1,26 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.trigger.schedule.engine;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.joda.time.DateTime;
import static org.mockito.Mockito.mock;
public class SchedulerScheduleEngineTests extends BaseTriggerEngineTestCase {
protected TriggerEngine createEngine() {
return new SchedulerScheduleTriggerEngine(Settings.EMPTY, mock(ScheduleRegistry.class), clock);
}
// the scheduler configures hard when to schedule, and does not check the clock
@Override
protected void advanceClockIfNeeded(DateTime newCurrentDateTime) {
}
}

View File

@ -5,22 +5,254 @@
*/
package org.elasticsearch.xpack.watcher.trigger.schedule.engine;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.support.DayOfWeek;
import org.elasticsearch.xpack.watcher.trigger.schedule.support.WeekTimes;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly;
import static org.hamcrest.Matchers.is;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Mockito.mock;
public class TickerScheduleEngineTests extends BaseTriggerEngineTestCase {
public class TickerScheduleEngineTests extends ESTestCase {
@Override
protected TriggerEngine createEngine() {
return new TickerScheduleTriggerEngine(Settings.EMPTY, mock(ScheduleRegistry.class), clock);
private TriggerEngine engine;
protected ClockMock clock = ClockMock.frozen();
@Before
public void init() throws Exception {
engine = createEngine();
}
@Override
protected void advanceClockIfNeeded(DateTime newCurrentDateTime) {
private TriggerEngine createEngine() {
return new TickerScheduleTriggerEngine(Settings.EMPTY,
mock(ScheduleRegistry.class), clock);
}
private void advanceClockIfNeeded(DateTime newCurrentDateTime) {
clock.setTime(newCurrentDateTime);
}
@After
public void cleanup() throws Exception {
engine.stop();
}
public void testStart() throws Exception {
int count = randomIntBetween(2, 5);
final CountDownLatch firstLatch = new CountDownLatch(count);
final CountDownLatch secondLatch = new CountDownLatch(count);
List<Watch> watches = new ArrayList<>();
for (int i = 0; i < count; i++) {
watches.add(createWatch(String.valueOf(i), interval("1s")));
}
final BitSet bits = new BitSet(count);
engine.register(new Consumer<Iterable<TriggerEvent>>() {
@Override
public void accept(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) {
int index = Integer.parseInt(event.jobName());
if (!bits.get(index)) {
logger.info("job [{}] first fire", index);
bits.set(index);
firstLatch.countDown();
} else {
logger.info("job [{}] second fire", index);
secondLatch.countDown();
}
}
}
});
engine.start(watches);
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!firstLatch.await(3 * count, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!secondLatch.await(3 * count, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
engine.stop();
assertThat(bits.cardinality(), is(count));
}
public void testAddHourly() throws Exception {
final String name = "job_name";
final CountDownLatch latch = new CountDownLatch(1);
engine.start(Collections.emptySet());
engine.register(new Consumer<Iterable<TriggerEvent>>() {
@Override
public void accept(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", clock);
}
latch.countDown();
}
});
int randomMinute = randomIntBetween(0, 59);
DateTime testNowTime = new DateTime(clock.millis(), UTC).withMinuteOfHour(randomMinute)
.withSecondOfMinute(59);
DateTime scheduledTime = testNowTime.plusSeconds(2);
logger.info("Setting current time to [{}], job execution time [{}]", testNowTime,
scheduledTime);
clock.setTime(testNowTime);
engine.add(createWatch(name, daily().at(scheduledTime.getHourOfDay(),
scheduledTime.getMinuteOfHour()).build()));
advanceClockIfNeeded(scheduledTime);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
}
public void testAddDaily() throws Exception {
final String name = "job_name";
final CountDownLatch latch = new CountDownLatch(1);
engine.start(Collections.emptySet());
engine.register(new Consumer<Iterable<TriggerEvent>>() {
@Override
public void accept(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", new DateTime(clock.millis(), UTC));
latch.countDown();
}
}
});
int randomHour = randomIntBetween(0, 23);
int randomMinute = randomIntBetween(0, 59);
DateTime testNowTime = new DateTime(clock.millis(), UTC).withHourOfDay(randomHour)
.withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime scheduledTime = testNowTime.plusSeconds(2);
logger.info("Setting current time to [{}], job execution time [{}]", testNowTime,
scheduledTime);
clock.setTime(testNowTime);
engine.add(createWatch(name, daily().at(scheduledTime.getHourOfDay(),
scheduledTime.getMinuteOfHour()).build()));
advanceClockIfNeeded(scheduledTime);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
}
public void testAddWeekly() throws Exception {
final String name = "job_name";
final CountDownLatch latch = new CountDownLatch(1);
engine.start(Collections.emptySet());
engine.register(new Consumer<Iterable<TriggerEvent>>() {
@Override
public void accept(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job");
}
latch.countDown();
}
});
int randomHour = randomIntBetween(0, 23);
int randomMinute = randomIntBetween(0, 59);
int randomDay = randomIntBetween(1, 7);
DateTime testNowTime = new DateTime(clock.millis(), UTC).withDayOfWeek(randomDay)
.withHourOfDay(randomHour).withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime scheduledTime = testNowTime.plusSeconds(2);
logger.info("Setting current time to [{}], job execution time [{}]", testNowTime,
scheduledTime);
clock.setTime(testNowTime);
// fun part here (aka WTF): DayOfWeek with Joda is MON-SUN, starting at 1
// DayOfWeek with Watcher is SUN-SAT, starting at 1
int watcherDay = (scheduledTime.getDayOfWeek() % 7) + 1;
engine.add(createWatch(name, weekly().time(WeekTimes.builder()
.on(DayOfWeek.resolve(watcherDay))
.at(scheduledTime.getHourOfDay(), scheduledTime.getMinuteOfHour()).build())
.build()));
advanceClockIfNeeded(scheduledTime);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
}
public void testAddSameJobSeveralTimesAndExecutedOnce() throws InterruptedException {
engine.start(Collections.emptySet());
final CountDownLatch firstLatch = new CountDownLatch(1);
final CountDownLatch secondLatch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger(0);
engine.register(new Consumer<Iterable<TriggerEvent>>() {
@Override
public void accept(Iterable<TriggerEvent> events) {
events.forEach(event -> {
if (counter.getAndIncrement() == 0) {
firstLatch.countDown();
} else {
secondLatch.countDown();
}
});
}
});
int times = scaledRandomIntBetween(3, 30);
for (int i = 0; i < times; i++) {
engine.add(createWatch("_id", interval("1s")));
}
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!firstLatch.await(3, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!secondLatch.await(3, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
// ensure job was only called twice independent from its name
assertThat(counter.get(), is(2));
}
private Watch createWatch(String name, Schedule schedule) {
return new Watch(name, new ScheduleTrigger(schedule), new ExecutableNoneInput(logger),
AlwaysCondition.INSTANCE, null, null,
Collections.emptyList(), null, null);
}
}

View File

@ -532,7 +532,7 @@ public class WatchTests extends ESTestCase {
}
@Override
public void start(Collection<Job> jobs) {
public void start(Collection<Watch> jobs) {
}
@Override
@ -540,11 +540,7 @@ public class WatchTests extends ESTestCase {
}
@Override
public void register(Listener listener) {
}
@Override
public void add(Job job) {
public void add(Watch watch) {
}
@Override