diff --git a/src/main/java/org/elasticsearch/watcher/execution/AsyncTriggerListener.java b/src/main/java/org/elasticsearch/watcher/execution/AsyncTriggerListener.java new file mode 100644 index 00000000000..bf2d7aa2e52 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/execution/AsyncTriggerListener.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.execution; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.watcher.trigger.TriggerEngine; +import org.elasticsearch.watcher.trigger.TriggerEvent; +import org.elasticsearch.watcher.trigger.TriggerService; + +/** + */ +public class AsyncTriggerListener implements TriggerEngine.Listener { + + private final ExecutionService executionService; + + @Inject + public AsyncTriggerListener(ExecutionService executionService, TriggerService triggerService) { + this.executionService = executionService; + triggerService.register(this); + } + + @Override + public void triggered(Iterable events) { + executionService.processEventsAsync(events); + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionModule.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionModule.java index ad5d3b1a4bc..9372e387264 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionModule.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionModule.java @@ -6,25 +6,29 @@ package org.elasticsearch.watcher.execution; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.watcher.trigger.TriggerEngine; /** */ public class ExecutionModule extends AbstractModule { private final Class executorClass; + private final Class triggerEngineListenerClass; public ExecutionModule() { - this(InternalWatchExecutor.class); + this(InternalWatchExecutor.class, AsyncTriggerListener.class); } - protected ExecutionModule(Class executorClass) { + protected ExecutionModule(Class executorClass, Class triggerEngineListenerClass) { this.executorClass = executorClass; + this.triggerEngineListenerClass = triggerEngineListenerClass; } @Override protected void configure() { bind(ExecutionService.class).asEagerSingleton(); bind(executorClass).asEagerSingleton(); + bind(triggerEngineListenerClass).asEagerSingleton(); bind(WatchExecutor.class).to(executorClass); } } diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index 6095b305278..0c325f2eeb9 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -6,12 +6,15 @@ package org.elasticsearch.watcher.execution; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.watcher.WatcherException; @@ -24,9 +27,7 @@ import org.elasticsearch.watcher.support.Callback; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.throttle.Throttler; import org.elasticsearch.watcher.transform.Transform; -import org.elasticsearch.watcher.trigger.TriggerEngine; import org.elasticsearch.watcher.trigger.TriggerEvent; -import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.watch.Watch; import org.elasticsearch.watcher.watch.WatchExecution; import org.elasticsearch.watcher.watch.WatchLockService; @@ -35,6 +36,7 @@ import org.elasticsearch.watcher.watch.WatchStore; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -54,16 +56,14 @@ public class ExecutionService extends AbstractComponent { private final AtomicInteger initializationRetries = new AtomicInteger(); @Inject - public ExecutionService(Settings settings, HistoryStore historyStore, WatchExecutor executor, - WatchStore watchStore, WatchLockService watchLockService, TriggerService triggerService, - ClusterService clusterService, Clock clock) { + public ExecutionService(Settings settings, HistoryStore historyStore, WatchExecutor executor, WatchStore watchStore, + WatchLockService watchLockService, ClusterService clusterService, Clock clock) { super(settings); this.historyStore = historyStore; this.executor = executor; this.watchStore = watchStore; this.watchLockService = watchLockService; this.clusterService = clusterService; - triggerService.register(new SchedulerListener()); this.clock = clock; } @@ -113,6 +113,100 @@ public class ExecutionService extends AbstractComponent { return executor.largestPoolSize(); } + public void processEventsAsync(Iterable events) throws WatcherException { + if (!started.get()) { + throw new ElasticsearchIllegalStateException("not started"); + } + final LinkedList records = new LinkedList<>(); + final LinkedList contexts = new LinkedList<>(); + + DateTime now = clock.now(); + for (TriggerEvent event : events) { + Watch watch = watchStore.get(event.jobName()); + if (watch == null) { + logger.warn("unable to find watch [{}] in the watch store, perhaps it has been deleted", event.jobName()); + continue; + } + TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, now, event); + contexts.add(ctx); + records.add(new WatchRecord(ctx.id(), watch, event)); + } + + logger.debug("saving watch records [{}]", records.size()); + if (records.size() == 1) { + final WatchRecord watchRecord = records.getFirst(); + final TriggeredExecutionContext ctx = contexts.getFirst(); + historyStore.putAsync(watchRecord, new ActionListener() { + @Override + public void onResponse(Boolean aBoolean) { + executeAsync(ctx, watchRecord); + } + + @Override + public void onFailure(Throwable e) { + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof EsRejectedExecutionException) { + logger.debug("Failed to store watch record {} due to overloaded threadpool: {}", watchRecord, ExceptionsHelper.detailedMessage(e)); + } else { + logger.warn("Failed to store watch record: {}", e, watchRecord); + } + } + }); + } else { + historyStore.bulkAsync(records, new ActionListener>() { + @Override + public void onResponse(List successFullSlots) { + for (Integer slot : successFullSlots) { + executeAsync(contexts.get(slot), records.get(slot)); + } + } + + @Override + public void onFailure(Throwable e) { + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof EsRejectedExecutionException) { + logger.debug("Failed to store watch records due to overloaded threadpool: {}", ExceptionsHelper.detailedMessage(e)); + } else { + logger.warn("Failed to store watch records", e); + } + } + }); + } + } + + public void processEventsSync(Iterable events) throws WatcherException { + if (!started.get()) { + throw new ElasticsearchIllegalStateException("not started"); + } + final LinkedList records = new LinkedList<>(); + final LinkedList contexts = new LinkedList<>(); + + DateTime now = clock.now(); + for (TriggerEvent event : events) { + Watch watch = watchStore.get(event.jobName()); + if (watch == null) { + logger.warn("unable to find watch [{}] in the watch store, perhaps it has been deleted", event.jobName()); + continue; + } + TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, now, event); + contexts.add(ctx); + records.add(new WatchRecord(ctx.id(), watch, event)); + } + + logger.debug("saving watch records [{}]", records.size()); + if (records.size() == 1) { + final WatchRecord watchRecord = records.getFirst(); + final TriggeredExecutionContext ctx = contexts.getFirst(); + historyStore.put(watchRecord); + executeAsync(ctx, watchRecord); + } else { + List slots = historyStore.bulk(records); + for (Integer slot : slots) { + executeAsync(contexts.get(slot), records.get(slot)); + } + } + } + public WatchRecord execute(WatchExecutionContext ctx) throws IOException { WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent()); @@ -150,21 +244,6 @@ public class ExecutionService extends AbstractComponent { } } - - private void executeWatch(Watch watch, TriggerEvent event) throws WatcherException { - if (!started.get()) { - throw new ElasticsearchIllegalStateException("not started"); - } - TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, clock.now(), event); - WatchRecord watchRecord = new WatchRecord(ctx.id(), watch, event); - if (ctx.recordExecution()) { - logger.debug("saving watch record [{}] for watch [{}]", watchRecord.id(), watch.name()); - historyStore.put(watchRecord); - } - executeAsync(ctx, watchRecord); - } - - WatchExecution executeInner(WatchExecutionContext ctx) throws IOException { Watch watch = ctx.watch(); @@ -274,14 +353,15 @@ public class ExecutionService extends AbstractComponent { watchStore.updateStatus(ctx.watch()); } catch (Exception e) { if (started()) { - logger.warn("failed to execute watch [{}] [{}]", e, watchRecord.name(), ctx.id()); + String detailedMessage = ExceptionsHelper.detailedMessage(e); + logger.warn("failed to execute watch [{}] [{}], failure [{}]", watchRecord.name(), ctx.id(), detailedMessage); try { - watchRecord.update(WatchRecord.State.FAILED, e.getMessage()); + watchRecord.update(WatchRecord.State.FAILED, detailedMessage); if (ctx.recordExecution()) { historyStore.update(watchRecord); } } catch (Exception e2) { - logger.error("failed to update watch record [{}] failure [{}] for [{}] [{}]", e2, watchRecord, ctx.watch().name(), ctx.id(), e.getMessage()); + logger.error("failed to update watch record [{}], failure [{}], original failure [{}]", watchRecord, ExceptionsHelper.detailedMessage(e2), detailedMessage); } } else { logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord); @@ -293,28 +373,4 @@ public class ExecutionService extends AbstractComponent { } } - - private class SchedulerListener implements TriggerEngine.Listener { - - @Override - public void triggered(String name, TriggerEvent event) { - if (!started.get()) { - throw new ElasticsearchIllegalStateException("not started"); - } - Watch watch = watchStore.get(name); - if (watch == null) { - logger.warn("unable to find watch [{}] in the watch store, perhaps it has been deleted", name); - return; - } - try { - ExecutionService.this.executeWatch(watch, event); - } catch (Exception e) { - if (started()) { - logger.error("failed to execute watch from SchedulerListener [{}]", e, name); - } else { - logger.debug("failed to execute watch from SchedulerListener [{}] after shutdown", e, name); - } - } - } - } } diff --git a/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java b/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java index 5e710637648..8df2e3fad04 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java @@ -121,7 +121,7 @@ public class ManualExecutionContext extends WatchExecutionContext { executionTime = DateTime.now(DateTimeZone.UTC); } if (triggerEvent == null) { - triggerEvent = new ManualTriggerEvent(executionTime, new HashMap()); + triggerEvent = new ManualTriggerEvent(watch.name(), executionTime, new HashMap()); } return new ManualExecutionContext(watch, executionTime, triggerEvent, inputResult, conditionResult, throttlerResult, simulateActionPredicate, recordExecution); } diff --git a/src/main/java/org/elasticsearch/watcher/execution/SyncTriggerListener.java b/src/main/java/org/elasticsearch/watcher/execution/SyncTriggerListener.java new file mode 100644 index 00000000000..4cd3618b488 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/execution/SyncTriggerListener.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.execution; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.watcher.trigger.TriggerEngine; +import org.elasticsearch.watcher.trigger.TriggerEvent; +import org.elasticsearch.watcher.trigger.TriggerService; + +/** + */ +public class SyncTriggerListener implements TriggerEngine.Listener { + + private final ExecutionService executionService; + + @Inject + public SyncTriggerListener(ExecutionService executionService, TriggerService triggerService) { + this.executionService = executionService; + triggerService.register(this); + } + + @Override + public void triggered(Iterable events) { + executionService.processEventsSync(events); + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java index 6954f9c122d..bbfc6424d0b 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java @@ -5,8 +5,12 @@ */ package org.elasticsearch.watcher.history; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; @@ -104,6 +108,94 @@ public class HistoryStore extends AbstractComponent { } } + public void putAsync(final WatchRecord watchRecord, final ActionListener listener) throws HistoryException { + String index = getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + try { + IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) + .source(XContentFactory.jsonBuilder().value(watchRecord)) + .opType(IndexRequest.OpType.CREATE); + client.indexAsync(request, new ActionListener() { + @Override + public void onResponse(IndexResponse response) { + watchRecord.version(response.getVersion()); + listener.onResponse(true); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + } catch (IOException e) { + throw new HistoryException("failed to persist watch record [" + watchRecord + "]", e); + } + } + + public void bulkAsync(final List records, final ActionListener> listener) throws HistoryException { + try { + BulkRequest request = new BulkRequest(); + for (WatchRecord record : records) { + String index = getHistoryIndexNameForTime(record.triggerEvent().triggeredTime()); + IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE, record.id().value()); + indexRequest.source(XContentFactory.jsonBuilder().value(record)); + indexRequest.opType(IndexRequest.OpType.CREATE); + request.add(indexRequest); + } + client.bulkAsync(request, new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + List successFullSlots = new ArrayList(); + for (int i = 0; i < response.getItems().length; i++) { + BulkItemResponse itemResponse = response.getItems()[i]; + if (itemResponse.isFailed()) { + logger.error("could store watch record with id [" + itemResponse.getId() + "], because failed [" + itemResponse.getFailureMessage() + "]"); + } else { + IndexResponse indexResponse = itemResponse.getResponse(); + records.get(i).version(indexResponse.getVersion()); + successFullSlots.add(i); + } + } + listener.onResponse(successFullSlots); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + } catch (IOException e) { + throw new HistoryException("failed to persist watch records", e); + } + } + + public List bulk(final List records) throws HistoryException { + try { + BulkRequest request = new BulkRequest(); + for (WatchRecord record : records) { + String index = getHistoryIndexNameForTime(record.triggerEvent().triggeredTime()); + IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE, record.id().value()); + indexRequest.source(XContentFactory.jsonBuilder().value(record)); + indexRequest.opType(IndexRequest.OpType.CREATE); + request.add(indexRequest); + } + BulkResponse response = client.bulk(request); + List successFullSlots = new ArrayList<>(); + for (int i = 0; i < response.getItems().length; i++) { + BulkItemResponse itemResponse = response.getItems()[i]; + if (itemResponse.isFailed()) { + logger.error("could store watch record with id [" + itemResponse.getId() + "], because failed [" + itemResponse.getFailureMessage() + "]"); + } else { + IndexResponse indexResponse = itemResponse.getResponse(); + records.get(i).version(indexResponse.getVersion()); + successFullSlots.add(i); + } + } + return successFullSlots; + } catch (IOException e) { + throw new HistoryException("failed to persist watch records", e); + } + } + public void update(WatchRecord watchRecord) throws HistoryException { if (!started.get()) { throw new HistoryException("unable to persist watch record history store is not ready"); diff --git a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java index c0583364a4f..6ef7d09eb94 100644 --- a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java +++ b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java @@ -6,8 +6,11 @@ package org.elasticsearch.watcher.support.init.proxy; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -58,6 +61,21 @@ public class ClientProxy implements InitializingService.Initializable { return client.index(preProcess(request)).actionGet(); } + public BulkResponse bulk(BulkRequest request) { + request.listenerThreaded(true); + return client.bulk(request).actionGet(); + } + + public void indexAsync(IndexRequest request, ActionListener listener) { + request.listenerThreaded(true); + client.index(request, listener); + } + + public void bulkAsync(BulkRequest request, ActionListener listener) { + request.listenerThreaded(true); + client.bulk(request, listener); + } + public ActionFuture delete(DeleteRequest request) { return client.delete(preProcess(request)); } diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java index 3561f4024cd..06d19c7e802 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java @@ -89,7 +89,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction { public static interface Listener { - void triggered(String jobName, TriggerEvent event); + void triggered(Iterable events); + } public static interface Job { diff --git a/src/main/java/org/elasticsearch/watcher/trigger/TriggerEvent.java b/src/main/java/org/elasticsearch/watcher/trigger/TriggerEvent.java index 3d1db67192c..9eb2380ce92 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/TriggerEvent.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/TriggerEvent.java @@ -8,10 +8,7 @@ package org.elasticsearch.watcher.trigger; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.watcher.support.WatcherDateUtils; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -21,16 +18,23 @@ import java.util.Map; public abstract class TriggerEvent implements ToXContent { public static final ParseField TRIGGERED_TIME_FIELD = new ParseField("triggered_time"); + public static final ParseField JOB_NAME_FIELD = new ParseField("job_name"); + private final String jobName; protected final DateTime triggeredTime; protected final Map data; - public TriggerEvent(DateTime triggeredTime) { + public TriggerEvent(String jobName, DateTime triggeredTime) { + this.jobName = jobName; this.triggeredTime = triggeredTime; this.data = new HashMap<>(); data.put(TRIGGERED_TIME_FIELD.getPreferredName(), triggeredTime); } + public String jobName() { + return jobName; + } + public abstract String type(); public DateTime triggeredTime() { diff --git a/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java b/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java index c9476b47d99..b4b0801d5b9 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java @@ -130,9 +130,9 @@ public class TriggerService extends AbstractComponent { } @Override - public void triggered(String jobName, TriggerEvent event) { + public void triggered(Iterable events) { for (TriggerEngine.Listener listener : listeners) { - listener.triggered(jobName, event); + listener.triggered(events); } } } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/manual/ManualTriggerEvent.java b/src/main/java/org/elasticsearch/watcher/trigger/manual/ManualTriggerEvent.java index 6cf68b1c1b3..7109082843d 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/manual/ManualTriggerEvent.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/manual/ManualTriggerEvent.java @@ -25,8 +25,8 @@ public class ManualTriggerEvent extends TriggerEvent { private final Map triggerData; - public ManualTriggerEvent(DateTime triggeredTime, Map triggerData) { - super(triggeredTime); + public ManualTriggerEvent(String jobName, DateTime triggeredTime, Map triggerData) { + super(jobName, triggeredTime); data.putAll(triggerData); this.triggerData = triggerData; } @@ -39,12 +39,14 @@ public class ManualTriggerEvent extends TriggerEvent { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder.startObject() + .field(JOB_NAME_FIELD.getPreferredName(), jobName()) .field(TRIGGERED_TIME_FIELD.getPreferredName(), WatcherDateUtils.formatDate(triggeredTime)) .field(TRIGGER_DATA_FIELD.getPreferredName(), triggerData) .endObject(); } public static ManualTriggerEvent parse(String context, XContentParser parser) throws IOException { + String jobName = null; DateTime triggeredTime = null; Map triggerData = new HashMap<>(); String currentFieldName = null; @@ -54,7 +56,9 @@ public class ManualTriggerEvent extends TriggerEvent { currentFieldName = parser.currentName(); } else { if (token == XContentParser.Token.VALUE_STRING) { - if (TRIGGERED_TIME_FIELD.match(currentFieldName)) { + if (JOB_NAME_FIELD.match(currentFieldName)) { + jobName = parser.text(); + } else if (TRIGGERED_TIME_FIELD.match(currentFieldName)) { triggeredTime = WatcherDateUtils.parseDate(parser.text()); } else { throw new ParseException("could not parse trigger event for [" + context + "]. unknown string value field [" + currentFieldName + "]"); @@ -73,7 +77,7 @@ public class ManualTriggerEvent extends TriggerEvent { // should never be, it's fully controlled internally (not coming from the user) assert triggeredTime != null; - return new ManualTriggerEvent(triggeredTime, triggerData); + return new ManualTriggerEvent(jobName, triggeredTime, triggerData); } public static class ParseException extends WatcherException { diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java index 4507bf67408..92186321742 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java @@ -37,7 +37,7 @@ public class IntervalSchedule implements Schedule { if (time <= startTime) { return startTime; } - // advancing the time in 1 ns (we're looking for the time **after**) + // advancing the time in 1 ms (we're looking for the time **after**) time += 1; long delta = time - startTime; return startTime + (delta / interval.millis + 1) * interval.millis; diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEvent.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEvent.java index 6a1724e76cf..d9f62b18be5 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEvent.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/ScheduleTriggerEvent.java @@ -24,8 +24,8 @@ public class ScheduleTriggerEvent extends TriggerEvent { private final DateTime scheduledTime; - public ScheduleTriggerEvent(DateTime triggeredTime, DateTime scheduledTime) { - super(triggeredTime); + public ScheduleTriggerEvent(String jobName, DateTime triggeredTime, DateTime scheduledTime) { + super(jobName, triggeredTime); this.scheduledTime = scheduledTime; data.put(SCHEDULED_TIME_FIELD.getPreferredName(), scheduledTime); } @@ -42,12 +42,14 @@ public class ScheduleTriggerEvent extends TriggerEvent { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder.startObject() + .field(JOB_NAME_FIELD.getPreferredName(), jobName()) .field(TRIGGERED_TIME_FIELD.getPreferredName(), WatcherDateUtils.formatDate(triggeredTime)) .field(SCHEDULED_TIME_FIELD.getPreferredName(), WatcherDateUtils.formatDate(scheduledTime)) .endObject(); } public static ScheduleTriggerEvent parse(String context, XContentParser parser) throws IOException { + String jobName = null; DateTime triggeredTime = null; DateTime scheduledTime = null; @@ -58,7 +60,9 @@ public class ScheduleTriggerEvent extends TriggerEvent { currentFieldName = parser.currentName(); } else { if (token == XContentParser.Token.VALUE_STRING) { - if (TRIGGERED_TIME_FIELD.match(currentFieldName)) { + if (JOB_NAME_FIELD.match(currentFieldName)) { + jobName = parser.text(); + } else if (TRIGGERED_TIME_FIELD.match(currentFieldName)) { triggeredTime = WatcherDateUtils.parseDate(parser.text()); } else if (SCHEDULED_TIME_FIELD.match(currentFieldName)) { scheduledTime = WatcherDateUtils.parseDate(parser.text()); @@ -72,8 +76,8 @@ public class ScheduleTriggerEvent extends TriggerEvent { } // should never be, it's fully controlled internally (not coming from the user) - assert triggeredTime != null && scheduledTime != null; - return new ScheduleTriggerEvent(triggeredTime, scheduledTime); + assert jobName != null && triggeredTime != null && scheduledTime != null; + return new ScheduleTriggerEvent(jobName, triggeredTime, scheduledTime); } public static class ParseException extends WatcherException { diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/HashWheelScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/HashWheelScheduleTriggerEngine.java index 1906601f43d..0b55d05d485 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/HashWheelScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/HashWheelScheduleTriggerEngine.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.watcher.trigger.schedule.engine; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; @@ -21,6 +22,7 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.schedule.*; import java.util.ArrayList; @@ -45,7 +47,7 @@ public class HashWheelScheduleTriggerEngine extends ScheduleTriggerEngine { } int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) - .size(availableProcessors) + .size(availableProcessors * 2) .queueSize(1000) .build(); } @@ -119,10 +121,10 @@ public class HashWheelScheduleTriggerEngine extends ScheduleTriggerEngine { protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime)); - final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime), new DateTime(scheduledTime)); for (Listener listener : listeners) { try { - executor.execute(new ListenerRunnable(listener, name, event)); + executor.execute(new ListenerRunnable(listener, event)); } catch (EsRejectedExecutionException e) { if (logger.isDebugEnabled()) { RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler(); @@ -182,18 +184,16 @@ public class HashWheelScheduleTriggerEngine extends ScheduleTriggerEngine { static class ListenerRunnable implements Runnable { private final Listener listener; - private final String jobName; private final ScheduleTriggerEvent event; - public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + public ListenerRunnable(Listener listener, ScheduleTriggerEvent event) { this.listener = listener; - this.jobName = jobName; this.event = event; } @Override public void run() { - listener.triggered(jobName, event); + listener.triggered(ImmutableList.of(event)); } } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java index 6d9dedbaa5c..4b92602981c 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/QuartzScheduleTriggerEngine.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.watcher.trigger.schedule.engine; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTimeZone; @@ -18,6 +19,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.TriggerException; import org.elasticsearch.watcher.trigger.schedule.*; import org.quartz.*; @@ -43,7 +45,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { } int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) - .size(availableProcessors) + .size(availableProcessors * 2) .queueSize(1000) .build(); } @@ -160,10 +162,10 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { } protected void notifyListeners(String name, JobExecutionContext ctx) { - ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(ctx.getFireTime()), new DateTime(ctx.getScheduledFireTime())); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(ctx.getFireTime()), new DateTime(ctx.getScheduledFireTime())); for (Listener listener : listeners) { try { - executor.execute(new ListenerRunnable(listener, name, event)); + executor.execute(new ListenerRunnable(listener, event)); } catch (EsRejectedExecutionException e) { if (logger.isDebugEnabled()) { RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler(); @@ -181,18 +183,16 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine { static class ListenerRunnable implements Runnable { private final Listener listener; - private final String jobName; private final ScheduleTriggerEvent event; - public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + public ListenerRunnable(Listener listener, ScheduleTriggerEvent event) { this.listener = listener; - this.jobName = jobName; this.event = event; } @Override public void run() { - listener.triggered(jobName, event); + listener.triggered(ImmutableList.of(event)); } } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java index 52dd4d8e1bf..ad1aba13910 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.watcher.trigger.schedule.engine; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; @@ -17,6 +18,7 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.schedule.*; import java.util.ArrayList; @@ -40,7 +42,7 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { } int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) - .size(availableProcessors) + .size(availableProcessors * 2) .queueSize(1000) .build(); } @@ -106,10 +108,10 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime)); - final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime), new DateTime(scheduledTime)); for (Listener listener : listeners) { try { - executor.execute(new ListenerRunnable(listener, name, event)); + executor.execute(new ListenerRunnable(listener, event)); } catch (EsRejectedExecutionException e) { if (logger.isDebugEnabled()) { RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler(); @@ -169,18 +171,16 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { static class ListenerRunnable implements Runnable { private final Listener listener; - private final String jobName; private final ScheduleTriggerEvent event; - public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + public ListenerRunnable(Listener listener, ScheduleTriggerEvent event) { this.listener = listener; - this.jobName = jobName; this.event = event; } @Override public void run() { - listener.triggered(jobName, event); + listener.triggered(ImmutableList.of(event)); } } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java index e61a8ffc78a..752b4f6732f 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SimpleTickerScheduleTriggerEngine.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.watcher.trigger.schedule.engine; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.ImmutableSettings; @@ -16,9 +17,12 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.schedule.*; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -40,7 +44,7 @@ public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine { } int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) - .size(availableProcessors) + .size(availableProcessors * 2) .queueSize(1000) .build(); } @@ -93,20 +97,27 @@ public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine { void checkJobs() { long triggeredTime = clock.millis(); + List events = new ArrayList<>(); for (ActiveSchedule schedule : schedules.values()) { long scheduledTime = schedule.check(triggeredTime); if (scheduledTime > 0) { - notifyListeners(schedule.name, triggeredTime, scheduledTime); + logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name, new DateTime(triggeredTime), new DateTime(scheduledTime)); + events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime), new DateTime(scheduledTime))); + if (events.size() >= 1000) { + notifyListeners(ImmutableList.copyOf(events)); + events.clear(); + } } } + if (events.size() > 0) { + notifyListeners(events); + } } - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { - logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime)); - final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + protected void notifyListeners(List events) { for (Listener listener : listeners) { try { - executor.execute(new ListenerRunnable(listener, name, event)); + executor.execute(new ListenerRunnable(listener, events)); } catch (EsRejectedExecutionException e) { if (logger.isDebugEnabled()) { RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler(); @@ -156,18 +167,16 @@ public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine { static class ListenerRunnable implements Runnable { private final Listener listener; - private final String jobName; - private final ScheduleTriggerEvent event; + private final List events; - public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + public ListenerRunnable(Listener listener, List events) { this.listener = listener; - this.jobName = jobName; - this.event = event; + this.events = events; } @Override public void run() { - listener.triggered(jobName, event); + listener.triggered(events); } } @@ -192,7 +201,7 @@ public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine { logger.trace("checking jobs [{}]", DateTime.now()); checkJobs(); try { - sleep(1000); + sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java index 6a00a0d1d8e..9f1224bcf38 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/TimerTickerScheduleTriggerEngine.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.watcher.trigger.schedule.engine; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.ImmutableSettings; @@ -16,12 +17,10 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder; import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.schedule.*; -import java.util.Collection; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionHandler; @@ -41,7 +40,7 @@ public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine { } int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) - .size(availableProcessors) + .size(availableProcessors * 2) .queueSize(1000) .build(); } @@ -79,7 +78,7 @@ public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine { } }; this.timer = new Timer("ticker-schedule-trigger-engine", true); - this.timer.scheduleAtFixedRate(ticker, clock.millis() % 1000 , 10); + this.timer.scheduleAtFixedRate(ticker, clock.millis() % 1000, 500); } @Override @@ -103,19 +102,26 @@ public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine { void checkJobs() { long triggeredTime = clock.millis(); + List events = new ArrayList<>(); for (ActiveSchedule schedule : schedules.values()) { long scheduledTime = schedule.check(triggeredTime); if (scheduledTime > 0) { - notifyListeners(schedule.name, triggeredTime, scheduledTime); + events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime), new DateTime(scheduledTime))); + if (events.size() >= 1000) { + notifyListeners(ImmutableList.copyOf(events)); + events.clear(); + } } } + if (events.size() > 0) { + notifyListeners(events); + } } - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { - final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime)); + protected void notifyListeners(List events) { for (Listener listener : listeners) { try { - executor.execute(new ListenerRunnable(listener, name, event)); + executor.execute(new ListenerRunnable(listener, events)); } catch (EsRejectedExecutionException e) { if (logger.isDebugEnabled()) { RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler(); @@ -165,18 +171,16 @@ public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine { static class ListenerRunnable implements Runnable { private final Listener listener; - private final String jobName; - private final ScheduleTriggerEvent event; + private final List events; - public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) { + public ListenerRunnable(Listener listener, List events) { this.listener = listener; - this.jobName = jobName; - this.event = event; + this.events = events; } @Override public void run() { - listener.triggered(jobName, event); + listener.triggered(events); } } diff --git a/src/test/java/org/elasticsearch/watcher/actions/email/EmailActionTests.java b/src/test/java/org/elasticsearch/watcher/actions/email/EmailActionTests.java index 2b1782365ef..a7a4f6316a2 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/email/EmailActionTests.java +++ b/src/test/java/org/elasticsearch/watcher/actions/email/EmailActionTests.java @@ -93,7 +93,7 @@ public class EmailActionTests extends ElasticsearchTestCase { WatchExecutionContext ctx = mockExecutionContextBuilder("watch1") .wid(wid) .payload(payload) - .time(now) + .time("watch1", now) .metadata(metadata) .buildMock(); diff --git a/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionTests.java b/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionTests.java index baba91fdb4f..8ee191ead9d 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionTests.java +++ b/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionTests.java @@ -70,7 +70,7 @@ public class IndexActionTests extends ElasticsearchIntegrationTest { } }, logger); - WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(new DateTime(), new DateTime())); + WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watch.name(), new DateTime(), new DateTime())); Map payloadMap = new HashMap<>(); payloadMap.put("test", "foo"); diff --git a/src/test/java/org/elasticsearch/watcher/actions/logging/LoggingActionTests.java b/src/test/java/org/elasticsearch/watcher/actions/logging/LoggingActionTests.java index 2c65eaa0888..4f0eeb26ba9 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/logging/LoggingActionTests.java +++ b/src/test/java/org/elasticsearch/watcher/actions/logging/LoggingActionTests.java @@ -81,7 +81,7 @@ public class LoggingActionTests extends ElasticsearchTestCase { when(engine.render(template, expectedModel)).thenReturn(text); WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id") - .time(now) + .time("_watch_id", now) .buildMock(); LoggingAction.Result result = executable.execute("_id", ctx, new Payload.Simple()); diff --git a/src/test/java/org/elasticsearch/watcher/actions/webhook/WebhookActionTests.java b/src/test/java/org/elasticsearch/watcher/actions/webhook/WebhookActionTests.java index 43d24d3c4ac..137dd1242ef 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/webhook/WebhookActionTests.java +++ b/src/test/java/org/elasticsearch/watcher/actions/webhook/WebhookActionTests.java @@ -114,7 +114,7 @@ public class WebhookActionTests extends ElasticsearchTestCase { ExecutableWebhookAction executable = new ExecutableWebhookAction(action, logger, httpClient, templateEngine); Watch watch = createWatch("test_watch", client, account); - WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(new DateTime(), new DateTime())); + WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watch.name(), new DateTime(), new DateTime())); WebhookAction.Result actionResult = executable.execute("_id", ctx, new Payload.Simple()); scenario.assertResult(actionResult); @@ -362,7 +362,7 @@ public class WebhookActionTests extends ElasticsearchTestCase { String watchName = "test_url_encode" + randomAsciiOfLength(10); Watch watch = createWatch(watchName, mock(ClientProxy.class), "account1"); - WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(new DateTime(), new DateTime())); + WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watchName, new DateTime(), new DateTime())); WebhookAction.Result result = webhookAction.execute("_id", ctx, new Payload.Simple()); assertThat(result, Matchers.instanceOf(WebhookAction.Result.Executed.class)); } diff --git a/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java b/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java index 0fb438cc7f2..f5767ef1cbb 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.support.clock.ClockMock; import org.elasticsearch.watcher.throttle.Throttler; import org.elasticsearch.watcher.transform.Transform; -import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.watcher.watch.*; import org.junit.Before; @@ -59,10 +58,9 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { WatchExecutor executor = mock(WatchExecutor.class); WatchStore watchStore = mock(WatchStore.class); WatchLockService watchLockService = mock(WatchLockService.class); - TriggerService triggerService = mock(TriggerService.class); ClusterService clusterService = mock(ClusterService.class); Clock clock = new ClockMock(); - executionService = new ExecutionService(ImmutableSettings.EMPTY, historyStore, executor, watchStore, watchLockService, triggerService, clusterService, clock); + executionService = new ExecutionService(ImmutableSettings.EMPTY, historyStore, executor, watchStore, watchLockService, clusterService, clock); } @Test @@ -96,7 +94,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(null, now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event); WatchExecution watchExecution = executionService.executeInner(context); assertThat(watchExecution.conditionResult(), sameInstance(conditionResult)); @@ -142,7 +140,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(null, now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event); WatchExecution watchExecution = executionService.executeInner(context); assertThat(watchExecution.inputResult(), sameInstance(inputResult)); @@ -188,7 +186,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(null, now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event); WatchExecution watchExecution = executionService.executeInner(context); assertThat(watchExecution.inputResult(), sameInstance(inputResult)); diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreLifeCycleTest.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreLifeCycleTest.java index 819072bf4b0..84ef62230f4 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreLifeCycleTest.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreLifeCycleTest.java @@ -37,7 +37,7 @@ public class HistoryStoreLifeCycleTest extends AbstractWatcherIntegrationTests { WatchRecord[] watchRecords = new WatchRecord[randomIntBetween(1, 50)]; for (int i = 0; i < watchRecords.length; i++) { DateTime dateTime = new DateTime(i, DateTimeZone.UTC); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(dateTime, dateTime); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), dateTime, dateTime); Wid wid = new Wid("record_" + i, randomLong(), DateTime.now(UTC)); watchRecords[i] = new WatchRecord(wid, watch, event); historyStore.put(watchRecords[i]); diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java index 5e321f63b86..6aa328673b9 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java @@ -76,7 +76,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger)); when(watch.input()).thenReturn(null); when(watch.metadata()).thenReturn(null); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC)); WatchRecord watchRecord = new WatchRecord(wid, watch, event); @@ -96,7 +96,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger)); when(watch.input()).thenReturn(null); when(watch.metadata()).thenReturn(null); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC)); WatchRecord watchRecord = new WatchRecord(wid, watch, event); watchRecord.version(4l); @@ -117,7 +117,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger)); when(watch.input()).thenReturn(null); when(watch.metadata()).thenReturn(null); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC)); WatchRecord watchRecord = new WatchRecord(wid, watch, event); @@ -137,7 +137,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger)); when(watch.input()).thenReturn(null); when(watch.metadata()).thenReturn(null); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC)); WatchRecord watchRecord = new WatchRecord(wid, watch, event); diff --git a/src/test/java/org/elasticsearch/watcher/history/WatchRecordTests.java b/src/test/java/org/elasticsearch/watcher/history/WatchRecordTests.java index 21832565d04..32676d8ab61 100644 --- a/src/test/java/org/elasticsearch/watcher/history/WatchRecordTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/WatchRecordTests.java @@ -39,7 +39,7 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests { @Test public void testParser() throws Exception { Watch watch = WatcherTestUtils.createTestWatch("fired_test", scriptService(), httpClient(), noopEmailService(), logger); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(DateTime.now(UTC), DateTime.now(UTC)); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), DateTime.now(UTC), DateTime.now(UTC)); Wid wid = new Wid("_record", randomLong(), DateTime.now(UTC)); WatchRecord watchRecord = new WatchRecord(wid, watch, event); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); @@ -55,7 +55,7 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests { @Test public void testParser_WithSealedWatchRecord() throws Exception { Watch watch = WatcherTestUtils.createTestWatch("fired_test", scriptService(), httpClient(), noopEmailService(), logger); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(DateTime.now(UTC), DateTime.now(UTC)); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), DateTime.now(UTC), DateTime.now(UTC)); Wid wid = new Wid("_record", randomLong(), DateTime.now(UTC)); WatchRecord watchRecord = new WatchRecord(wid, watch, event); WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), event); @@ -85,7 +85,7 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests { @Test public void testParser_WithSealedWatchRecord_WithScriptSearchCondition() throws Exception { Watch watch = WatcherTestUtils.createTestWatch("fired_test", scriptService(), httpClient(), noopEmailService(), logger); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(DateTime.now(UTC), DateTime.now(UTC)); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), DateTime.now(UTC), DateTime.now(UTC)); WatchExecutionContext ctx = new TriggeredExecutionContext( watch, new DateTime(), event); WatchRecord watchRecord = new WatchRecord(ctx.id(), watch, event); ctx.onActionResult(new ActionWrapper.Result("_email", new EmailAction.Result.Failure("failed to send because blah"))); diff --git a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java index 4600ea7b8f4..8d349d759f2 100644 --- a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java @@ -93,7 +93,7 @@ public class HttpInputTests extends ElasticsearchTestCase { new Watch.Status()); WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(0, DateTimeZone.UTC), - new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC))); + new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC))); HttpInput.Result result = input.execute(ctx); assertThat(result.type(), equalTo(HttpInput.TYPE)); assertThat(result.payload().data(), equalTo(MapBuilder.newMapBuilder().put("key", "value").map())); diff --git a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java index b36c7b9a0cb..531c692918f 100644 --- a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java @@ -81,7 +81,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { null, new Watch.Status()), new DateTime(0, DateTimeZone.UTC), - new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC))); + new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC))); SearchInput.Result result = searchInput.execute(ctx); assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0)); @@ -119,7 +119,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { null, new Watch.Status()), new DateTime(0, DateTimeZone.UTC), - new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC))); + new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC))); SearchInput.Result result = searchInput.execute(ctx); assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0)); diff --git a/src/test/java/org/elasticsearch/watcher/support/VariablesTests.java b/src/test/java/org/elasticsearch/watcher/support/VariablesTests.java index db5f1500630..d9efa40db00 100644 --- a/src/test/java/org/elasticsearch/watcher/support/VariablesTests.java +++ b/src/test/java/org/elasticsearch/watcher/support/VariablesTests.java @@ -32,7 +32,7 @@ public class VariablesTests extends ElasticsearchTestCase { DateTime executionTime = triggeredTime.plusMillis(50); Payload payload = new Payload.Simple(ImmutableMap.builder().put("payload_key", "payload_value").build()); Map metatdata = ImmutableMap.builder().put("metadata_key", "metadata_value").build(); - TriggerEvent event = new ScheduleTriggerEvent(triggeredTime, scheduledTime); + TriggerEvent event = new ScheduleTriggerEvent("_watch_id", triggeredTime, scheduledTime); WatchExecutionContext wec = WatcherTestUtils.mockExecutionContextBuilder("_watch_id") .executionTime(executionTime) .triggerEvent(event) diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index 27bce5201aa..62802fa5ca8 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -91,6 +91,8 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg @Override protected Settings nodeSettings(int nodeOrdinal) { + String scheduleImplName = scheduleEngine().name().toLowerCase(Locale.ROOT); + logger.info("using schedule engine [" + scheduleImplName + "]"); return ImmutableSettings.builder() .put(super.nodeSettings(nodeOrdinal)) .put("scroll.size", randomIntBetween(1, 100)) @@ -99,7 +101,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg (shieldEnabled ? ShieldPlugin.class.getName() + "," : "") + licensePluginClass().getName()) .put(ShieldSettings.settings(shieldEnabled)) - .put("watcher.trigger.schedule.engine", scheduleEngine().name().toLowerCase(Locale.ROOT)) + .put("watcher.trigger.schedule.engine", scheduleImplName) .build(); } diff --git a/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java b/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java index 241be14a928..e9a247574e1 100644 --- a/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java +++ b/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.watcher.WatcherPlugin; import org.elasticsearch.watcher.execution.ExecutionModule; +import org.elasticsearch.watcher.execution.SyncTriggerListener; import org.elasticsearch.watcher.execution.WatchExecutor; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.support.clock.ClockMock; @@ -101,7 +102,7 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin { public static class MockExecutionModule extends ExecutionModule { public MockExecutionModule() { - super(SameThreadExecutor.class); + super(SameThreadExecutor.class, SyncTriggerListener.class); } public static class SameThreadExecutor implements WatchExecutor { diff --git a/src/test/java/org/elasticsearch/watcher/test/WatchExecutionContextMockBuilder.java b/src/test/java/org/elasticsearch/watcher/test/WatchExecutionContextMockBuilder.java index ce6423ece1d..e644a45d8e9 100644 --- a/src/test/java/org/elasticsearch/watcher/test/WatchExecutionContextMockBuilder.java +++ b/src/test/java/org/elasticsearch/watcher/test/WatchExecutionContextMockBuilder.java @@ -36,7 +36,7 @@ public class WatchExecutionContextMockBuilder { when(ctx.watch()).thenReturn(watch); payload(Collections.emptyMap()); metadata(Collections.emptyMap()); - time(DateTime.now(UTC)); + time(watchId, DateTime.now(UTC)); } public WatchExecutionContextMockBuilder wid(Wid wid) { @@ -57,8 +57,8 @@ public class WatchExecutionContextMockBuilder { return this; } - public WatchExecutionContextMockBuilder time(DateTime time) { - return executionTime(time).triggerEvent(new ScheduleTriggerEvent(time, time)); + public WatchExecutionContextMockBuilder time(String watchId, DateTime time) { + return executionTime(time).triggerEvent(new ScheduleTriggerEvent(watchId, time, time)); } public WatchExecutionContextMockBuilder executionTime(DateTime time) { diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java index e3cc594ef13..4aea05d59f3 100644 --- a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java +++ b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java @@ -109,7 +109,7 @@ public final class WatcherTestUtils { public static WatchExecutionContext mockExecutionContext(String watchId, DateTime time, Payload payload) { return mockExecutionContextBuilder(watchId) .payload(payload) - .time(time) + .time(watchId, time) .buildMock(); } diff --git a/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineTriggerBenchmark.java b/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineTriggerBenchmark.java index 040c4204b40..7bf7e4b1bce 100644 --- a/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineTriggerBenchmark.java +++ b/src/test/java/org/elasticsearch/watcher/test/bench/ScheduleEngineTriggerBenchmark.java @@ -12,10 +12,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.support.clock.SystemClock; import org.elasticsearch.watcher.trigger.Trigger; import org.elasticsearch.watcher.trigger.TriggerEngine; -import org.elasticsearch.watcher.trigger.schedule.Schedule; -import org.elasticsearch.watcher.trigger.schedule.ScheduleRegistry; -import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; -import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEngine; +import org.elasticsearch.watcher.trigger.TriggerEvent; +import org.elasticsearch.watcher.trigger.schedule.*; import org.elasticsearch.watcher.trigger.schedule.engine.*; import org.quartz.JobExecutionContext; @@ -102,9 +100,12 @@ public class ScheduleEngineTriggerBenchmark { scheduler = new SimpleTickerScheduleTriggerEngine(settings, SystemClock.INSTANCE, scheduleRegistry, threadPool) { @Override - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { + protected void notifyListeners(List events) { if (running.get()) { - measure(total, triggerMetric, tooEarlyMetric, triggeredTime, scheduledTime); + for (TriggerEvent event : events) { + ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event; + measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(), scheduleTriggerEvent.scheduledTime().getMillis()); + } } } }; @@ -113,9 +114,12 @@ public class ScheduleEngineTriggerBenchmark { scheduler = new TimerTickerScheduleTriggerEngine(settings, SystemClock.INSTANCE, scheduleRegistry, threadPool) { @Override - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { + protected void notifyListeners(List events) { if (running.get()) { - measure(total, triggerMetric, tooEarlyMetric, triggeredTime, scheduledTime); + for (TriggerEvent event : events) { + ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event; + measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(), scheduleTriggerEvent.scheduledTime().getMillis()); + } } } }; diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java index 821f6a46d30..ceb82ca20c0 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java @@ -105,7 +105,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { assertThat(indexResponse.isCreated(), is(true)); DateTime now = DateTime.now(UTC); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), now, now); Wid wid = new Wid("_record", randomLong(), DateTime.now(UTC)); WatchRecord watchRecord = new WatchRecord(wid, watch, event); String actionHistoryIndex = HistoryStore.getHistoryIndexNameForTime(now); @@ -164,7 +164,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(watch.name()).setSource(jsonBuilder.bytes()).get(); assertThat(putWatchResponse.isCreated(), is(true)); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(historyIndexDate, historyIndexDate); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), historyIndexDate, historyIndexDate); Wid wid = new Wid("record_" + i, randomLong(), DateTime.now(UTC)); WatchRecord watchRecord = new WatchRecord(wid, watch, event); diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java b/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java index 50a69e60d12..d40ac928739 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java @@ -15,7 +15,6 @@ import org.elasticsearch.watcher.client.WatcherClient; import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.support.http.HttpRequestTemplate; import org.elasticsearch.watcher.support.http.auth.BasicAuth; -import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.support.template.Template; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule; @@ -85,7 +84,6 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests { client().prepareIndex("idx", "type").setSource("field", "value").get(); refresh(); - ScriptServiceProxy sc = scriptService(); InetSocketAddress address = internalTestCluster().httpAddresses()[0]; XContentBuilder body = jsonBuilder().prettyPrint().startObject() .field("query").value(termQuery("field", "value")) @@ -99,7 +97,7 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests { watcherClient.preparePutWatch("_name1") .setSource(watchBuilder() - .trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))) + .trigger(schedule(interval(10, IntervalSchedule.Interval.Unit.SECONDS))) .input(httpInput(requestBuilder).extractKeys("hits.total")) .condition(scriptCondition("ctx.payload.hits.total == 1"))) .get(); @@ -107,7 +105,7 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests { // in this watcher the condition will fail, because max_score isn't extracted, only total: watcherClient.preparePutWatch("_name2") .setSource(watchBuilder() - .trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))) + .trigger(schedule(interval(10, IntervalSchedule.Interval.Unit.SECONDS))) .input(httpInput(requestBuilder).extractKeys("hits.total")) .condition(scriptCondition("ctx.payload.hits.max_score >= 0"))) .get(); @@ -116,6 +114,8 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests { timeWarp().scheduler().trigger("_name1"); timeWarp().scheduler().trigger("_name2"); refresh(); + } else { + Thread.sleep(10000); } assertWatchWithMinimumPerformedActionsCount("_name1", 1, false); diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchThrottleTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchThrottleTests.java index 6e6836db760..0d3deda2020 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/WatchThrottleTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchThrottleTests.java @@ -137,7 +137,7 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests { .condition(scriptCondition("ctx.payload.hits.total > 0")) .transform(searchTransform(matchAllRequest().indices("events"))) .addAction("_id", indexAction("actions", "action")) - .throttlePeriod(TimeValue.timeValueSeconds(10))) + .throttlePeriod(TimeValue.timeValueSeconds(30))) .get(); assertThat(putWatchResponse.isCreated(), is(true)); @@ -159,7 +159,7 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests { actionsCount = docCount("actions", "action", matchAllQuery()); assertThat(actionsCount, is(1L)); - timeWarp().clock().fastForwardSeconds(10); + timeWarp().clock().fastForwardSeconds(30); timeWarp().scheduler().trigger("_name"); refresh(); @@ -172,23 +172,30 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests { assertThat(throttledCount, is(1L)); } else { - - Thread.sleep(TimeUnit.SECONDS.toMillis(2)); - refresh(); - - // the first fire should work so we should have a single action in the actions index - long actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(1L)); - Thread.sleep(TimeUnit.SECONDS.toMillis(5)); - + // the first fire should work so we should have a single action in the actions index + assertBusy(new Runnable() { + @Override + public void run() { + refresh(); + long actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + } + }, 5, TimeUnit.SECONDS); + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); // we should still be within the throttling period... so the number of actions shouldn't change - actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(1L)); + assertBusy(new Runnable() { + @Override + public void run() { + refresh(); + long actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); - long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, - matchQuery(WatchRecord.Parser.STATE_FIELD.getPreferredName(), WatchRecord.State.THROTTLED.id())); - assertThat(throttledCount, greaterThanOrEqualTo(1L)); + long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, + matchQuery(WatchRecord.Parser.STATE_FIELD.getPreferredName(), WatchRecord.State.THROTTLED.id())); + assertThat(throttledCount, greaterThanOrEqualTo(1L)); + } + }, 5, TimeUnit.SECONDS); } } diff --git a/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java index 5f6834b1c5e..32b09165142 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java @@ -110,7 +110,7 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests { SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(parseDate("2015-01-04T00:00:00"), parseDate("2015-01-01T00:00:00")); + ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00"), parseDate("2015-01-01T00:00:00")); WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00"), event, EMPTY_PAYLOAD); Payload payload = simplePayload("value", "val_3"); diff --git a/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java b/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java index e92fc43f9aa..7c2e2a68668 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/ScheduleTriggerEngineMock.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.watcher.trigger; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.logging.ESLogger; @@ -81,9 +82,9 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { for (int i = 0; i < times; i++) { DateTime now = clock.now(); logger.debug("firing [" + jobName + "] at [" + now + "]"); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now); for (Listener listener : listeners) { - listener.triggered(jobName, event); + listener.triggered(ImmutableList.of(event)); } if (interval != null) { if (clock instanceof ClockMock) { diff --git a/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java b/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java index 3f11f5cf754..262d3291220 100644 --- a/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java +++ b/src/test/java/org/elasticsearch/watcher/trigger/schedule/quartz/QuartzScheduleEngineTests.java @@ -74,15 +74,18 @@ public class QuartzScheduleEngineTests extends ElasticsearchTestCase { } final BitSet bits = new BitSet(count); engine.register(new TriggerEngine.Listener() { + @Override - public void triggered(String jobName, TriggerEvent event) { - int index = Integer.parseInt(jobName); - if (!bits.get(index)) { - logger.info("job [" + index + "] first fire: " + new DateTime()); - bits.set(index); - } else { - latch.countDown(); - logger.info("job [" + index + "] second fire: " + new DateTime()); + public void triggered(Iterable events) { + for (TriggerEvent event : events) { + int index = Integer.parseInt(event.jobName()); + if (!bits.get(index)) { + logger.info("job [" + index + "] first fire: " + new DateTime()); + bits.set(index); + } else { + latch.countDown(); + logger.info("job [" + index + "] second fire: " + new DateTime()); + } } } }); @@ -100,11 +103,14 @@ public class QuartzScheduleEngineTests extends ElasticsearchTestCase { final CountDownLatch latch = new CountDownLatch(1); engine.start(Collections.emptySet()); engine.register(new TriggerEngine.Listener() { + @Override - public void triggered(String jobName, TriggerEvent event) { - assertThat(jobName, is(name)); - logger.info("triggered job on [{}]", new DateTime()); - latch.countDown(); + public void triggered(Iterable events) { + for (TriggerEvent event : events) { + assertThat(event.jobName(), is(name)); + logger.info("triggered job on [{}]", new DateTime()); + latch.countDown(); + } } }); DateTime now = new DateTime(DateTimeZone.UTC); @@ -131,11 +137,14 @@ public class QuartzScheduleEngineTests extends ElasticsearchTestCase { final CountDownLatch latch = new CountDownLatch(1); engine.start(Collections.emptySet()); engine.register(new TriggerEngine.Listener() { + @Override - public void triggered(String jobName, TriggerEvent event) { - assertThat(jobName, is(name)); - logger.info("triggered job on [{}]", new DateTime()); - latch.countDown(); + public void triggered(Iterable events) { + for (TriggerEvent event : events) { + assertThat(event.jobName(), is(name)); + logger.info("triggered job on [{}]", new DateTime()); + latch.countDown(); + } } }); DateTime now = new DateTime(DateTimeZone.UTC); @@ -164,11 +173,14 @@ public class QuartzScheduleEngineTests extends ElasticsearchTestCase { final CountDownLatch latch = new CountDownLatch(1); engine.start(Collections.emptySet()); engine.register(new TriggerEngine.Listener() { + @Override - public void triggered(String jobName, TriggerEvent event) { - assertThat(jobName, is(name)); - logger.info("triggered job on [{}]", new DateTime()); - latch.countDown(); + public void triggered(Iterable events) { + for (TriggerEvent event : events) { + assertThat(event.jobName(), is(name)); + logger.info("triggered job on [{}]", new DateTime()); + latch.countDown(); + } } }); DateTime now = new DateTime(DateTimeZone.UTC);