* Doubled the watcher executor thread pool size.

* Tune the index templates for higher index throughput.
* Trigger events should be processed and indexed in an async manner.
* Moved the job name into the TriggerEvent
* Added support for fire multiple events at the same time. If multiple events are fired at the same time then use async bulk to persist the watch record.
* Cut simple ticker and timer ticker over to fire multiple events at the same time
* Don't fire more than 1000 events at the time. (this also may result in large bulk requests)

Original commit: elastic/x-pack-elasticsearch@c7f6bd3812
This commit is contained in:
Martijn van Groningen 2015-04-16 12:25:31 +02:00
parent c8a0c27934
commit a68db406e2
41 changed files with 482 additions and 201 deletions

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.execution;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.TriggerService;
/**
*/
public class AsyncTriggerListener implements TriggerEngine.Listener {
private final ExecutionService executionService;
@Inject
public AsyncTriggerListener(ExecutionService executionService, TriggerService triggerService) {
this.executionService = executionService;
triggerService.register(this);
}
@Override
public void triggered(Iterable<TriggerEvent> events) {
executionService.processEventsAsync(events);
}
}

View File

@ -6,25 +6,29 @@
package org.elasticsearch.watcher.execution;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.watcher.trigger.TriggerEngine;
/**
*/
public class ExecutionModule extends AbstractModule {
private final Class<? extends WatchExecutor> executorClass;
private final Class<? extends TriggerEngine.Listener> triggerEngineListenerClass;
public ExecutionModule() {
this(InternalWatchExecutor.class);
this(InternalWatchExecutor.class, AsyncTriggerListener.class);
}
protected ExecutionModule(Class<? extends WatchExecutor> executorClass) {
protected ExecutionModule(Class<? extends WatchExecutor> executorClass, Class<? extends TriggerEngine.Listener> triggerEngineListenerClass) {
this.executorClass = executorClass;
this.triggerEngineListenerClass = triggerEngineListenerClass;
}
@Override
protected void configure() {
bind(ExecutionService.class).asEagerSingleton();
bind(executorClass).asEagerSingleton();
bind(triggerEngineListenerClass).asEagerSingleton();
bind(WatchExecutor.class).to(executorClass);
}
}

View File

@ -6,12 +6,15 @@
package org.elasticsearch.watcher.execution;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.watcher.WatcherException;
@ -24,9 +27,7 @@ import org.elasticsearch.watcher.support.Callback;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.TriggerService;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchExecution;
import org.elasticsearch.watcher.watch.WatchLockService;
@ -35,6 +36,7 @@ import org.elasticsearch.watcher.watch.WatchStore;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -54,16 +56,14 @@ public class ExecutionService extends AbstractComponent {
private final AtomicInteger initializationRetries = new AtomicInteger();
@Inject
public ExecutionService(Settings settings, HistoryStore historyStore, WatchExecutor executor,
WatchStore watchStore, WatchLockService watchLockService, TriggerService triggerService,
ClusterService clusterService, Clock clock) {
public ExecutionService(Settings settings, HistoryStore historyStore, WatchExecutor executor, WatchStore watchStore,
WatchLockService watchLockService, ClusterService clusterService, Clock clock) {
super(settings);
this.historyStore = historyStore;
this.executor = executor;
this.watchStore = watchStore;
this.watchLockService = watchLockService;
this.clusterService = clusterService;
triggerService.register(new SchedulerListener());
this.clock = clock;
}
@ -113,6 +113,100 @@ public class ExecutionService extends AbstractComponent {
return executor.largestPoolSize();
}
public void processEventsAsync(Iterable<TriggerEvent> events) throws WatcherException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
final LinkedList<WatchRecord> records = new LinkedList<>();
final LinkedList<TriggeredExecutionContext> contexts = new LinkedList<>();
DateTime now = clock.now();
for (TriggerEvent event : events) {
Watch watch = watchStore.get(event.jobName());
if (watch == null) {
logger.warn("unable to find watch [{}] in the watch store, perhaps it has been deleted", event.jobName());
continue;
}
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, now, event);
contexts.add(ctx);
records.add(new WatchRecord(ctx.id(), watch, event));
}
logger.debug("saving watch records [{}]", records.size());
if (records.size() == 1) {
final WatchRecord watchRecord = records.getFirst();
final TriggeredExecutionContext ctx = contexts.getFirst();
historyStore.putAsync(watchRecord, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
executeAsync(ctx, watchRecord);
}
@Override
public void onFailure(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof EsRejectedExecutionException) {
logger.debug("Failed to store watch record {} due to overloaded threadpool: {}", watchRecord, ExceptionsHelper.detailedMessage(e));
} else {
logger.warn("Failed to store watch record: {}", e, watchRecord);
}
}
});
} else {
historyStore.bulkAsync(records, new ActionListener<List<Integer>>() {
@Override
public void onResponse(List<Integer> successFullSlots) {
for (Integer slot : successFullSlots) {
executeAsync(contexts.get(slot), records.get(slot));
}
}
@Override
public void onFailure(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof EsRejectedExecutionException) {
logger.debug("Failed to store watch records due to overloaded threadpool: {}", ExceptionsHelper.detailedMessage(e));
} else {
logger.warn("Failed to store watch records", e);
}
}
});
}
}
public void processEventsSync(Iterable<TriggerEvent> events) throws WatcherException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
final LinkedList<WatchRecord> records = new LinkedList<>();
final LinkedList<TriggeredExecutionContext> contexts = new LinkedList<>();
DateTime now = clock.now();
for (TriggerEvent event : events) {
Watch watch = watchStore.get(event.jobName());
if (watch == null) {
logger.warn("unable to find watch [{}] in the watch store, perhaps it has been deleted", event.jobName());
continue;
}
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, now, event);
contexts.add(ctx);
records.add(new WatchRecord(ctx.id(), watch, event));
}
logger.debug("saving watch records [{}]", records.size());
if (records.size() == 1) {
final WatchRecord watchRecord = records.getFirst();
final TriggeredExecutionContext ctx = contexts.getFirst();
historyStore.put(watchRecord);
executeAsync(ctx, watchRecord);
} else {
List<Integer> slots = historyStore.bulk(records);
for (Integer slot : slots) {
executeAsync(contexts.get(slot), records.get(slot));
}
}
}
public WatchRecord execute(WatchExecutionContext ctx) throws IOException {
WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent());
@ -150,21 +244,6 @@ public class ExecutionService extends AbstractComponent {
}
}
private void executeWatch(Watch watch, TriggerEvent event) throws WatcherException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, clock.now(), event);
WatchRecord watchRecord = new WatchRecord(ctx.id(), watch, event);
if (ctx.recordExecution()) {
logger.debug("saving watch record [{}] for watch [{}]", watchRecord.id(), watch.name());
historyStore.put(watchRecord);
}
executeAsync(ctx, watchRecord);
}
WatchExecution executeInner(WatchExecutionContext ctx) throws IOException {
Watch watch = ctx.watch();
@ -274,14 +353,15 @@ public class ExecutionService extends AbstractComponent {
watchStore.updateStatus(ctx.watch());
} catch (Exception e) {
if (started()) {
logger.warn("failed to execute watch [{}] [{}]", e, watchRecord.name(), ctx.id());
String detailedMessage = ExceptionsHelper.detailedMessage(e);
logger.warn("failed to execute watch [{}] [{}], failure [{}]", watchRecord.name(), ctx.id(), detailedMessage);
try {
watchRecord.update(WatchRecord.State.FAILED, e.getMessage());
watchRecord.update(WatchRecord.State.FAILED, detailedMessage);
if (ctx.recordExecution()) {
historyStore.update(watchRecord);
}
} catch (Exception e2) {
logger.error("failed to update watch record [{}] failure [{}] for [{}] [{}]", e2, watchRecord, ctx.watch().name(), ctx.id(), e.getMessage());
logger.error("failed to update watch record [{}], failure [{}], original failure [{}]", watchRecord, ExceptionsHelper.detailedMessage(e2), detailedMessage);
}
} else {
logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord);
@ -293,28 +373,4 @@ public class ExecutionService extends AbstractComponent {
}
}
private class SchedulerListener implements TriggerEngine.Listener {
@Override
public void triggered(String name, TriggerEvent event) {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
Watch watch = watchStore.get(name);
if (watch == null) {
logger.warn("unable to find watch [{}] in the watch store, perhaps it has been deleted", name);
return;
}
try {
ExecutionService.this.executeWatch(watch, event);
} catch (Exception e) {
if (started()) {
logger.error("failed to execute watch from SchedulerListener [{}]", e, name);
} else {
logger.debug("failed to execute watch from SchedulerListener [{}] after shutdown", e, name);
}
}
}
}
}

View File

@ -121,7 +121,7 @@ public class ManualExecutionContext extends WatchExecutionContext {
executionTime = DateTime.now(DateTimeZone.UTC);
}
if (triggerEvent == null) {
triggerEvent = new ManualTriggerEvent(executionTime, new HashMap<String, Object>());
triggerEvent = new ManualTriggerEvent(watch.name(), executionTime, new HashMap<String, Object>());
}
return new ManualExecutionContext(watch, executionTime, triggerEvent, inputResult, conditionResult, throttlerResult, simulateActionPredicate, recordExecution);
}

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.execution;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.TriggerService;
/**
*/
public class SyncTriggerListener implements TriggerEngine.Listener {
private final ExecutionService executionService;
@Inject
public SyncTriggerListener(ExecutionService executionService, TriggerService triggerService) {
this.executionService = executionService;
triggerService.register(this);
}
@Override
public void triggered(Iterable<TriggerEvent> events) {
executionService.processEventsSync(events);
}
}

View File

@ -5,8 +5,12 @@
*/
package org.elasticsearch.watcher.history;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
@ -104,6 +108,94 @@ public class HistoryStore extends AbstractComponent {
}
}
public void putAsync(final WatchRecord watchRecord, final ActionListener<Boolean> listener) throws HistoryException {
String index = getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
try {
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(XContentFactory.jsonBuilder().value(watchRecord))
.opType(IndexRequest.OpType.CREATE);
client.indexAsync(request, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
watchRecord.version(response.getVersion());
listener.onResponse(true);
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
} catch (IOException e) {
throw new HistoryException("failed to persist watch record [" + watchRecord + "]", e);
}
}
public void bulkAsync(final List<WatchRecord> records, final ActionListener<List<Integer>> listener) throws HistoryException {
try {
BulkRequest request = new BulkRequest();
for (WatchRecord record : records) {
String index = getHistoryIndexNameForTime(record.triggerEvent().triggeredTime());
IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE, record.id().value());
indexRequest.source(XContentFactory.jsonBuilder().value(record));
indexRequest.opType(IndexRequest.OpType.CREATE);
request.add(indexRequest);
}
client.bulkAsync(request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
List<Integer> successFullSlots = new ArrayList<Integer>();
for (int i = 0; i < response.getItems().length; i++) {
BulkItemResponse itemResponse = response.getItems()[i];
if (itemResponse.isFailed()) {
logger.error("could store watch record with id [" + itemResponse.getId() + "], because failed [" + itemResponse.getFailureMessage() + "]");
} else {
IndexResponse indexResponse = itemResponse.getResponse();
records.get(i).version(indexResponse.getVersion());
successFullSlots.add(i);
}
}
listener.onResponse(successFullSlots);
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
} catch (IOException e) {
throw new HistoryException("failed to persist watch records", e);
}
}
public List<Integer> bulk(final List<WatchRecord> records) throws HistoryException {
try {
BulkRequest request = new BulkRequest();
for (WatchRecord record : records) {
String index = getHistoryIndexNameForTime(record.triggerEvent().triggeredTime());
IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE, record.id().value());
indexRequest.source(XContentFactory.jsonBuilder().value(record));
indexRequest.opType(IndexRequest.OpType.CREATE);
request.add(indexRequest);
}
BulkResponse response = client.bulk(request);
List<Integer> successFullSlots = new ArrayList<>();
for (int i = 0; i < response.getItems().length; i++) {
BulkItemResponse itemResponse = response.getItems()[i];
if (itemResponse.isFailed()) {
logger.error("could store watch record with id [" + itemResponse.getId() + "], because failed [" + itemResponse.getFailureMessage() + "]");
} else {
IndexResponse indexResponse = itemResponse.getResponse();
records.get(i).version(indexResponse.getVersion());
successFullSlots.add(i);
}
}
return successFullSlots;
} catch (IOException e) {
throw new HistoryException("failed to persist watch records", e);
}
}
public void update(WatchRecord watchRecord) throws HistoryException {
if (!started.get()) {
throw new HistoryException("unable to persist watch record history store is not ready");

View File

@ -6,8 +6,11 @@
package org.elasticsearch.watcher.support.init.proxy;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
@ -58,6 +61,21 @@ public class ClientProxy implements InitializingService.Initializable {
return client.index(preProcess(request)).actionGet();
}
public BulkResponse bulk(BulkRequest request) {
request.listenerThreaded(true);
return client.bulk(request).actionGet();
}
public void indexAsync(IndexRequest request, ActionListener<IndexResponse> listener) {
request.listenerThreaded(true);
client.index(request, listener);
}
public void bulkAsync(BulkRequest request, ActionListener<BulkResponse> listener) {
request.listenerThreaded(true);
client.bulk(request, listener);
}
public ActionFuture<DeleteResponse> delete(DeleteRequest request) {
return client.delete(preProcess(request));
}

View File

@ -89,7 +89,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
}
}
if (request.getTriggerData() != null) {
ctxBuilder.triggerEvent(new ManualTriggerEvent(executionTime, request.getTriggerData()));
ctxBuilder.triggerEvent(new ManualTriggerEvent(watch.name(), executionTime, request.getTriggerData()));
}
if (request.getAlternativeInput() != null) {
ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput())));

View File

@ -43,7 +43,8 @@ public interface TriggerEngine<T extends Trigger, E extends TriggerEvent> {
public static interface Listener {
void triggered(String jobName, TriggerEvent event);
void triggered(Iterable<TriggerEvent> events);
}
public static interface Job {

View File

@ -8,10 +8,7 @@ package org.elasticsearch.watcher.trigger;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.watcher.support.WatcherDateUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -21,16 +18,23 @@ import java.util.Map;
public abstract class TriggerEvent implements ToXContent {
public static final ParseField TRIGGERED_TIME_FIELD = new ParseField("triggered_time");
public static final ParseField JOB_NAME_FIELD = new ParseField("job_name");
private final String jobName;
protected final DateTime triggeredTime;
protected final Map<String, Object> data;
public TriggerEvent(DateTime triggeredTime) {
public TriggerEvent(String jobName, DateTime triggeredTime) {
this.jobName = jobName;
this.triggeredTime = triggeredTime;
this.data = new HashMap<>();
data.put(TRIGGERED_TIME_FIELD.getPreferredName(), triggeredTime);
}
public String jobName() {
return jobName;
}
public abstract String type();
public DateTime triggeredTime() {

View File

@ -130,9 +130,9 @@ public class TriggerService extends AbstractComponent {
}
@Override
public void triggered(String jobName, TriggerEvent event) {
public void triggered(Iterable<TriggerEvent> events) {
for (TriggerEngine.Listener listener : listeners) {
listener.triggered(jobName, event);
listener.triggered(events);
}
}
}

View File

@ -25,8 +25,8 @@ public class ManualTriggerEvent extends TriggerEvent {
private final Map<String, Object> triggerData;
public ManualTriggerEvent(DateTime triggeredTime, Map<String, Object> triggerData) {
super(triggeredTime);
public ManualTriggerEvent(String jobName, DateTime triggeredTime, Map<String, Object> triggerData) {
super(jobName, triggeredTime);
data.putAll(triggerData);
this.triggerData = triggerData;
}
@ -39,12 +39,14 @@ public class ManualTriggerEvent extends TriggerEvent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(JOB_NAME_FIELD.getPreferredName(), jobName())
.field(TRIGGERED_TIME_FIELD.getPreferredName(), WatcherDateUtils.formatDate(triggeredTime))
.field(TRIGGER_DATA_FIELD.getPreferredName(), triggerData)
.endObject();
}
public static ManualTriggerEvent parse(String context, XContentParser parser) throws IOException {
String jobName = null;
DateTime triggeredTime = null;
Map<String, Object> triggerData = new HashMap<>();
String currentFieldName = null;
@ -54,7 +56,9 @@ public class ManualTriggerEvent extends TriggerEvent {
currentFieldName = parser.currentName();
} else {
if (token == XContentParser.Token.VALUE_STRING) {
if (TRIGGERED_TIME_FIELD.match(currentFieldName)) {
if (JOB_NAME_FIELD.match(currentFieldName)) {
jobName = parser.text();
} else if (TRIGGERED_TIME_FIELD.match(currentFieldName)) {
triggeredTime = WatcherDateUtils.parseDate(parser.text());
} else {
throw new ParseException("could not parse trigger event for [" + context + "]. unknown string value field [" + currentFieldName + "]");
@ -73,7 +77,7 @@ public class ManualTriggerEvent extends TriggerEvent {
// should never be, it's fully controlled internally (not coming from the user)
assert triggeredTime != null;
return new ManualTriggerEvent(triggeredTime, triggerData);
return new ManualTriggerEvent(jobName, triggeredTime, triggerData);
}
public static class ParseException extends WatcherException {

View File

@ -37,7 +37,7 @@ public class IntervalSchedule implements Schedule {
if (time <= startTime) {
return startTime;
}
// advancing the time in 1 ns (we're looking for the time **after**)
// advancing the time in 1 ms (we're looking for the time **after**)
time += 1;
long delta = time - startTime;
return startTime + (delta / interval.millis + 1) * interval.millis;

View File

@ -24,8 +24,8 @@ public class ScheduleTriggerEvent extends TriggerEvent {
private final DateTime scheduledTime;
public ScheduleTriggerEvent(DateTime triggeredTime, DateTime scheduledTime) {
super(triggeredTime);
public ScheduleTriggerEvent(String jobName, DateTime triggeredTime, DateTime scheduledTime) {
super(jobName, triggeredTime);
this.scheduledTime = scheduledTime;
data.put(SCHEDULED_TIME_FIELD.getPreferredName(), scheduledTime);
}
@ -42,12 +42,14 @@ public class ScheduleTriggerEvent extends TriggerEvent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(JOB_NAME_FIELD.getPreferredName(), jobName())
.field(TRIGGERED_TIME_FIELD.getPreferredName(), WatcherDateUtils.formatDate(triggeredTime))
.field(SCHEDULED_TIME_FIELD.getPreferredName(), WatcherDateUtils.formatDate(scheduledTime))
.endObject();
}
public static ScheduleTriggerEvent parse(String context, XContentParser parser) throws IOException {
String jobName = null;
DateTime triggeredTime = null;
DateTime scheduledTime = null;
@ -58,7 +60,9 @@ public class ScheduleTriggerEvent extends TriggerEvent {
currentFieldName = parser.currentName();
} else {
if (token == XContentParser.Token.VALUE_STRING) {
if (TRIGGERED_TIME_FIELD.match(currentFieldName)) {
if (JOB_NAME_FIELD.match(currentFieldName)) {
jobName = parser.text();
} else if (TRIGGERED_TIME_FIELD.match(currentFieldName)) {
triggeredTime = WatcherDateUtils.parseDate(parser.text());
} else if (SCHEDULED_TIME_FIELD.match(currentFieldName)) {
scheduledTime = WatcherDateUtils.parseDate(parser.text());
@ -72,8 +76,8 @@ public class ScheduleTriggerEvent extends TriggerEvent {
}
// should never be, it's fully controlled internally (not coming from the user)
assert triggeredTime != null && scheduledTime != null;
return new ScheduleTriggerEvent(triggeredTime, scheduledTime);
assert jobName != null && triggeredTime != null && scheduledTime != null;
return new ScheduleTriggerEvent(jobName, triggeredTime, scheduledTime);
}
public static class ParseException extends WatcherException {

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
@ -21,6 +22,7 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.ArrayList;
@ -45,7 +47,7 @@ public class HashWheelScheduleTriggerEngine extends ScheduleTriggerEngine {
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.size(availableProcessors * 2)
.queueSize(1000)
.build();
}
@ -119,10 +121,10 @@ public class HashWheelScheduleTriggerEngine extends ScheduleTriggerEngine {
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
try {
executor.execute(new ListenerRunnable(listener, name, event));
executor.execute(new ListenerRunnable(listener, event));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
@ -182,18 +184,16 @@ public class HashWheelScheduleTriggerEngine extends ScheduleTriggerEngine {
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
public ListenerRunnable(Listener listener, ScheduleTriggerEvent event) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
}
@Override
public void run() {
listener.triggered(jobName, event);
listener.triggered(ImmutableList.<TriggerEvent>of(event));
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
@ -18,6 +19,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.TriggerException;
import org.elasticsearch.watcher.trigger.schedule.*;
import org.quartz.*;
@ -43,7 +45,7 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.size(availableProcessors * 2)
.queueSize(1000)
.build();
}
@ -160,10 +162,10 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
}
protected void notifyListeners(String name, JobExecutionContext ctx) {
ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(ctx.getFireTime()), new DateTime(ctx.getScheduledFireTime()));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(ctx.getFireTime()), new DateTime(ctx.getScheduledFireTime()));
for (Listener listener : listeners) {
try {
executor.execute(new ListenerRunnable(listener, name, event));
executor.execute(new ListenerRunnable(listener, event));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
@ -181,18 +183,16 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
public ListenerRunnable(Listener listener, ScheduleTriggerEvent event) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
}
@Override
public void run() {
listener.triggered(jobName, event);
listener.triggered(ImmutableList.<TriggerEvent>of(event));
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
@ -17,6 +18,7 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.ArrayList;
@ -40,7 +42,7 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.size(availableProcessors * 2)
.queueSize(1000)
.build();
}
@ -106,10 +108,10 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
try {
executor.execute(new ListenerRunnable(listener, name, event));
executor.execute(new ListenerRunnable(listener, event));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
@ -169,18 +171,16 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
public ListenerRunnable(Listener listener, ScheduleTriggerEvent event) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
}
@Override
public void run() {
listener.triggered(jobName, event);
listener.triggered(ImmutableList.<TriggerEvent>of(event));
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -16,9 +17,12 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@ -40,7 +44,7 @@ public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.size(availableProcessors * 2)
.queueSize(1000)
.build();
}
@ -93,20 +97,27 @@ public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
void checkJobs() {
long triggeredTime = clock.millis();
List<TriggerEvent> events = new ArrayList<>();
for (ActiveSchedule schedule : schedules.values()) {
long scheduledTime = schedule.check(triggeredTime);
if (scheduledTime > 0) {
notifyListeners(schedule.name, triggeredTime, scheduledTime);
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name, new DateTime(triggeredTime), new DateTime(scheduledTime));
events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime), new DateTime(scheduledTime)));
if (events.size() >= 1000) {
notifyListeners(ImmutableList.copyOf(events));
events.clear();
}
}
}
if (events.size() > 0) {
notifyListeners(events);
}
}
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
protected void notifyListeners(List<TriggerEvent> events) {
for (Listener listener : listeners) {
try {
executor.execute(new ListenerRunnable(listener, name, event));
executor.execute(new ListenerRunnable(listener, events));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
@ -156,18 +167,16 @@ public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
private final List<TriggerEvent> events;
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
public ListenerRunnable(Listener listener, List<TriggerEvent> events) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
this.events = events;
}
@Override
public void run() {
listener.triggered(jobName, event);
listener.triggered(events);
}
}
@ -192,7 +201,7 @@ public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
logger.trace("checking jobs [{}]", DateTime.now());
checkJobs();
try {
sleep(1000);
sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -16,12 +17,10 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.Collection;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
@ -41,7 +40,7 @@ public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors)
.size(availableProcessors * 2)
.queueSize(1000)
.build();
}
@ -79,7 +78,7 @@ public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
};
this.timer = new Timer("ticker-schedule-trigger-engine", true);
this.timer.scheduleAtFixedRate(ticker, clock.millis() % 1000 , 10);
this.timer.scheduleAtFixedRate(ticker, clock.millis() % 1000, 500);
}
@Override
@ -103,19 +102,26 @@ public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
void checkJobs() {
long triggeredTime = clock.millis();
List<TriggerEvent> events = new ArrayList<>();
for (ActiveSchedule schedule : schedules.values()) {
long scheduledTime = schedule.check(triggeredTime);
if (scheduledTime > 0) {
notifyListeners(schedule.name, triggeredTime, scheduledTime);
events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime), new DateTime(scheduledTime)));
if (events.size() >= 1000) {
notifyListeners(ImmutableList.copyOf(events));
events.clear();
}
}
}
if (events.size() > 0) {
notifyListeners(events);
}
}
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
protected void notifyListeners(List<TriggerEvent> events) {
for (Listener listener : listeners) {
try {
executor.execute(new ListenerRunnable(listener, name, event));
executor.execute(new ListenerRunnable(listener, events));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
@ -165,18 +171,16 @@ public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final String jobName;
private final ScheduleTriggerEvent event;
private final List<TriggerEvent> events;
public ListenerRunnable(Listener listener, String jobName, ScheduleTriggerEvent event) {
public ListenerRunnable(Listener listener, List<TriggerEvent> events) {
this.listener = listener;
this.jobName = jobName;
this.event = event;
this.events = events;
}
@Override
public void run() {
listener.triggered(jobName, event);
listener.triggered(events);
}
}

View File

@ -93,7 +93,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
WatchExecutionContext ctx = mockExecutionContextBuilder("watch1")
.wid(wid)
.payload(payload)
.time(now)
.time("watch1", now)
.metadata(metadata)
.buildMock();

View File

@ -70,7 +70,7 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
}
},
logger);
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(new DateTime(), new DateTime()));
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watch.name(), new DateTime(), new DateTime()));
Map<String, Object> payloadMap = new HashMap<>();
payloadMap.put("test", "foo");

View File

@ -81,7 +81,7 @@ public class LoggingActionTests extends ElasticsearchTestCase {
when(engine.render(template, expectedModel)).thenReturn(text);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id")
.time(now)
.time("_watch_id", now)
.buildMock();
LoggingAction.Result result = executable.execute("_id", ctx, new Payload.Simple());

View File

@ -114,7 +114,7 @@ public class WebhookActionTests extends ElasticsearchTestCase {
ExecutableWebhookAction executable = new ExecutableWebhookAction(action, logger, httpClient, templateEngine);
Watch watch = createWatch("test_watch", client, account);
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(new DateTime(), new DateTime()));
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watch.name(), new DateTime(), new DateTime()));
WebhookAction.Result actionResult = executable.execute("_id", ctx, new Payload.Simple());
scenario.assertResult(actionResult);
@ -362,7 +362,7 @@ public class WebhookActionTests extends ElasticsearchTestCase {
String watchName = "test_url_encode" + randomAsciiOfLength(10);
Watch watch = createWatch(watchName, mock(ClientProxy.class), "account1");
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(new DateTime(), new DateTime()));
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watchName, new DateTime(), new DateTime()));
WebhookAction.Result result = webhookAction.execute("_id", ctx, new Payload.Simple());
assertThat(result, Matchers.instanceOf(WebhookAction.Result.Executed.class));
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.trigger.TriggerService;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.*;
import org.junit.Before;
@ -59,10 +58,9 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
WatchExecutor executor = mock(WatchExecutor.class);
WatchStore watchStore = mock(WatchStore.class);
WatchLockService watchLockService = mock(WatchLockService.class);
TriggerService triggerService = mock(TriggerService.class);
ClusterService clusterService = mock(ClusterService.class);
Clock clock = new ClockMock();
executionService = new ExecutionService(ImmutableSettings.EMPTY, historyStore, executor, watchStore, watchLockService, triggerService, clusterService, clock);
executionService = new ExecutionService(ImmutableSettings.EMPTY, historyStore, executor, watchStore, watchLockService, clusterService, clock);
}
@Test
@ -96,7 +94,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(null, now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event);
WatchExecution watchExecution = executionService.executeInner(context);
assertThat(watchExecution.conditionResult(), sameInstance(conditionResult));
@ -142,7 +140,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(null, now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event);
WatchExecution watchExecution = executionService.executeInner(context);
assertThat(watchExecution.inputResult(), sameInstance(inputResult));
@ -188,7 +186,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(null, now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event);
WatchExecution watchExecution = executionService.executeInner(context);
assertThat(watchExecution.inputResult(), sameInstance(inputResult));

View File

@ -37,7 +37,7 @@ public class HistoryStoreLifeCycleTest extends AbstractWatcherIntegrationTests {
WatchRecord[] watchRecords = new WatchRecord[randomIntBetween(1, 50)];
for (int i = 0; i < watchRecords.length; i++) {
DateTime dateTime = new DateTime(i, DateTimeZone.UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(dateTime, dateTime);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), dateTime, dateTime);
Wid wid = new Wid("record_" + i, randomLong(), DateTime.now(UTC));
watchRecords[i] = new WatchRecord(wid, watch, event);
historyStore.put(watchRecords[i]);

View File

@ -76,7 +76,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
@ -96,7 +96,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
watchRecord.version(4l);
@ -117,7 +117,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
@ -137,7 +137,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);

View File

@ -39,7 +39,7 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests {
@Test
public void testParser() throws Exception {
Watch watch = WatcherTestUtils.createTestWatch("fired_test", scriptService(), httpClient(), noopEmailService(), logger);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(DateTime.now(UTC), DateTime.now(UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), DateTime.now(UTC), DateTime.now(UTC));
Wid wid = new Wid("_record", randomLong(), DateTime.now(UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
@ -55,7 +55,7 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests {
@Test
public void testParser_WithSealedWatchRecord() throws Exception {
Watch watch = WatcherTestUtils.createTestWatch("fired_test", scriptService(), httpClient(), noopEmailService(), logger);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(DateTime.now(UTC), DateTime.now(UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), DateTime.now(UTC), DateTime.now(UTC));
Wid wid = new Wid("_record", randomLong(), DateTime.now(UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), event);
@ -85,7 +85,7 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests {
@Test
public void testParser_WithSealedWatchRecord_WithScriptSearchCondition() throws Exception {
Watch watch = WatcherTestUtils.createTestWatch("fired_test", scriptService(), httpClient(), noopEmailService(), logger);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(DateTime.now(UTC), DateTime.now(UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), DateTime.now(UTC), DateTime.now(UTC));
WatchExecutionContext ctx = new TriggeredExecutionContext( watch, new DateTime(), event);
WatchRecord watchRecord = new WatchRecord(ctx.id(), watch, event);
ctx.onActionResult(new ActionWrapper.Result("_email", new EmailAction.Result.Failure("failed to send because blah")));

View File

@ -93,7 +93,7 @@ public class HttpInputTests extends ElasticsearchTestCase {
new Watch.Status());
WatchExecutionContext ctx = new TriggeredExecutionContext(watch,
new DateTime(0, DateTimeZone.UTC),
new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
new ScheduleTriggerEvent(watch.name(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
HttpInput.Result result = input.execute(ctx);
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.payload().data(), equalTo(MapBuilder.<String, Object>newMapBuilder().put("key", "value").map()));

View File

@ -81,7 +81,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
null,
new Watch.Status()),
new DateTime(0, DateTimeZone.UTC),
new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
SearchInput.Result result = searchInput.execute(ctx);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
@ -119,7 +119,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
null,
new Watch.Status()),
new DateTime(0, DateTimeZone.UTC),
new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
SearchInput.Result result = searchInput.execute(ctx);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));

View File

@ -32,7 +32,7 @@ public class VariablesTests extends ElasticsearchTestCase {
DateTime executionTime = triggeredTime.plusMillis(50);
Payload payload = new Payload.Simple(ImmutableMap.<String, Object>builder().put("payload_key", "payload_value").build());
Map<String, Object> metatdata = ImmutableMap.<String, Object>builder().put("metadata_key", "metadata_value").build();
TriggerEvent event = new ScheduleTriggerEvent(triggeredTime, scheduledTime);
TriggerEvent event = new ScheduleTriggerEvent("_watch_id", triggeredTime, scheduledTime);
WatchExecutionContext wec = WatcherTestUtils.mockExecutionContextBuilder("_watch_id")
.executionTime(executionTime)
.triggerEvent(event)

View File

@ -91,6 +91,8 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
@Override
protected Settings nodeSettings(int nodeOrdinal) {
String scheduleImplName = scheduleEngine().name().toLowerCase(Locale.ROOT);
logger.info("using schedule engine [" + scheduleImplName + "]");
return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("scroll.size", randomIntBetween(1, 100))
@ -99,7 +101,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
(shieldEnabled ? ShieldPlugin.class.getName() + "," : "") +
licensePluginClass().getName())
.put(ShieldSettings.settings(shieldEnabled))
.put("watcher.trigger.schedule.engine", scheduleEngine().name().toLowerCase(Locale.ROOT))
.put("watcher.trigger.schedule.engine", scheduleImplName)
.build();
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.WatcherPlugin;
import org.elasticsearch.watcher.execution.ExecutionModule;
import org.elasticsearch.watcher.execution.SyncTriggerListener;
import org.elasticsearch.watcher.execution.WatchExecutor;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.clock.ClockMock;
@ -101,7 +102,7 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin {
public static class MockExecutionModule extends ExecutionModule {
public MockExecutionModule() {
super(SameThreadExecutor.class);
super(SameThreadExecutor.class, SyncTriggerListener.class);
}
public static class SameThreadExecutor implements WatchExecutor {

View File

@ -36,7 +36,7 @@ public class WatchExecutionContextMockBuilder {
when(ctx.watch()).thenReturn(watch);
payload(Collections.<String, Object>emptyMap());
metadata(Collections.<String, Object>emptyMap());
time(DateTime.now(UTC));
time(watchId, DateTime.now(UTC));
}
public WatchExecutionContextMockBuilder wid(Wid wid) {
@ -57,8 +57,8 @@ public class WatchExecutionContextMockBuilder {
return this;
}
public WatchExecutionContextMockBuilder time(DateTime time) {
return executionTime(time).triggerEvent(new ScheduleTriggerEvent(time, time));
public WatchExecutionContextMockBuilder time(String watchId, DateTime time) {
return executionTime(time).triggerEvent(new ScheduleTriggerEvent(watchId, time, time));
}
public WatchExecutionContextMockBuilder executionTime(DateTime time) {

View File

@ -109,7 +109,7 @@ public final class WatcherTestUtils {
public static WatchExecutionContext mockExecutionContext(String watchId, DateTime time, Payload payload) {
return mockExecutionContextBuilder(watchId)
.payload(payload)
.time(time)
.time(watchId, time)
.buildMock();
}

View File

@ -12,10 +12,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.schedule.Schedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.*;
import org.elasticsearch.watcher.trigger.schedule.engine.*;
import org.quartz.JobExecutionContext;
@ -102,9 +100,12 @@ public class ScheduleEngineTriggerBenchmark {
scheduler = new SimpleTickerScheduleTriggerEngine(settings, SystemClock.INSTANCE, scheduleRegistry, threadPool) {
@Override
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
protected void notifyListeners(List<TriggerEvent> events) {
if (running.get()) {
measure(total, triggerMetric, tooEarlyMetric, triggeredTime, scheduledTime);
for (TriggerEvent event : events) {
ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event;
measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(), scheduleTriggerEvent.scheduledTime().getMillis());
}
}
}
};
@ -113,9 +114,12 @@ public class ScheduleEngineTriggerBenchmark {
scheduler = new TimerTickerScheduleTriggerEngine(settings, SystemClock.INSTANCE, scheduleRegistry, threadPool) {
@Override
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
protected void notifyListeners(List<TriggerEvent> events) {
if (running.get()) {
measure(total, triggerMetric, tooEarlyMetric, triggeredTime, scheduledTime);
for (TriggerEvent event : events) {
ScheduleTriggerEvent scheduleTriggerEvent = (ScheduleTriggerEvent) event;
measure(total, triggerMetric, tooEarlyMetric, event.triggeredTime().getMillis(), scheduleTriggerEvent.scheduledTime().getMillis());
}
}
}
};

View File

@ -105,7 +105,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
assertThat(indexResponse.isCreated(), is(true));
DateTime now = DateTime.now(UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), now, now);
Wid wid = new Wid("_record", randomLong(), DateTime.now(UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
String actionHistoryIndex = HistoryStore.getHistoryIndexNameForTime(now);
@ -164,7 +164,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(watch.name()).setSource(jsonBuilder.bytes()).get();
assertThat(putWatchResponse.isCreated(), is(true));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(historyIndexDate, historyIndexDate);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.name(), historyIndexDate, historyIndexDate);
Wid wid = new Wid("record_" + i, randomLong(), DateTime.now(UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);

View File

@ -15,7 +15,6 @@ import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
import org.elasticsearch.watcher.support.http.auth.BasicAuth;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
@ -85,7 +84,6 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests {
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
ScriptServiceProxy sc = scriptService();
InetSocketAddress address = internalTestCluster().httpAddresses()[0];
XContentBuilder body = jsonBuilder().prettyPrint().startObject()
.field("query").value(termQuery("field", "value"))
@ -99,7 +97,7 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests {
watcherClient.preparePutWatch("_name1")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.trigger(schedule(interval(10, IntervalSchedule.Interval.Unit.SECONDS)))
.input(httpInput(requestBuilder).extractKeys("hits.total"))
.condition(scriptCondition("ctx.payload.hits.total == 1")))
.get();
@ -107,7 +105,7 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests {
// in this watcher the condition will fail, because max_score isn't extracted, only total:
watcherClient.preparePutWatch("_name2")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.trigger(schedule(interval(10, IntervalSchedule.Interval.Unit.SECONDS)))
.input(httpInput(requestBuilder).extractKeys("hits.total"))
.condition(scriptCondition("ctx.payload.hits.max_score >= 0")))
.get();
@ -116,6 +114,8 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests {
timeWarp().scheduler().trigger("_name1");
timeWarp().scheduler().trigger("_name2");
refresh();
} else {
Thread.sleep(10000);
}
assertWatchWithMinimumPerformedActionsCount("_name1", 1, false);

View File

@ -137,7 +137,7 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
.condition(scriptCondition("ctx.payload.hits.total > 0"))
.transform(searchTransform(matchAllRequest().indices("events")))
.addAction("_id", indexAction("actions", "action"))
.throttlePeriod(TimeValue.timeValueSeconds(10)))
.throttlePeriod(TimeValue.timeValueSeconds(30)))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
@ -159,7 +159,7 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
timeWarp().clock().fastForwardSeconds(10);
timeWarp().clock().fastForwardSeconds(30);
timeWarp().scheduler().trigger("_name");
refresh();
@ -172,23 +172,30 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
assertThat(throttledCount, is(1L));
} else {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
refresh();
// the first fire should work so we should have a single action in the actions index
long actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
// the first fire should work so we should have a single action in the actions index
assertBusy(new Runnable() {
@Override
public void run() {
refresh();
long actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
}
}, 5, TimeUnit.SECONDS);
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
// we should still be within the throttling period... so the number of actions shouldn't change
actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
assertBusy(new Runnable() {
@Override
public void run() {
refresh();
long actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
matchQuery(WatchRecord.Parser.STATE_FIELD.getPreferredName(), WatchRecord.State.THROTTLED.id()));
assertThat(throttledCount, greaterThanOrEqualTo(1L));
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
matchQuery(WatchRecord.Parser.STATE_FIELD.getPreferredName(), WatchRecord.State.THROTTLED.id()));
assertThat(throttledCount, greaterThanOrEqualTo(1L));
}
}, 5, TimeUnit.SECONDS);
}
}

View File

@ -110,7 +110,7 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(parseDate("2015-01-04T00:00:00"), parseDate("2015-01-01T00:00:00"));
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00"), parseDate("2015-01-01T00:00:00"));
WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00"), event, EMPTY_PAYLOAD);
Payload payload = simplePayload("value", "val_3");

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.watcher.trigger;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
@ -81,9 +82,9 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
for (int i = 0; i < times; i++) {
DateTime now = clock.now();
logger.debug("firing [" + jobName + "] at [" + now + "]");
ScheduleTriggerEvent event = new ScheduleTriggerEvent(now, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now);
for (Listener listener : listeners) {
listener.triggered(jobName, event);
listener.triggered(ImmutableList.<TriggerEvent>of(event));
}
if (interval != null) {
if (clock instanceof ClockMock) {

View File

@ -74,15 +74,18 @@ public class QuartzScheduleEngineTests extends ElasticsearchTestCase {
}
final BitSet bits = new BitSet(count);
engine.register(new TriggerEngine.Listener() {
@Override
public void triggered(String jobName, TriggerEvent event) {
int index = Integer.parseInt(jobName);
if (!bits.get(index)) {
logger.info("job [" + index + "] first fire: " + new DateTime());
bits.set(index);
} else {
latch.countDown();
logger.info("job [" + index + "] second fire: " + new DateTime());
public void triggered(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) {
int index = Integer.parseInt(event.jobName());
if (!bits.get(index)) {
logger.info("job [" + index + "] first fire: " + new DateTime());
bits.set(index);
} else {
latch.countDown();
logger.info("job [" + index + "] second fire: " + new DateTime());
}
}
}
});
@ -100,11 +103,14 @@ public class QuartzScheduleEngineTests extends ElasticsearchTestCase {
final CountDownLatch latch = new CountDownLatch(1);
engine.start(Collections.<TriggerEngine.Job>emptySet());
engine.register(new TriggerEngine.Listener() {
@Override
public void triggered(String jobName, TriggerEvent event) {
assertThat(jobName, is(name));
logger.info("triggered job on [{}]", new DateTime());
latch.countDown();
public void triggered(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", new DateTime());
latch.countDown();
}
}
});
DateTime now = new DateTime(DateTimeZone.UTC);
@ -131,11 +137,14 @@ public class QuartzScheduleEngineTests extends ElasticsearchTestCase {
final CountDownLatch latch = new CountDownLatch(1);
engine.start(Collections.<TriggerEngine.Job>emptySet());
engine.register(new TriggerEngine.Listener() {
@Override
public void triggered(String jobName, TriggerEvent event) {
assertThat(jobName, is(name));
logger.info("triggered job on [{}]", new DateTime());
latch.countDown();
public void triggered(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", new DateTime());
latch.countDown();
}
}
});
DateTime now = new DateTime(DateTimeZone.UTC);
@ -164,11 +173,14 @@ public class QuartzScheduleEngineTests extends ElasticsearchTestCase {
final CountDownLatch latch = new CountDownLatch(1);
engine.start(Collections.<TriggerEngine.Job>emptySet());
engine.register(new TriggerEngine.Listener() {
@Override
public void triggered(String jobName, TriggerEvent event) {
assertThat(jobName, is(name));
logger.info("triggered job on [{}]", new DateTime());
latch.countDown();
public void triggered(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", new DateTime());
latch.countDown();
}
}
});
DateTime now = new DateTime(DateTimeZone.UTC);