diff --git a/src/main/java/org/elasticsearch/watcher/execution/CurrentExecutions.java b/src/main/java/org/elasticsearch/watcher/execution/CurrentExecutions.java index 9376970a580..959e436dd31 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/CurrentExecutions.java +++ b/src/main/java/org/elasticsearch/watcher/execution/CurrentExecutions.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -17,7 +18,7 @@ import java.util.concurrent.locks.ReentrantLock; public class CurrentExecutions implements Iterable { - private final ConcurrentMap currentExecutions = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + private final ConcurrentMap currentExecutions = new ConcurrentHashMap<>(); private final ReentrantLock lock = new ReentrantLock(); private final Condition empty = lock.newCondition(); private boolean seal = false; diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index d453f7fbe2d..4977c6adf72 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.actions.ActionWrapper; import org.elasticsearch.watcher.condition.Condition; @@ -83,8 +82,8 @@ public class ExecutionService extends AbstractComponent { historyStore.start(); triggeredWatchStore.start(); currentExecutions = new CurrentExecutions(); - Collection records = triggeredWatchStore.loadTriggeredWatches(state); - executeRecords(records); + Collection triggeredWatches = triggeredWatchStore.loadTriggeredWatches(state); + executeTriggeredWatches(triggeredWatches); logger.debug("started execution service"); } } @@ -97,14 +96,13 @@ public class ExecutionService extends AbstractComponent { if (started.compareAndSet(true, false)) { logger.debug("stopping execution service"); // We could also rely on the shutdown in #updateSettings call, but - // this is a forceful shutdown that also interrupts the worker threads in the threadpool - List cancelledTasks = new ArrayList<>(); - executor.queue().drainTo(cancelledTasks); + // this is a forceful shutdown that also interrupts the worker threads in the thread pool + int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); currentExecutions.sealAndAwaitEmpty(maxStopTimeout); triggeredWatchStore.stop(); historyStore.stop(); - logger.debug("cancelled [{}] queued tasks", cancelledTasks.size()); + logger.debug("cancelled [{}] queued tasks", cancelledTaskCount); logger.debug("stopped execution service"); } } @@ -181,49 +179,25 @@ public class ExecutionService extends AbstractComponent { } logger.debug("saving watch records [{}]", triggeredWatches.size()); - if (triggeredWatches.size() == 0) { - return; - } - if (triggeredWatches.size() == 1) { - final TriggeredWatch triggeredWatch = triggeredWatches.getFirst(); - final TriggeredExecutionContext ctx = contexts.getFirst(); - triggeredWatchStore.put(triggeredWatch, new ActionListener() { - @Override - public void onResponse(Boolean aBoolean) { - executeAsync(ctx, triggeredWatch); + triggeredWatchStore.putAll(triggeredWatches, new ActionListener>() { + @Override + public void onResponse(List successFullSlots) { + for (Integer slot : successFullSlots) { + executeAsync(contexts.get(slot), triggeredWatches.get(slot)); } + } - @Override - public void onFailure(Throwable e) { - Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof EsRejectedExecutionException) { - logger.debug("failed to store watch record [{}]/[{}] due to overloaded threadpool [{}]", triggeredWatch, ctx.id(), ExceptionsHelper.detailedMessage(e)); - } else { - logger.warn("failed to store watch record [{}]/[{}]", e, triggeredWatch, ctx.id()); - } + @Override + public void onFailure(Throwable e) { + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof EsRejectedExecutionException) { + logger.debug("failed to store watch records due to overloaded threadpool [{}]", ExceptionsHelper.detailedMessage(e)); + } else { + logger.warn("failed to store watch records", e); } - }); - } else { - triggeredWatchStore.putAll(triggeredWatches, new ActionListener>() { - @Override - public void onResponse(List successFullSlots) { - for (Integer slot : successFullSlots) { - executeAsync(contexts.get(slot), triggeredWatches.get(slot)); - } - } - - @Override - public void onFailure(Throwable e) { - Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof EsRejectedExecutionException) { - logger.debug("failed to store watch records due to overloaded threadpool [{}]", ExceptionsHelper.detailedMessage(e)); - } else { - logger.warn("failed to store watch records", e); - } - } - }); - } + } + }); } void processEventsSync(Iterable events) throws WatcherException { @@ -250,37 +224,64 @@ public class ExecutionService extends AbstractComponent { return; } - if (triggeredWatches.size() == 1) { - final TriggeredWatch triggeredWatch = triggeredWatches.getFirst(); - final TriggeredExecutionContext ctx = contexts.getFirst(); - triggeredWatchStore.put(triggeredWatch); - executeAsync(ctx, triggeredWatch); - } else { - List slots = triggeredWatchStore.putAll(triggeredWatches); - for (Integer slot : slots) { - executeAsync(contexts.get(slot), triggeredWatches.get(slot)); - } + List slots = triggeredWatchStore.putAll(triggeredWatches); + for (Integer slot : slots) { + executeAsync(contexts.get(slot), triggeredWatches.get(slot)); } } - public WatchRecord execute(WatchExecutionContext ctx) throws IOException { - WatchRecord record; + public WatchRecord execute(WatchExecutionContext ctx) { + WatchRecord record = null; WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); + if (logger.isTraceEnabled()) { + logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock)); + } try { + currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread())); - record = executeInner(ctx); - if (ctx.recordExecution()) { - watchStore.updateStatus(ctx.watch()); + + if (ctx.knownWatch() && watchStore.get(ctx.watch().id()) == null) { + // fail fast if we are trying to execute a deleted watch + String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring..."; + logger.warn(message); + record = ctx.abortBeforeExecution(message, ExecutionState.NOT_EXECUTED_WATCH_MISSING); + } else { + logger.debug("executing watch [{}]", ctx.id().watchId()); + record = executeInner(ctx); + if (ctx.recordExecution()) { + watchStore.updateStatus(ctx.watch()); + } } - } catch (DocumentMissingException vcee) { - throw new WatchMissingException("failed to update the watch [{}] on execute perhaps it was force deleted", vcee, ctx.watch().id()); + + } catch (Exception e) { + String detailedMessage = ExceptionsHelper.detailedMessage(e); + logger.warn("failed to execute watch [{}], failure [{}]", ctx.id(), detailedMessage); + record = ctx.abortFailedExecution(detailedMessage); + } finally { + + if (ctx.knownWatch() && record != null && ctx.recordExecution()) { + try { + historyStore.put(record); + } catch (Exception e) { + logger.error("failed to update watch record [{}]", e, ctx.id()); + } + } + + try { + triggeredWatchStore.delete(ctx.id()); + } catch (Exception e) { + logger.error("failed to delete triggered watch [{}]", e, ctx.id()); + } + currentExecutions.remove(ctx.watch().id()); + if (logger.isTraceEnabled()) { + logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock)); + } lock.release(); + logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); } - if (ctx.recordExecution()) { - historyStore.put(record); - } + return record; } @@ -300,7 +301,7 @@ public class ExecutionService extends AbstractComponent { } catch (EsRejectedExecutionException e) { String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; logger.debug(message); - WatchRecord record = ctx.abort(message, ExecutionState.FAILED); + WatchRecord record = ctx.abortBeforeExecution(message, ExecutionState.FAILED); historyStore.put(record); triggeredWatchStore.delete(triggeredWatch.id()); } @@ -309,12 +310,19 @@ public class ExecutionService extends AbstractComponent { WatchRecord executeInner(WatchExecutionContext ctx) throws IOException { ctx.start(); Watch watch = ctx.watch(); + + // input ctx.beforeInput(); Input.Result inputResult = ctx.inputResult(); if (inputResult == null) { inputResult = watch.input().execute(ctx); ctx.onInputResult(inputResult); } + if (inputResult.status() == Input.Result.Status.FAILURE) { + return ctx.abortFailedExecution("failed to execute watch input"); + } + + // condition ctx.beforeCondition(); Condition.Result conditionResult = ctx.conditionResult(); if (conditionResult == null) { @@ -323,23 +331,26 @@ public class ExecutionService extends AbstractComponent { } if (conditionResult.met()) { + + // actions ctx.beforeAction(); for (ActionWrapper action : watch.actions()) { ActionWrapper.Result actionResult = action.execute(ctx); ctx.onActionResult(actionResult); } } + return ctx.finish(); } - void executeRecords(Collection triggeredWatches) { + void executeTriggeredWatches(Collection triggeredWatches) { assert triggeredWatches != null; int counter = 0; for (TriggeredWatch triggeredWatch : triggeredWatches) { Watch watch = watchStore.get(triggeredWatch.id().watchId()); if (watch == null) { String message = "unable to find watch for record [" + triggeredWatch.id().watchId() + "]/[" + triggeredWatch.id() + "], perhaps it has been deleted, ignoring..."; - WatchRecord record = new WatchRecord(triggeredWatch.id(), triggeredWatch.triggerEvent(), message, ExecutionState.DELETED_WHILE_QUEUED); + WatchRecord record = new WatchRecord(triggeredWatch.id(), triggeredWatch.triggerEvent(), message, ExecutionState.NOT_EXECUTED_WATCH_MISSING); historyStore.put(record); triggeredWatchStore.delete(triggeredWatch.id()); } else { @@ -361,47 +372,7 @@ public class ExecutionService extends AbstractComponent { @Override public void run() { - if (!started.get()) { - logger.debug("can't initiate watch execution as execution service is not started, ignoring it..."); - return; - } - logger.trace("executing [{}] [{}]", ctx.watch().id(), ctx.id()); - - WatchRecord record = null; - WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); - try { - currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread())); - if (watchStore.get(ctx.watch().id()) == null) { - //Fail fast if we are trying to execute a deleted watch - String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring..."; - record = ctx.abort(message, ExecutionState.DELETED_WHILE_QUEUED); - } else { - logger.debug("checking watch [{}]", ctx.id().watchId()); - record = executeInner(ctx); - if (ctx.recordExecution()) { - watchStore.updateStatus(ctx.watch()); - } - } - } catch (Exception e) { - String detailedMessage = ExceptionsHelper.detailedMessage(e); - logger.warn("failed to execute watch [{}], failure [{}]", ctx.id().value(), detailedMessage); - record = ctx.abort(detailedMessage, ExecutionState.FAILED); - } finally { - // The recordExecution doesn't need to check if it is in a started state here, because the - // ExecutionService doesn't stop before the WatchLockService stops - if (record != null && ctx.recordExecution()) { - try { - historyStore.put(record); - triggeredWatchStore.delete(ctx.id()); - } catch (Exception e) { - logger.error("failed to update watch record [{}], failure [{}], record failure if any [{}]", ctx.id(), ExceptionsHelper.detailedMessage(e), record.message()); - } - } - currentExecutions.remove(ctx.watch().id()); - lock.release(); - logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); - } - + execute(ctx); } } diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionState.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionState.java index bbc6c08f843..7400b9b3a07 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionState.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionState.java @@ -15,7 +15,7 @@ public enum ExecutionState { THROTTLED, EXECUTED, FAILED, - DELETED_WHILE_QUEUED; + NOT_EXECUTED_WATCH_MISSING; public String id() { return name().toLowerCase(Locale.ROOT); diff --git a/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java b/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java index 821463c9627..732bd63feab 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java @@ -25,8 +25,9 @@ public class ManualExecutionContext extends WatchExecutionContext { private final Map actionModes; private final boolean recordExecution; + private final boolean knownWatch; - ManualExecutionContext(Watch watch, DateTime executionTime, ManualTriggerEvent triggerEvent, + ManualExecutionContext(Watch watch, boolean knownWatch, DateTime executionTime, ManualTriggerEvent triggerEvent, TimeValue defaultThrottlePeriod, Input.Result inputResult, Condition.Result conditionResult, Map actionModes, boolean recordExecution) { @@ -34,6 +35,7 @@ public class ManualExecutionContext extends WatchExecutionContext { this.actionModes = actionModes; this.recordExecution = recordExecution; + this.knownWatch = knownWatch; if (inputResult != null) { onInputResult(inputResult); @@ -57,6 +59,11 @@ public class ManualExecutionContext extends WatchExecutionContext { } } + @Override + public boolean knownWatch() { + return knownWatch; + } + @Override public final boolean simulateAction(String actionId) { ActionExecutionMode mode = actionModes.get(Builder.ALL); @@ -80,8 +87,8 @@ public class ManualExecutionContext extends WatchExecutionContext { return recordExecution; } - public static Builder builder(Watch watch, ManualTriggerEvent event, TimeValue defaultThrottlePeriod) { - return new Builder(watch, event, defaultThrottlePeriod); + public static Builder builder(Watch watch, boolean knownWatch, ManualTriggerEvent event, TimeValue defaultThrottlePeriod) { + return new Builder(watch, knownWatch, event, defaultThrottlePeriod); } public static class Builder { @@ -89,6 +96,7 @@ public class ManualExecutionContext extends WatchExecutionContext { static final String ALL = "_all"; private final Watch watch; + private final boolean knownWatch; private final ManualTriggerEvent triggerEvent; private final TimeValue defaultThrottlePeriod; protected DateTime executionTime; @@ -97,8 +105,9 @@ public class ManualExecutionContext extends WatchExecutionContext { private Input.Result inputResult; private Condition.Result conditionResult; - private Builder(Watch watch, ManualTriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { + private Builder(Watch watch, boolean knownWatch, ManualTriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { this.watch = watch; + this.knownWatch = knownWatch; assert triggerEvent != null; this.triggerEvent = triggerEvent; this.defaultThrottlePeriod = defaultThrottlePeriod; @@ -140,7 +149,7 @@ public class ManualExecutionContext extends WatchExecutionContext { if (executionTime == null) { executionTime = DateTime.now(UTC); } - return new ManualExecutionContext(watch, executionTime, triggerEvent, defaultThrottlePeriod, inputResult, conditionResult, actionModes.build(), recordExecution); + return new ManualExecutionContext(watch, knownWatch, executionTime, triggerEvent, defaultThrottlePeriod, inputResult, conditionResult, actionModes.build(), recordExecution); } } } diff --git a/src/main/java/org/elasticsearch/watcher/execution/TriggeredExecutionContext.java b/src/main/java/org/elasticsearch/watcher/execution/TriggeredExecutionContext.java index b97b8a5cf7f..d679fcc6baa 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/TriggeredExecutionContext.java +++ b/src/main/java/org/elasticsearch/watcher/execution/TriggeredExecutionContext.java @@ -18,6 +18,11 @@ public class TriggeredExecutionContext extends WatchExecutionContext { super(watch, executionTime, triggerEvent, defaultThrottlePeriod); } + @Override + public boolean knownWatch() { + return true; + } + @Override public final boolean simulateAction(String actionId) { return false; diff --git a/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java b/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java index b917caaf3ba..f4552f456d0 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; @@ -158,6 +159,27 @@ public class TriggeredWatchStore extends AbstractComponent { } public void putAll(final List triggeredWatches, final ActionListener> listener) throws TriggeredWatchException { + + if (triggeredWatches.isEmpty()) { + listener.onResponse(ImmutableList.of()); + return; + } + + if (triggeredWatches.size() == 1) { + put(triggeredWatches.get(0), new ActionListener() { + @Override + public void onResponse(Boolean success) { + listener.onResponse(ImmutableList.of(0)); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + return; + } + ensureStarted(); try { BulkRequest request = new BulkRequest(); diff --git a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java index e7e987c802b..64adee1aeb5 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java +++ b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java @@ -45,6 +45,8 @@ public abstract class WatchExecutionContext { private long actualExecutionStartMs; + private boolean sealed = false; + public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { this.id = new Wid(watch.id(), watch.nonce(), executionTime); this.watch = watch; @@ -53,6 +55,16 @@ public abstract class WatchExecutionContext { this.defaultThrottlePeriod = defaultThrottlePeriod; } + /** + * @return true if the watch associated with this context is known to watcher (i.e. it's stored + * in watcher. This plays a key role in how we handle execution. For example, if + * the watch is known, but then the watch is not there (perhaps deleted in between) + * we abort execution. It also plays a part (along with {@link #recordExecution()} + * in the decision of whether the watch record should be stored and if the watch + * status should be updated. + */ + public abstract boolean knownWatch(); + /** * @return true if this action should be simulated */ @@ -107,6 +119,7 @@ public abstract class WatchExecutionContext { transformedPayload = payload; return transformedPayload; } + beforeWatchTransform(); this.transformResult = watch.transform().execute(this, payload); this.payload = transformResult.payload(); this.transformedPayload = this.payload; @@ -118,12 +131,16 @@ public abstract class WatchExecutionContext { } public void beforeInput() { + assert !sealed; executionPhase = ExecutionPhase.INPUT; } public void onInputResult(Input.Result inputResult) { + assert !sealed; this.inputResult = inputResult; - this.payload = inputResult.payload(); + if (inputResult.status() == Input.Result.Status.SUCCESS) { + this.payload = inputResult.payload(); + } } public Input.Result inputResult() { @@ -131,10 +148,12 @@ public abstract class WatchExecutionContext { } public void beforeCondition() { + assert !sealed; executionPhase = ExecutionPhase.CONDITION; } public void onConditionResult(Condition.Result conditionResult) { + assert !sealed; this.conditionResult = conditionResult; if (recordExecution()) { watch.status().onCheck(conditionResult.met(), executionTime); @@ -147,23 +166,21 @@ public abstract class WatchExecutionContext { } public void beforeWatchTransform() { + assert !sealed; this.executionPhase = ExecutionPhase.WATCH_TRANSFORM; } - public void onTransformResult(Transform.Result transformResult) { - this.transformResult = transformResult; - this.payload = transformResult.payload(); - } - public Transform.Result transformResult() { return transformResult; } public void beforeAction() { + assert !sealed; executionPhase = ExecutionPhase.ACTIONS; } public void onActionResult(ActionWrapper.Result result) { + assert !sealed; actionsResults.put(result.id(), result); if (recordExecution()) { watch.status().onActionResult(result.id(), executionTime, result.action()); @@ -174,21 +191,31 @@ public abstract class WatchExecutionContext { return new ExecutableActions.Results(actionsResults); } + public WatchRecord abortBeforeExecution(String message, ExecutionState state) { + sealed = true; + return new WatchRecord(id, triggerEvent, message, state); + } + public void start() { + assert !sealed; actualExecutionStartMs = System.currentTimeMillis(); } + public WatchRecord abortFailedExecution(String message) { + sealed = true; + long executionFinishMs = System.currentTimeMillis(); + WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - actualExecutionStartMs); + return new WatchRecord(this, result, message); + } + public WatchRecord finish() { + sealed = true; executionPhase = ExecutionPhase.FINISHED; long executionFinishMs = System.currentTimeMillis(); WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - actualExecutionStartMs); return new WatchRecord(this, result); } - public WatchRecord abort(String message, ExecutionState state) { - return new WatchRecord(id, triggerEvent, message, state); - } - public WatchExecutionSnapshot createSnapshot(Thread executionThread) { return new WatchExecutionSnapshot(this, executionThread.getStackTrace()); } diff --git a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionResult.java b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionResult.java index 59a8f2d98c7..ec96a700967 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionResult.java +++ b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionResult.java @@ -27,8 +27,8 @@ public class WatchExecutionResult implements ToXContent { private final DateTime executionTime; private final long executionDurationMs; - private final Input.Result inputResult; - private final Condition.Result conditionResult; + private final @Nullable Input.Result inputResult; + private final @Nullable Condition.Result conditionResult; private final @Nullable Transform.Result transformResult; private final ExecutableActions.Results actionsResults; diff --git a/src/main/java/org/elasticsearch/watcher/history/WatchRecord.java b/src/main/java/org/elasticsearch/watcher/history/WatchRecord.java index 7f1325fcaef..00f473b014e 100644 --- a/src/main/java/org/elasticsearch/watcher/history/WatchRecord.java +++ b/src/main/java/org/elasticsearch/watcher/history/WatchRecord.java @@ -10,7 +10,10 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.watcher.condition.Condition; -import org.elasticsearch.watcher.execution.*; +import org.elasticsearch.watcher.execution.ExecutionState; +import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.execution.WatchExecutionResult; +import org.elasticsearch.watcher.execution.Wid; import org.elasticsearch.watcher.input.ExecutableInput; import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.watch.Watch; @@ -31,6 +34,9 @@ public class WatchRecord implements ToXContent { private final @Nullable String message; private final @Nullable WatchExecutionResult executionResult; + /** + * Called when the execution was aborted before it started + */ public WatchRecord(Wid id, TriggerEvent triggerEvent, String message, ExecutionState state) { this.id = id; this.triggerEvent = triggerEvent; @@ -42,6 +48,24 @@ public class WatchRecord implements ToXContent { this.metadata = null; } + /** + * Called when the execution was aborted due to an error during execution (the given result should reflect + * were exactly the execution failed) + */ + public WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult, String message) { + this.id = context.id(); + this.triggerEvent = context.triggerEvent(); + this.condition = context.watch().condition().condition(); + this.input = context.watch().input(); + this.metadata = context.watch().metadata(); + this.executionResult = executionResult; + this.message = message; + this.state = ExecutionState.FAILED; + } + + /** + * Called when the execution finished. + */ public WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) { this.id = context.id(); this.triggerEvent = context.triggerEvent(); @@ -92,7 +116,7 @@ public class WatchRecord implements ToXContent { return metadata; } - public WatchExecutionResult execution() { + public WatchExecutionResult result() { return executionResult; } diff --git a/src/main/java/org/elasticsearch/watcher/input/ExecutableInput.java b/src/main/java/org/elasticsearch/watcher/input/ExecutableInput.java index 8423fb26cb0..cc2c056d154 100644 --- a/src/main/java/org/elasticsearch/watcher/input/ExecutableInput.java +++ b/src/main/java/org/elasticsearch/watcher/input/ExecutableInput.java @@ -39,7 +39,7 @@ public abstract class ExecutableInput i /** * Executes this input */ - public abstract R execute(WatchExecutionContext ctx) throws IOException; + public abstract R execute(WatchExecutionContext ctx); @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { diff --git a/src/main/java/org/elasticsearch/watcher/input/Input.java b/src/main/java/org/elasticsearch/watcher/input/Input.java index cbfc686b8b2..8f931ba983c 100644 --- a/src/main/java/org/elasticsearch/watcher/input/Input.java +++ b/src/main/java/org/elasticsearch/watcher/input/Input.java @@ -5,12 +5,14 @@ */ package org.elasticsearch.watcher.input; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.watcher.watch.Payload; import java.io.IOException; +import java.util.Locale; /** * @@ -21,28 +23,64 @@ public interface Input extends ToXContent { abstract class Result implements ToXContent { + public enum Status { + SUCCESS, FAILURE + } + + protected final Status status; protected final String type; private final Payload payload; + private final String reason; - public Result(String type, Payload payload) { + protected Result(String type, Payload payload) { + this.status = Status.SUCCESS; this.type = type; this.payload = payload; + this.reason = null; + } + + protected Result(String type, Exception e) { + this.status = Status.FAILURE; + this.type = type; + this.reason = ExceptionsHelper.detailedMessage(e); + this.payload = null; } public String type() { return type; } + public Status status() { + return status; + } public Payload payload() { + assert status == Status.SUCCESS; return payload; } + public String reason() { + assert status == Status.FAILURE; + return reason; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field(Field.STATUS.getPreferredName(), status.name().toLowerCase(Locale.ROOT)); builder.field(Field.TYPE.getPreferredName(), type); - builder.field(Field.PAYLOAD.getPreferredName(), payload, params); + switch (status) { + case SUCCESS: + assert payload != null; + builder.field(Field.PAYLOAD.getPreferredName(), payload, params); + break; + case FAILURE: + assert reason != null; + builder.field(Field.REASON.getPreferredName(), reason); + break; + default: + assert false; + } typeXContent(builder, params); return builder.endObject(); } @@ -57,7 +95,9 @@ public interface Input extends ToXContent { } interface Field { + ParseField STATUS = new ParseField("status"); ParseField TYPE = new ParseField("type"); ParseField PAYLOAD = new ParseField("payload"); + ParseField REASON = new ParseField("reason"); } } diff --git a/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java b/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java index 4af75a672b6..fcc582a77dd 100644 --- a/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java +++ b/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.input.ExecutableInput; +import org.elasticsearch.watcher.input.search.SearchInput; import org.elasticsearch.watcher.support.Variables; import org.elasticsearch.watcher.support.XContentFilterKeysUtils; import org.elasticsearch.watcher.support.http.HttpClient; @@ -20,7 +21,6 @@ import org.elasticsearch.watcher.support.http.HttpResponse; import org.elasticsearch.watcher.support.template.TemplateEngine; import org.elasticsearch.watcher.watch.Payload; -import java.io.IOException; import java.util.Map; /** @@ -36,15 +36,23 @@ public class ExecutableHttpInput extends ExecutableInput model = Variables.createCtxModel(ctx, null); - HttpRequest request = input.getRequest().render(templateEngine, model); + public HttpInput.Result execute(WatchExecutionContext ctx) { + HttpRequest request = null; + try { + Map model = Variables.createCtxModel(ctx, null); + request = input.getRequest().render(templateEngine, model); + return doExecute(ctx, request); + } catch (Exception e) { + logger.error("failed to execute [{}] input for [{}]", e, HttpInput.TYPE, ctx.watch()); + return new HttpInput.Result(request, e); + } + } + HttpInput.Result doExecute(WatchExecutionContext ctx, HttpRequest request) throws Exception { HttpResponse response = client.execute(request); if (!response.hasContent()) { - return new HttpInput.Result(Payload.EMPTY, request, response.status()); + return new HttpInput.Result(request, response.status(), Payload.EMPTY); } XContentType contentType = response.xContentType(); @@ -52,7 +60,6 @@ public class ExecutableHttpInput extends ExecutableInput 0) { + builder.field(Field.STATUS_CODE.getPreferredName(), statusCode); + } + return builder.endObject(); } } @@ -190,7 +201,7 @@ public class HttpInput implements Input { interface Field extends Input.Field { ParseField REQUEST = new ParseField("request"); ParseField EXTRACT = new ParseField("extract"); - ParseField STATUS = new ParseField("status"); + ParseField STATUS_CODE = new ParseField("status_code"); ParseField RESPONSE_CONTENT_TYPE = new ParseField("response_content_type"); } } diff --git a/src/main/java/org/elasticsearch/watcher/input/none/ExecutableNoneInput.java b/src/main/java/org/elasticsearch/watcher/input/none/ExecutableNoneInput.java index 400b19935e3..1a632a2b965 100644 --- a/src/main/java/org/elasticsearch/watcher/input/none/ExecutableNoneInput.java +++ b/src/main/java/org/elasticsearch/watcher/input/none/ExecutableNoneInput.java @@ -10,8 +10,6 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.input.ExecutableInput; -import java.io.IOException; - /** * */ @@ -22,7 +20,7 @@ public class ExecutableNoneInput extends ExecutableInput { @Override protected void doStop() throws ElasticsearchException { - } @Override diff --git a/src/main/java/org/elasticsearch/watcher/support/Variables.java b/src/main/java/org/elasticsearch/watcher/support/Variables.java index 1a80153c5c2..b8f9d16f50b 100644 --- a/src/main/java/org/elasticsearch/watcher/support/Variables.java +++ b/src/main/java/org/elasticsearch/watcher/support/Variables.java @@ -17,6 +17,7 @@ import java.util.Map; public final class Variables { public static final String CTX = "ctx"; + public static final String ID = "id"; public static final String WATCH_ID = "watch_id"; public static final String EXECUTION_TIME = "execution_time"; public static final String TRIGGER = "trigger"; @@ -25,6 +26,7 @@ public final class Variables { public static Map createCtxModel(WatchExecutionContext ctx, Payload payload) { Map vars = new HashMap<>(); + vars.put(ID, ctx.id().value()); vars.put(WATCH_ID, ctx.watch().id()); vars.put(EXECUTION_TIME, ctx.executionTime()); vars.put(TRIGGER, ctx.triggerEvent().data()); diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java index 1fe2dd3e5d6..b63533f656f 100644 --- a/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java @@ -6,6 +6,7 @@ package org.elasticsearch.watcher.support.http; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.base.Charsets; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -83,6 +84,14 @@ public class HttpClient extends AbstractLifecycleComponent { } public HttpResponse execute(HttpRequest request) throws IOException { + try { + return doExecute(request); + } catch (SocketTimeoutException ste) { + throw new ElasticsearchTimeoutException("failed to execute http request. timeout expired", ste); + } + } + + public HttpResponse doExecute(HttpRequest request) throws IOException { String queryString = null; if (request.params() != null && !request.params().isEmpty()) { StringBuilder builder = new StringBuilder(); diff --git a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java index adb69e5b3c5..0db5a030c2b 100644 --- a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java +++ b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java @@ -28,6 +28,8 @@ import org.elasticsearch.transport.TransportMessage; import org.elasticsearch.watcher.shield.ShieldIntegration; import org.elasticsearch.watcher.support.init.InitializingService; +import java.util.concurrent.TimeUnit; + /** * A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client * needs to injected to be avoid circular dependencies issues. @@ -88,7 +90,7 @@ public class ClientProxy implements InitializingService.Initializable { } public SearchResponse search(SearchRequest request) { - return client.search(preProcess(request)).actionGet(); + return client.search(preProcess(request)).actionGet(5, TimeUnit.SECONDS); } public SearchResponse searchScroll(String scrollId, TimeValue timeout) { diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java index f80814d58bf..18e35f94f39 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java @@ -87,7 +87,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction expectedModel = ImmutableMap.builder() .put("ctx", ImmutableMap.builder() + .put("id", ctx.id().value()) .put("watch_id", "watch1") .put("payload", data) .put("metadata", metadata) diff --git a/src/test/java/org/elasticsearch/watcher/actions/logging/LoggingActionTests.java b/src/test/java/org/elasticsearch/watcher/actions/logging/LoggingActionTests.java index dd7a6a3ba83..eef7a2615ed 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/logging/LoggingActionTests.java +++ b/src/test/java/org/elasticsearch/watcher/actions/logging/LoggingActionTests.java @@ -57,10 +57,15 @@ public class LoggingActionTests extends ElasticsearchTestCase { public void testExecute() throws Exception { final DateTime now = DateTime.now(UTC); + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id") + .time("_watch_id", now) + .buildMock(); + final Map expectedModel = ImmutableMap.builder() .put("ctx", ImmutableMap.builder() - .put("execution_time", now) + .put("id", ctx.id().value()) .put("watch_id", "_watch_id") + .put("execution_time", now) .put("payload", ImmutableMap.of()) .put("metadata", ImmutableMap.of()) .put("trigger", ImmutableMap.builder() @@ -76,9 +81,7 @@ public class LoggingActionTests extends ElasticsearchTestCase { ExecutableLoggingAction executable = new ExecutableLoggingAction(action, logger, actionLogger, engine); when(engine.render(template, expectedModel)).thenReturn(text); - WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id") - .time("_watch_id", now) - .buildMock(); + Action.Result result = executable.execute("_id", ctx, new Payload.Simple()); verifyLogger(actionLogger, level, text); diff --git a/src/test/java/org/elasticsearch/watcher/actions/throttler/ActionThrottleTests.java b/src/test/java/org/elasticsearch/watcher/actions/throttler/ActionThrottleTests.java index 0e13ef310d1..8cdd31f42c1 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/throttler/ActionThrottleTests.java +++ b/src/test/java/org/elasticsearch/watcher/actions/throttler/ActionThrottleTests.java @@ -44,6 +44,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; +import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.watcher.actions.ActionBuilders.webhookAction; import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; @@ -73,7 +75,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests { ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); WatchRecord watchRecord = executionService().execute(ctx); - assertThat(watchRecord.execution().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.SIMULATED)); + assertThat(watchRecord.result().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.SIMULATED)); if (timeWarped()) { timeWarp().clock().fastForward(TimeValue.timeValueSeconds(1)); } @@ -88,9 +90,9 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests { ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); watchRecord = executionService().execute(ctx); if (ack) { - assertThat(watchRecord.execution().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.THROTTLED)); + assertThat(watchRecord.result().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.THROTTLED)); } else { - assertThat(watchRecord.execution().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.SIMULATED)); + assertThat(watchRecord.result().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.SIMULATED)); } } @@ -133,7 +135,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests { ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); WatchRecord watchRecord = executionService().execute(ctx); - for (ActionWrapper.Result result : watchRecord.execution().actionsResults()) { + for (ActionWrapper.Result result : watchRecord.result().actionsResults()) { if (ackingActions.contains(result.id())) { assertThat(result.action().status(), equalTo(Action.Result.Status.THROTTLED)); } else { @@ -162,12 +164,12 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests { ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); WatchRecord watchRecord = executionService().execute(ctx); long firstExecution = System.currentTimeMillis(); - for(ActionWrapper.Result actionResult : watchRecord.execution().actionsResults()) { + for(ActionWrapper.Result actionResult : watchRecord.result().actionsResults()) { assertThat(actionResult.action().status(), equalTo(Action.Result.Status.SIMULATED)); } ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); watchRecord = executionService().execute(ctx); - for(ActionWrapper.Result actionResult : watchRecord.execution().actionsResults()) { + for(ActionWrapper.Result actionResult : watchRecord.result().actionsResults()) { assertThat(actionResult.action().status(), equalTo(Action.Result.Status.THROTTLED)); } @@ -178,18 +180,14 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests { assertBusy(new Runnable() { @Override public void run() { - try { - ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); - WatchRecord watchRecord = executionService().execute(ctx); - for (ActionWrapper.Result actionResult : watchRecord.execution().actionsResults()) { - if ("ten_sec_throttle".equals(actionResult.id())) { - assertThat(actionResult.action().status(), equalTo(Action.Result.Status.SIMULATED)); - } else { - assertThat(actionResult.action().status(), equalTo(Action.Result.Status.THROTTLED)); - } + ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); + WatchRecord watchRecord = executionService().execute(ctx); + for (ActionWrapper.Result actionResult : watchRecord.result().actionsResults()) { + if ("ten_sec_throttle".equals(actionResult.id())) { + assertThat(actionResult.action().status(), equalTo(Action.Result.Status.SIMULATED)); + } else { + assertThat(actionResult.action().status(), equalTo(Action.Result.Status.THROTTLED)); } - } catch (IOException ioe) { - throw new ElasticsearchException("failed to execute", ioe); } } }, 11000 - (System.currentTimeMillis() - firstExecution), TimeUnit.MILLISECONDS); @@ -258,14 +256,13 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests { }, 6, TimeUnit.SECONDS); } - @Test @Slow + @Test @Slow @Repeat(iterations = 20) public void testWatchThrottlePeriod() throws Exception { WatchSourceBuilder watchSourceBuilder = watchBuilder() .trigger(schedule(interval("60m"))) .defaultThrottlePeriod(new TimeValue(1, TimeUnit.SECONDS)); AvailableAction availableAction = randomFrom(AvailableAction.values()); - final String actionType = availableAction.type(); watchSourceBuilder.addAction("default_global_throttle", availableAction.action()); PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchSourceBuilder)).actionGet(); @@ -324,35 +321,35 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests { @Test @Slow public void testFailingActionDoesGetThrottled() throws Exception { TimeValue throttlePeriod = new TimeValue(60, TimeUnit.MINUTES); - WatchSourceBuilder watchSourceBuilder = watchBuilder() - .trigger(new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(60, IntervalSchedule.Interval.Unit.MINUTES)))) - .defaultThrottlePeriod(throttlePeriod); - watchSourceBuilder.addAction("logging", LoggingAction.builder(new Template.Builder.Inline("test out").build())); - watchSourceBuilder.addAction("failing_hook", WebhookAction.builder(HttpRequestTemplate.builder("unknown.foo", 80).build())); - PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchSourceBuilder)).actionGet(); + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder() + .trigger(new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(60, IntervalSchedule.Interval.Unit.MINUTES)))) + .defaultThrottlePeriod(throttlePeriod) + .addAction("logging", loggingAction("test out")) + .addAction("failing_hook", webhookAction(HttpRequestTemplate.builder("unknown.foo", 80)))) + .get(); assertThat(putWatchResponse.getVersion(), greaterThan(0L)); refresh(); ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); - ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, throttlePeriod); + ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, throttlePeriod); ctxBuilder.recordExecution(true); ManualExecutionContext ctx = ctxBuilder.build(); WatchRecord watchRecord = executionService().execute(ctx); - assertThat(watchRecord.execution().actionsResults().get("logging").action().status(), equalTo(Action.Result.Status.SUCCESS)); - assertThat(watchRecord.execution().actionsResults().get("failing_hook").action().status(), equalTo(Action.Result.Status.FAILURE)); + assertThat(watchRecord.result().actionsResults().get("logging").action().status(), equalTo(Action.Result.Status.SUCCESS)); + assertThat(watchRecord.result().actionsResults().get("failing_hook").action().status(), equalTo(Action.Result.Status.FAILURE)); assertThat(watchRecord.state(), equalTo(ExecutionState.EXECUTED)); triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); - ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, throttlePeriod); + ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, throttlePeriod); ctxBuilder.recordExecution(true); ctx = ctxBuilder.build(); watchRecord = executionService().execute(ctx); - assertThat(watchRecord.execution().actionsResults().get("logging").action().status(), equalTo(Action.Result.Status.THROTTLED)); - assertThat(watchRecord.execution().actionsResults().get("failing_hook").action().status(), equalTo(Action.Result.Status.FAILURE)); + assertThat(watchRecord.result().actionsResults().get("logging").action().status(), equalTo(Action.Result.Status.THROTTLED)); + assertThat(watchRecord.result().actionsResults().get("failing_hook").action().status(), equalTo(Action.Result.Status.FAILURE)); assertThat(watchRecord.state(), equalTo(ExecutionState.THROTTLED)); } @@ -363,7 +360,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests { private ManualExecutionContext getManualExecutionContext(TimeValue throttlePeriod) { ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); - return ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, throttlePeriod) + return ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, throttlePeriod) .executionTime(timeWarped() ? timeWarp().clock().nowUTC() : SystemClock.INSTANCE.nowUTC()) .allActionsMode(ActionExecutionMode.SIMULATE) .recordExecution(true) diff --git a/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java b/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java index 869d6743212..01c39a2e758 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java @@ -5,11 +5,15 @@ */ package org.elasticsearch.watcher.execution; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.watcher.actions.*; +import org.elasticsearch.watcher.actions.throttler.ActionThrottler; +import org.elasticsearch.watcher.actions.throttler.Throttler; import org.elasticsearch.watcher.condition.Condition; import org.elasticsearch.watcher.condition.ExecutableCondition; import org.elasticsearch.watcher.condition.always.AlwaysCondition; @@ -20,8 +24,6 @@ import org.elasticsearch.watcher.input.ExecutableInput; import org.elasticsearch.watcher.input.Input; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.support.clock.ClockMock; -import org.elasticsearch.watcher.actions.throttler.ActionThrottler; -import org.elasticsearch.watcher.actions.throttler.Throttler; import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation; import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.Transform; @@ -31,6 +33,7 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; @@ -46,28 +49,182 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { private ExecutableInput input; private Input.Result inputResult; + private WatchStore watchStore; + private TriggeredWatchStore triggeredWatchStore; + private HistoryStore historyStore; + private WatchLockService watchLockService; private ExecutionService executionService; + private Clock clock; @Before public void init() throws Exception { payload = mock(Payload.class); input = mock(ExecutableInput.class); inputResult = mock(Input.Result.class); + when(inputResult.status()).thenReturn(Input.Result.Status.SUCCESS); when(inputResult.payload()).thenReturn(payload); when(input.execute(any(WatchExecutionContext.class))).thenReturn(inputResult); - HistoryStore historyStore = mock(HistoryStore.class); - TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); + watchStore = mock(WatchStore.class); + triggeredWatchStore = mock(TriggeredWatchStore.class); + historyStore = mock(HistoryStore.class); + WatchExecutor executor = mock(WatchExecutor.class); - WatchStore watchStore = mock(WatchStore.class); - WatchLockService watchLockService = mock(WatchLockService.class); + when(executor.queue()).thenReturn(new ArrayBlockingQueue(1)); + + watchLockService = mock(WatchLockService.class); WatcherSettingsValidation settingsValidator = mock(WatcherSettingsValidation.class); - Clock clock = new ClockMock(); + clock = new ClockMock(); executionService = new ExecutionService(ImmutableSettings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore, watchLockService, clock, settingsValidator); + + ClusterState clusterState = mock(ClusterState.class); + when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(ImmutableList.of()); + executionService.start(clusterState); } @Test public void testExecute() throws Exception { + WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + when(watchLockService.acquire("_id")).thenReturn(lock); + + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + when(watchStore.get("_id")).thenReturn(watch); + + ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC()); + WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5)); + + Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE; + ExecutableCondition condition = mock(ExecutableCondition.class); + when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult); + + // watch level transform + Transform.Result watchTransformResult = mock(Transform.Result.class); + when(watchTransformResult.payload()).thenReturn(payload); + ExecutableTransform watchTransform = mock(ExecutableTransform.class); + when(watchTransform.execute(context, payload)).thenReturn(watchTransformResult); + + // action throttler + Throttler.Result throttleResult = mock(Throttler.Result.class); + when(throttleResult.throttle()).thenReturn(false); + ActionThrottler throttler = mock(ActionThrottler.class); + when(throttler.throttle("_action", context)).thenReturn(throttleResult); + + // action level transform + Transform.Result actionTransformResult = mock(Transform.Result.class); + when(actionTransformResult.payload()).thenReturn(payload); + ExecutableTransform actionTransform = mock(ExecutableTransform.class); + when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult); + + // the action + Action.Result actionResult = mock(Action.Result.class); + when(actionResult.type()).thenReturn("_action_type"); + when(actionResult.status()).thenReturn(Action.Result.Status.SUCCESS); + ExecutableAction action = mock(ExecutableAction.class); + when(action.execute("_action", context, payload)).thenReturn(actionResult); + + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); + ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); + + WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); + + when(watch.input()).thenReturn(input); + when(watch.condition()).thenReturn(condition); + when(watch.transform()).thenReturn(watchTransform); + when(watch.actions()).thenReturn(actions); + when(watch.status()).thenReturn(watchStatus); + + WatchRecord watchRecord = executionService.execute(context); + assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult)); + assertThat(watchRecord.result().transformResult(), sameInstance(watchTransformResult)); + ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action"); + assertThat(result, notNullValue()); + assertThat(result.id(), is("_action")); + assertThat(result.transform(), sameInstance(actionTransformResult)); + assertThat(result.action(), sameInstance(actionResult)); + + verify(historyStore, times(1)).put(watchRecord); + verify(lock, times(1)).release(); + verify(condition, times(1)).execute(context); + verify(watchTransform, times(1)).execute(context, payload); + verify(action, times(1)).execute("_action", context, payload); + } + + @Test + public void testExecute_FailedInput() throws Exception { + WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + when(watchLockService.acquire("_id")).thenReturn(lock); + + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + when(watchStore.get("_id")).thenReturn(watch); + + ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC()); + WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5)); + + input = mock(ExecutableInput.class); + Input.Result inputResult = mock(Input.Result.class); + when(inputResult.status()).thenReturn(Input.Result.Status.FAILURE); + when(inputResult.reason()).thenReturn("_reason"); + when(input.execute(context)).thenReturn(inputResult); + + Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE; + ExecutableCondition condition = mock(ExecutableCondition.class); + when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult); + + // watch level transform + Transform.Result watchTransformResult = mock(Transform.Result.class); + when(watchTransformResult.payload()).thenReturn(payload); + ExecutableTransform watchTransform = mock(ExecutableTransform.class); + when(watchTransform.execute(context, payload)).thenReturn(watchTransformResult); + + // action throttler + Throttler.Result throttleResult = mock(Throttler.Result.class); + when(throttleResult.throttle()).thenReturn(false); + ActionThrottler throttler = mock(ActionThrottler.class); + when(throttler.throttle("_action", context)).thenReturn(throttleResult); + + // action level transform + Transform.Result actionTransformResult = mock(Transform.Result.class); + when(actionTransformResult.payload()).thenReturn(payload); + ExecutableTransform actionTransform = mock(ExecutableTransform.class); + when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult); + + // the action + Action.Result actionResult = mock(Action.Result.class); + when(actionResult.type()).thenReturn("_action_type"); + when(actionResult.status()).thenReturn(Action.Result.Status.SUCCESS); + ExecutableAction action = mock(ExecutableAction.class); + when(action.execute("_action", context, payload)).thenReturn(actionResult); + + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); + ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); + + WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); + + when(watch.input()).thenReturn(input); + when(watch.condition()).thenReturn(condition); + when(watch.transform()).thenReturn(watchTransform); + when(watch.actions()).thenReturn(actions); + when(watch.status()).thenReturn(watchStatus); + + WatchRecord watchRecord = executionService.execute(context); + assertThat(watchRecord.result().inputResult(), is(inputResult)); + assertThat(watchRecord.result().conditionResult(), nullValue()); + assertThat(watchRecord.result().transformResult(), nullValue()); + assertThat(watchRecord.result().actionsResults(), notNullValue()); + assertThat(watchRecord.result().actionsResults().count(), is(0)); + + verify(historyStore, times(1)).put(watchRecord); + verify(lock, times(1)).release(); + verify(input, times(1)).execute(context); + verify(condition, never()).execute(context); + verify(watchTransform, never()).execute(context, payload); + verify(action, never()).execute("_action", context, payload); + } + + @Test + public void testExecuteInner() throws Exception { DateTime now = DateTime.now(UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); @@ -102,7 +259,6 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); @@ -115,9 +271,9 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { when(watch.status()).thenReturn(watchStatus); WatchRecord watchRecord = executionService.executeInner(context); - assertThat(watchRecord.execution().conditionResult(), sameInstance(conditionResult)); - assertThat(watchRecord.execution().transformResult(), sameInstance(watchTransformResult)); - ActionWrapper.Result result = watchRecord.execution().actionsResults().get("_action"); + assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult)); + assertThat(watchRecord.result().transformResult(), sameInstance(watchTransformResult)); + ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action"); assertThat(result, notNullValue()); assertThat(result.id(), is("_action")); assertThat(result.transform(), sameInstance(actionTransformResult)); @@ -129,7 +285,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { } @Test - public void testExecute_throttled() throws Exception { + public void testExecuteInner_throttled() throws Exception { DateTime now = DateTime.now(UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); @@ -159,12 +315,12 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { when(watch.actions()).thenReturn(actions); when(watch.status()).thenReturn(watchStatus); - WatchRecord executionResult = executionService.executeInner(context); - assertThat(executionResult.execution().inputResult(), sameInstance(inputResult)); - assertThat(executionResult.execution().conditionResult(), sameInstance(conditionResult)); - assertThat(executionResult.execution().transformResult(), nullValue()); - assertThat(executionResult.execution().actionsResults().count(), is(1)); - ActionWrapper.Result result = executionResult.execution().actionsResults().get("_action"); + WatchRecord watchRecord = executionService.executeInner(context); + assertThat(watchRecord.result().inputResult(), sameInstance(inputResult)); + assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult)); + assertThat(watchRecord.result().transformResult(), nullValue()); + assertThat(watchRecord.result().actionsResults().count(), is(1)); + ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action"); assertThat(result, notNullValue()); assertThat(result.id(), is("_action")); assertThat(result.transform(), nullValue()); @@ -178,7 +334,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { } @Test - public void testExecute_conditionNotMet() throws Exception { + public void testExecuteInner_conditionNotMet() throws Exception { DateTime now = DateTime.now(UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); @@ -206,11 +362,11 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { when(watch.actions()).thenReturn(actions); when(watch.status()).thenReturn(watchStatus); - WatchRecord executionResult = executionService.executeInner(context); - assertThat(executionResult.execution().inputResult(), sameInstance(inputResult)); - assertThat(executionResult.execution().conditionResult(), sameInstance(conditionResult)); - assertThat(executionResult.execution().transformResult(), nullValue()); - assertThat(executionResult.execution().actionsResults().count(), is(0)); + WatchRecord watchRecord = executionService.executeInner(context); + assertThat(watchRecord.result().inputResult(), sameInstance(inputResult)); + assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult)); + assertThat(watchRecord.result().transformResult(), nullValue()); + assertThat(watchRecord.result().actionsResults().count(), is(0)); verify(condition, times(1)).execute(context); verify(watchTransform, never()).execute(context, payload); diff --git a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java index e2ad8b40b23..b92d9d63cb9 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java @@ -5,11 +5,11 @@ */ package org.elasticsearch.watcher.execution; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.WatcherService; import org.elasticsearch.watcher.actions.ActionStatus; @@ -60,7 +60,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { return false; } - @Test //@Repeat(iterations = 10) + @Test @Repeat(iterations = 10) public void testExecuteWatch() throws Exception { ensureWatcherStarted(); boolean ignoreCondition = randomBoolean(); @@ -83,10 +83,10 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { refresh(); assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true)); //If we are persisting the state we need to use the exact watch that is in memory - ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, timeValueSeconds(5)); + ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, timeValueSeconds(5)); } else { parsedWatch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON)); - ctxBuilder = ManualExecutionContext.builder(parsedWatch, triggerEvent, timeValueSeconds(5)); + ctxBuilder = ManualExecutionContext.builder(parsedWatch, false, triggerEvent, timeValueSeconds(5)); } if (ignoreCondition) { @@ -117,18 +117,18 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { assertThat("the expected count of history records should be [" + expectedCount + "]", newRecordCount, equalTo(expectedCount)); if (ignoreCondition) { - assertThat("The action should have run", watchRecord.execution().actionsResults().count(), equalTo(1)); + assertThat("The action should have run", watchRecord.result().actionsResults().count(), equalTo(1)); } else if (!conditionAlwaysTrue) { - assertThat("The action should not have run", watchRecord.execution().actionsResults().count(), equalTo(0)); + assertThat("The action should not have run", watchRecord.result().actionsResults().count(), equalTo(0)); } if ((ignoreCondition || conditionAlwaysTrue) && action == null) { - assertThat("The action should have run non simulated", watchRecord.execution().actionsResults().get("log").action(), + assertThat("The action should have run non simulated", watchRecord.result().actionsResults().get("log").action(), not(instanceOf(LoggingAction.Result.Simulated.class)) ); } if ((ignoreCondition || conditionAlwaysTrue) && action != null ) { - assertThat("The action should have run simulated", watchRecord.execution().actionsResults().get("log").action(), instanceOf(LoggingAction.Result.Simulated.class)); + assertThat("The action should have run simulated", watchRecord.result().actionsResults().get("log").action(), instanceOf(LoggingAction.Result.Simulated.class)); } Watch testWatch = watchService().getWatch("_id"); @@ -165,7 +165,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { map2.put("foo", map1); ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); - ManualExecutionContext.Builder ctxBuilder1 = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, timeValueSeconds(5)); + ManualExecutionContext.Builder ctxBuilder1 = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, timeValueSeconds(5)); ctxBuilder1.actionMode("_all", ActionExecutionMode.SIMULATE); ctxBuilder1.withInput(new SimpleInput.Result(new Payload.Simple(map1))); @@ -173,7 +173,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { WatchRecord watchRecord1 = executionService().execute(ctxBuilder1.build()); - ManualExecutionContext.Builder ctxBuilder2 = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, timeValueSeconds(5)); + ManualExecutionContext.Builder ctxBuilder2 = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, timeValueSeconds(5)); ctxBuilder2.actionMode("_all", ActionExecutionMode.SIMULATE); ctxBuilder2.withInput(new SimpleInput.Result(new Payload.Simple(map2))); @@ -181,8 +181,8 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { WatchRecord watchRecord2 = executionService().execute(ctxBuilder2.build()); - assertThat(watchRecord1.execution().inputResult().payload().data().get("foo").toString(), equalTo("bar")); - assertThat(watchRecord2.execution().inputResult().payload().data().get("foo"), instanceOf(Map.class)); + assertThat(watchRecord1.result().inputResult().payload().data().get("foo").toString(), equalTo("bar")); + assertThat(watchRecord2.result().inputResult().payload().data().get("foo"), instanceOf(Map.class)); } @Test @@ -245,13 +245,12 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { .addAction("log", loggingAction("foobar")); Watch watch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON)); - ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))), new TimeValue(1, TimeUnit.HOURS)); + ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, false, new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))), new TimeValue(1, TimeUnit.HOURS)); WatchRecord record = executionService().execute(ctxBuilder.build()); - assertThat(record.execution().executionDurationMs(), greaterThanOrEqualTo(100L)); + assertThat(record.result().executionDurationMs(), greaterThanOrEqualTo(100L)); } - @Test - @Slow + @Test @Slow public void testForceDeletionOfLongRunningWatch() throws Exception { WatchSourceBuilder watchBuilder = watchBuilder() .trigger(schedule(cron("0 0 0 1 * ? 2099"))) @@ -266,7 +265,6 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { refresh(); assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true)); - CountDownLatch startLatch = new CountDownLatch(1); List threads = new ArrayList<>(); @@ -308,7 +306,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { this.watchId = watchId; this.startLatch = startLatch; ManualTriggerEvent triggerEvent = new ManualTriggerEvent(watchId, new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); - ctxBuilder = ManualExecutionContext.builder(watcherService.getWatch(watchId), triggerEvent, timeValueSeconds(5)); + ctxBuilder = ManualExecutionContext.builder(watcherService.getWatch(watchId), true, triggerEvent, timeValueSeconds(5)); ctxBuilder.recordExecution(true); ctxBuilder.actionMode("_all", ActionExecutionMode.FORCE_EXECUTE); } @@ -317,10 +315,9 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { public void run() { try { startLatch.await(); - executionService.execute(ctxBuilder.build()); - fail("Execution of a deleted watch should fail but didn't"); - } catch (WatchMissingException we) { - assertThat(we.getCause(), instanceOf(DocumentMissingException.class)); + WatchRecord record = executionService.execute(ctxBuilder.build()); + assertThat(record, notNullValue()); + assertThat(record.state(), is(ExecutionState.NOT_EXECUTED_WATCH_MISSING)); } catch (Throwable t) { throw new WatcherException("Failure mode execution of [{}] failed in an unexpected way", t, watchId); } diff --git a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java index c2ba9343572..8ba65035a34 100644 --- a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java @@ -107,10 +107,12 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { @Test public void testSearch_InlineTemplate() throws Exception { + WatchExecutionContext ctx = createContext(); + final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + "\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" + "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + - "\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"metadata\":null,\"watch_id\":\"test-watch\",\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}"; + "\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"id\":\"" + ctx.id().value() + "\",\"metadata\":null,\"watch_id\":\"test-watch\",\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}"; Map params = new HashMap<>(); params.put("seconds_param", "30s"); @@ -125,13 +127,14 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .setTemplateSource(templateSource) .request(); - - SearchInput.Result executedResult = executeSearchInput(request); + SearchInput.Result executedResult = executeSearchInput(request, ctx); assertThat(areJsonEquivalent(executedResult.executedRequest().templateSource().toUtf8(), expectedQuery), is(true)); } @Test public void testSearch_IndexedTemplate() throws Exception { + WatchExecutionContext ctx = createContext(); + PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache","test-template", TEMPLATE_QUERY).request(); assertThat(client().putIndexedScript(indexedScriptRequest).actionGet().isCreated(), is(true)); @@ -148,12 +151,14 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .setTemplateSource(templateSource) .request(); - SearchInput.Result executedResult = executeSearchInput(request); + SearchInput.Result executedResult = executeSearchInput(request, ctx); assertThat(executedResult.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"id\":\"test-template\"")); } @Test public void testSearch_OndiskTemplate() throws Exception { + WatchExecutionContext ctx = createContext(); + Map params = new HashMap<>(); params.put("seconds_param", "30s"); @@ -167,7 +172,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .setTemplateSource(templateSource) .request(); - SearchInput.Result executedResult = executeSearchInput(request); + SearchInput.Result executedResult = executeSearchInput(request, ctx); assertThat(executedResult.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"file\":\"test_disk_template\"")); } @@ -243,15 +248,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { fail("expected a SearchInputException as search type SCAN should not be supported"); } - private SearchInput.Result executeSearchInput(SearchRequest request) throws IOException { - createIndex("test-search-index"); - ensureGreen("test-search-index"); - SearchInput.Builder siBuilder = SearchInput.builder(request); - - SearchInput si = siBuilder.build(); - - ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client())); - WatchExecutionContext ctx = new TriggeredExecutionContext( + private WatchExecutionContext createContext() { + return new TriggeredExecutionContext( new Watch("test-watch", new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger), @@ -264,6 +262,16 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { new DateTime(60000, UTC), new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)), timeValueSeconds(5)); + } + + private SearchInput.Result executeSearchInput(SearchRequest request, WatchExecutionContext ctx) throws IOException { + createIndex("test-search-index"); + ensureGreen("test-search-index"); + SearchInput.Builder siBuilder = SearchInput.builder(request); + + SearchInput si = siBuilder.build(); + + ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client())); return searchInput.execute(ctx); } diff --git a/src/test/java/org/elasticsearch/watcher/support/VariablesTests.java b/src/test/java/org/elasticsearch/watcher/support/VariablesTests.java index d9efa40db00..f7ab0b2be55 100644 --- a/src/test/java/org/elasticsearch/watcher/support/VariablesTests.java +++ b/src/test/java/org/elasticsearch/watcher/support/VariablesTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.execution.Wid; import org.elasticsearch.watcher.test.WatcherTestUtils; import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; @@ -18,6 +19,7 @@ import org.junit.Test; import java.util.Map; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; +import static org.elasticsearch.watcher.test.WatcherTestUtils.assertValue; import static org.hamcrest.Matchers.*; /** @@ -33,25 +35,23 @@ public class VariablesTests extends ElasticsearchTestCase { Payload payload = new Payload.Simple(ImmutableMap.builder().put("payload_key", "payload_value").build()); Map metatdata = ImmutableMap.builder().put("metadata_key", "metadata_value").build(); TriggerEvent event = new ScheduleTriggerEvent("_watch_id", triggeredTime, scheduledTime); - WatchExecutionContext wec = WatcherTestUtils.mockExecutionContextBuilder("_watch_id") + Wid wid = new Wid("_watch_id", 0, executionTime); + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id") + .wid(wid) .executionTime(executionTime) .triggerEvent(event) .payload(payload) .metadata(metatdata) .buildMock(); - Map model = Variables.createCtxModel(wec, payload); + Map model = Variables.createCtxModel(ctx, payload); assertThat(model, notNullValue()); - assertThat(model, hasKey(Variables.CTX)); - assertThat(model.get(Variables.CTX), instanceOf(Map.class)); assertThat(model.size(), is(1)); - - Map ctx = (Map) model.get(Variables.CTX); - assertThat(ctx, hasEntry(Variables.WATCH_ID, (Object) "_watch_id")); - assertThat(ctx, hasEntry(Variables.EXECUTION_TIME, (Object) executionTime)); - assertThat(ctx, hasEntry(Variables.TRIGGER, (Object) event.data())); - assertThat(ctx, hasEntry(Variables.PAYLOAD, (Object) payload.data())); - assertThat(ctx, hasEntry(Variables.METADATA, (Object) metatdata)); - assertThat(ctx.size(), is(5)); + assertValue(model, "ctx", instanceOf(Map.class)); + assertValue(model, "ctx.id", is(wid.value())); + assertValue(model, "ctx.execution_time", is(executionTime)); + assertValue(model, "ctx.trigger", is(event.data())); + assertValue(model, "ctx.payload", is(payload.data())); + assertValue(model, "ctx.metadata", is(metatdata)); } } diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java index 10291d6897f..ffb5a9b3bbd 100644 --- a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java +++ b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java @@ -43,6 +43,7 @@ import org.elasticsearch.watcher.actions.webhook.WebhookAction; import org.elasticsearch.watcher.condition.script.ExecutableScriptCondition; import org.elasticsearch.watcher.condition.script.ScriptCondition; import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.execution.Wid; import org.elasticsearch.watcher.input.search.ExecutableSearchInput; import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput; import org.elasticsearch.watcher.input.simple.SimpleInput; @@ -59,6 +60,7 @@ import org.elasticsearch.watcher.support.template.Template; import org.elasticsearch.watcher.support.template.TemplateEngine; import org.elasticsearch.watcher.support.template.xmustache.XMustacheScriptEngineService; import org.elasticsearch.watcher.support.template.xmustache.XMustacheTemplateEngine; +import org.elasticsearch.watcher.support.xcontent.ObjectPath; import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform; import org.elasticsearch.watcher.transform.search.SearchTransform; import org.elasticsearch.watcher.trigger.TriggerEvent; @@ -67,16 +69,19 @@ import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Watch; import org.elasticsearch.watcher.watch.WatchStatus; +import org.hamcrest.Matcher; import javax.mail.internet.AddressException; import java.io.IOException; import java.lang.reflect.Constructor; import java.util.*; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomInt; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -90,6 +95,10 @@ public final class WatcherTestUtils { private WatcherTestUtils() { } + public static void assertValue(Map map, String path, Matcher matcher) { + assertThat(ObjectPath.eval(path, map), (Matcher) matcher); + } + public static XContentParser xContentParser(XContentBuilder builder) throws IOException { return builder.contentType().xContent().createParser(builder.bytes()); } @@ -120,17 +129,20 @@ public final class WatcherTestUtils { } public static WatchExecutionContextMockBuilder mockExecutionContextBuilder(String watchId) { - return new WatchExecutionContextMockBuilder(watchId); + return new WatchExecutionContextMockBuilder(watchId) + .wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))); } public static WatchExecutionContext mockExecutionContext(String watchId, Payload payload) { return mockExecutionContextBuilder(watchId) + .wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))) .payload(payload) .buildMock(); } public static WatchExecutionContext mockExecutionContext(String watchId, DateTime time, Payload payload) { return mockExecutionContextBuilder(watchId) + .wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))) .payload(payload) .time(watchId, time) .buildMock(); @@ -138,6 +150,7 @@ public final class WatcherTestUtils { public static WatchExecutionContext mockExecutionContext(String watchId, DateTime executionTime, TriggerEvent event, Payload payload) { return mockExecutionContextBuilder(watchId) + .wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))) .payload(payload) .executionTime(executionTime) .triggerEvent(event) diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java index 97da390e3ab..7246d5f1435 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java @@ -208,7 +208,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").get(); assertHitCount(searchResponse, 1); assertThat(searchResponse.getHits().getAt(0).id(), Matchers.equalTo(wid.value())); - assertThat(searchResponse.getHits().getAt(0).sourceAsMap().get(WatchRecord.Field.STATE.getPreferredName()).toString(), Matchers.equalTo(ExecutionState.DELETED_WHILE_QUEUED.toString())); + assertThat(searchResponse.getHits().getAt(0).sourceAsMap().get(WatchRecord.Field.STATE.getPreferredName()).toString(), Matchers.equalTo(ExecutionState.NOT_EXECUTED_WATCH_MISSING.toString())); } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/HttpSecretsIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/HttpSecretsIntegrationTests.java index 8ed9596bac4..6852c2a1e1f 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/HttpSecretsIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/HttpSecretsIntegrationTests.java @@ -153,7 +153,7 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTests .get(); assertThat(executeResponse, notNullValue()); contentSource = executeResponse.getRecordSource(); - value = contentSource.getValue("result.input.http.status"); + value = contentSource.getValue("result.input.http.status_code"); assertThat(value, notNullValue()); assertThat(value, is((Object) 200)); diff --git a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java index 066241ca339..60a4e0c2658 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java @@ -253,6 +253,8 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { @Test public void testSearch_InlineTemplate() throws Exception { + WatchExecutionContext ctx = createContext(); + final String templateQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + "\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" + "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + @@ -261,7 +263,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + "\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" + "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + - "\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"metadata\":null,\"watch_id\":\"test-watch\",\"payload\":{},\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}"; + "\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"id\":\"" + ctx.id().value() + "\",\"metadata\":null,\"watch_id\":\"test-watch\",\"payload\":{},\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}"; Map params = new HashMap<>(); params.put("seconds_param", "30s"); @@ -276,13 +278,15 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .setTemplateSource(templateSource) .request(); - SearchTransform.Result executedResult = executeSearchTransform(request); + SearchTransform.Result executedResult = executeSearchTransform(request, ctx); assertThat(areJsonEquivalent(executedResult.executedRequest().templateSource().toUtf8(), expectedQuery), is(true)); } @Test public void testSearch_IndexedTemplate() throws Exception { + WatchExecutionContext ctx = createContext(); + final String templateQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + "\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" + "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + @@ -304,13 +308,15 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .setTemplateSource(templateSource) .request(); - SearchTransform.Result result = executeSearchTransform(request); + SearchTransform.Result result = executeSearchTransform(request, ctx); assertNotNull(result.executedRequest()); assertThat(result.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"id\":\"test-script\"")); } @Test public void testSearch_OndiskTemplate() throws Exception { + WatchExecutionContext ctx = createContext(); + Map params = new HashMap<>(); params.put("seconds_param", "30s"); @@ -324,7 +330,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .setTemplateSource(templateSource) .request(); - SearchTransform.Result result = executeSearchTransform(request); + SearchTransform.Result result = executeSearchTransform(request, ctx); assertNotNull(result.executedRequest()); assertThat(result.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"file\":\"test_disk_template\"")); } @@ -332,11 +338,13 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { @Test public void testDifferentSearchType() throws Exception { + WatchExecutionContext ctx = createContext(); + SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery( - matchQuery("event_type", "a"), - rangeFilter("_timestamp") - .from("{{ctx.trigger.scheduled_time}}||-30s") - .to("{{ctx.trigger.triggered_time}}"))); + matchQuery("event_type", "a"), + rangeFilter("_timestamp") + .from("{{ctx.trigger.scheduled_time}}||-30s") + .to("{{ctx.trigger.triggered_time}}"))); final SearchType searchType = getRandomSupportedSearchType(); SearchRequest request = client() @@ -345,7 +353,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .request() .source(searchSourceBuilder); - SearchTransform.Result result = executeSearchTransform(request); + SearchTransform.Result result = executeSearchTransform(request, ctx); assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0)); assertThat(result.executedRequest(), notNullValue()); @@ -354,14 +362,8 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { assertThat(result.executedRequest().indicesOptions(), equalTo(request.indicesOptions())); } - private SearchTransform.Result executeSearchTransform(SearchRequest request) throws IOException { - createIndex("test-search-index"); - ensureGreen("test-search-index"); - - SearchTransform searchTransform = TransformBuilders.searchTransform(request).build(); - ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client())); - - WatchExecutionContext ctx = new TriggeredExecutionContext( + private WatchExecutionContext createContext() { + return new TriggeredExecutionContext( new Watch("test-watch", new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger), @@ -374,6 +376,14 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { new DateTime(60000, UTC), new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)), timeValueSeconds(5)); + } + + private SearchTransform.Result executeSearchTransform(SearchRequest request, WatchExecutionContext ctx) throws IOException { + createIndex("test-search-index"); + ensureGreen("test-search-index"); + + SearchTransform searchTransform = TransformBuilders.searchTransform(request).build(); + ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client())); return executableSearchTransform.execute(ctx, Payload.Simple.EMPTY); } diff --git a/src/test/java/org/elasticsearch/watcher/transport/action/delete/ForceDeleteWatchTests.java b/src/test/java/org/elasticsearch/watcher/transport/action/delete/ForceDeleteWatchTests.java index 6cdd48b5efd..6e4469b04e2 100644 --- a/src/test/java/org/elasticsearch/watcher/transport/action/delete/ForceDeleteWatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/transport/action/delete/ForceDeleteWatchTests.java @@ -39,9 +39,9 @@ public class ForceDeleteWatchTests extends AbstractWatcherIntegrationTests { @Test @Slow @TestLogging("_root:DEBUG") public void testForceDelete_LongRunningWatch() throws Exception { PutWatchResponse putResponse = watcherClient().preparePutWatch("_name").setSource(watchBuilder() - .trigger(schedule(interval("1s"))) + .trigger(schedule(interval("3s"))) .condition(scriptCondition(Script.inline("sleep 5000; return true"))) - .addAction("_action1", loggingAction("{{ctx.watch_id}}"))) + .addAction("_action1", loggingAction("executed action: {{ctx.id}}"))) .get(); assertThat(putResponse.getId(), equalTo("_name")); Thread.sleep(5000);