Moved input errors to the input result

Until now, if the input failed, an exception would be thrown and it would be captured globally on the watch execution and in the watch recod message. The problem with this approach is that the information about the input is lost. In this commit, the failure is returned as part of the input result.

- A new `status` field was added to the input result. Can either have `success` or `failure` values. When set to `failure` a `reason` field will be set with the error message.
- The `ExecutionService` changed to enable this functionality. Mainly, instead of relying on exception, during the execution the input result is checked for its status and the execution is aborted on failure. Also, the two places where the watch execution is handled were consolidated to a single method `execute(WatchExecutionContext)`.
- Also, the watch execution context id (which will end up being the `watch_record` id) was added the the context model (accessible via scripts and templates). This is done mainly for debugging purposes.

Original commit: elastic/x-pack-elasticsearch@e2567deada
This commit is contained in:
uboness 2015-06-08 17:45:21 +02:00
parent b54b52bb2f
commit 288ce368d5
37 changed files with 648 additions and 311 deletions

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
@ -17,7 +18,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> { public class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> {
private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
private final Condition empty = lock.newCondition(); private final Condition empty = lock.newCondition();
private boolean seal = false; private boolean seal = false;

View File

@ -15,7 +15,6 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.actions.ActionWrapper; import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.condition.Condition; import org.elasticsearch.watcher.condition.Condition;
@ -83,8 +82,8 @@ public class ExecutionService extends AbstractComponent {
historyStore.start(); historyStore.start();
triggeredWatchStore.start(); triggeredWatchStore.start();
currentExecutions = new CurrentExecutions(); currentExecutions = new CurrentExecutions();
Collection<TriggeredWatch> records = triggeredWatchStore.loadTriggeredWatches(state); Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.loadTriggeredWatches(state);
executeRecords(records); executeTriggeredWatches(triggeredWatches);
logger.debug("started execution service"); logger.debug("started execution service");
} }
} }
@ -97,14 +96,13 @@ public class ExecutionService extends AbstractComponent {
if (started.compareAndSet(true, false)) { if (started.compareAndSet(true, false)) {
logger.debug("stopping execution service"); logger.debug("stopping execution service");
// We could also rely on the shutdown in #updateSettings call, but // We could also rely on the shutdown in #updateSettings call, but
// this is a forceful shutdown that also interrupts the worker threads in the threadpool // this is a forceful shutdown that also interrupts the worker threads in the thread pool
List<Runnable> cancelledTasks = new ArrayList<>(); int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
executor.queue().drainTo(cancelledTasks);
currentExecutions.sealAndAwaitEmpty(maxStopTimeout); currentExecutions.sealAndAwaitEmpty(maxStopTimeout);
triggeredWatchStore.stop(); triggeredWatchStore.stop();
historyStore.stop(); historyStore.stop();
logger.debug("cancelled [{}] queued tasks", cancelledTasks.size()); logger.debug("cancelled [{}] queued tasks", cancelledTaskCount);
logger.debug("stopped execution service"); logger.debug("stopped execution service");
} }
} }
@ -181,49 +179,25 @@ public class ExecutionService extends AbstractComponent {
} }
logger.debug("saving watch records [{}]", triggeredWatches.size()); logger.debug("saving watch records [{}]", triggeredWatches.size());
if (triggeredWatches.size() == 0) {
return;
}
if (triggeredWatches.size() == 1) { triggeredWatchStore.putAll(triggeredWatches, new ActionListener<List<Integer>>() {
final TriggeredWatch triggeredWatch = triggeredWatches.getFirst(); @Override
final TriggeredExecutionContext ctx = contexts.getFirst(); public void onResponse(List<Integer> successFullSlots) {
triggeredWatchStore.put(triggeredWatch, new ActionListener<Boolean>() { for (Integer slot : successFullSlots) {
@Override executeAsync(contexts.get(slot), triggeredWatches.get(slot));
public void onResponse(Boolean aBoolean) {
executeAsync(ctx, triggeredWatch);
} }
}
@Override @Override
public void onFailure(Throwable e) { public void onFailure(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e); Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof EsRejectedExecutionException) { if (cause instanceof EsRejectedExecutionException) {
logger.debug("failed to store watch record [{}]/[{}] due to overloaded threadpool [{}]", triggeredWatch, ctx.id(), ExceptionsHelper.detailedMessage(e)); logger.debug("failed to store watch records due to overloaded threadpool [{}]", ExceptionsHelper.detailedMessage(e));
} else { } else {
logger.warn("failed to store watch record [{}]/[{}]", e, triggeredWatch, ctx.id()); logger.warn("failed to store watch records", e);
}
} }
}); }
} else { });
triggeredWatchStore.putAll(triggeredWatches, new ActionListener<List<Integer>>() {
@Override
public void onResponse(List<Integer> successFullSlots) {
for (Integer slot : successFullSlots) {
executeAsync(contexts.get(slot), triggeredWatches.get(slot));
}
}
@Override
public void onFailure(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof EsRejectedExecutionException) {
logger.debug("failed to store watch records due to overloaded threadpool [{}]", ExceptionsHelper.detailedMessage(e));
} else {
logger.warn("failed to store watch records", e);
}
}
});
}
} }
void processEventsSync(Iterable<TriggerEvent> events) throws WatcherException { void processEventsSync(Iterable<TriggerEvent> events) throws WatcherException {
@ -250,37 +224,64 @@ public class ExecutionService extends AbstractComponent {
return; return;
} }
if (triggeredWatches.size() == 1) { List<Integer> slots = triggeredWatchStore.putAll(triggeredWatches);
final TriggeredWatch triggeredWatch = triggeredWatches.getFirst(); for (Integer slot : slots) {
final TriggeredExecutionContext ctx = contexts.getFirst(); executeAsync(contexts.get(slot), triggeredWatches.get(slot));
triggeredWatchStore.put(triggeredWatch);
executeAsync(ctx, triggeredWatch);
} else {
List<Integer> slots = triggeredWatchStore.putAll(triggeredWatches);
for (Integer slot : slots) {
executeAsync(contexts.get(slot), triggeredWatches.get(slot));
}
} }
} }
public WatchRecord execute(WatchExecutionContext ctx) throws IOException { public WatchRecord execute(WatchExecutionContext ctx) {
WatchRecord record; WatchRecord record = null;
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id());
if (logger.isTraceEnabled()) {
logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock));
}
try { try {
currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread())); currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
record = executeInner(ctx);
if (ctx.recordExecution()) { if (ctx.knownWatch() && watchStore.get(ctx.watch().id()) == null) {
watchStore.updateStatus(ctx.watch()); // fail fast if we are trying to execute a deleted watch
String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring...";
logger.warn(message);
record = ctx.abortBeforeExecution(message, ExecutionState.NOT_EXECUTED_WATCH_MISSING);
} else {
logger.debug("executing watch [{}]", ctx.id().watchId());
record = executeInner(ctx);
if (ctx.recordExecution()) {
watchStore.updateStatus(ctx.watch());
}
} }
} catch (DocumentMissingException vcee) {
throw new WatchMissingException("failed to update the watch [{}] on execute perhaps it was force deleted", vcee, ctx.watch().id()); } catch (Exception e) {
String detailedMessage = ExceptionsHelper.detailedMessage(e);
logger.warn("failed to execute watch [{}], failure [{}]", ctx.id(), detailedMessage);
record = ctx.abortFailedExecution(detailedMessage);
} finally { } finally {
if (ctx.knownWatch() && record != null && ctx.recordExecution()) {
try {
historyStore.put(record);
} catch (Exception e) {
logger.error("failed to update watch record [{}]", e, ctx.id());
}
}
try {
triggeredWatchStore.delete(ctx.id());
} catch (Exception e) {
logger.error("failed to delete triggered watch [{}]", e, ctx.id());
}
currentExecutions.remove(ctx.watch().id()); currentExecutions.remove(ctx.watch().id());
if (logger.isTraceEnabled()) {
logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock));
}
lock.release(); lock.release();
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
} }
if (ctx.recordExecution()) {
historyStore.put(record);
}
return record; return record;
} }
@ -300,7 +301,7 @@ public class ExecutionService extends AbstractComponent {
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
logger.debug(message); logger.debug(message);
WatchRecord record = ctx.abort(message, ExecutionState.FAILED); WatchRecord record = ctx.abortBeforeExecution(message, ExecutionState.FAILED);
historyStore.put(record); historyStore.put(record);
triggeredWatchStore.delete(triggeredWatch.id()); triggeredWatchStore.delete(triggeredWatch.id());
} }
@ -309,12 +310,19 @@ public class ExecutionService extends AbstractComponent {
WatchRecord executeInner(WatchExecutionContext ctx) throws IOException { WatchRecord executeInner(WatchExecutionContext ctx) throws IOException {
ctx.start(); ctx.start();
Watch watch = ctx.watch(); Watch watch = ctx.watch();
// input
ctx.beforeInput(); ctx.beforeInput();
Input.Result inputResult = ctx.inputResult(); Input.Result inputResult = ctx.inputResult();
if (inputResult == null) { if (inputResult == null) {
inputResult = watch.input().execute(ctx); inputResult = watch.input().execute(ctx);
ctx.onInputResult(inputResult); ctx.onInputResult(inputResult);
} }
if (inputResult.status() == Input.Result.Status.FAILURE) {
return ctx.abortFailedExecution("failed to execute watch input");
}
// condition
ctx.beforeCondition(); ctx.beforeCondition();
Condition.Result conditionResult = ctx.conditionResult(); Condition.Result conditionResult = ctx.conditionResult();
if (conditionResult == null) { if (conditionResult == null) {
@ -323,23 +331,26 @@ public class ExecutionService extends AbstractComponent {
} }
if (conditionResult.met()) { if (conditionResult.met()) {
// actions
ctx.beforeAction(); ctx.beforeAction();
for (ActionWrapper action : watch.actions()) { for (ActionWrapper action : watch.actions()) {
ActionWrapper.Result actionResult = action.execute(ctx); ActionWrapper.Result actionResult = action.execute(ctx);
ctx.onActionResult(actionResult); ctx.onActionResult(actionResult);
} }
} }
return ctx.finish(); return ctx.finish();
} }
void executeRecords(Collection<TriggeredWatch> triggeredWatches) { void executeTriggeredWatches(Collection<TriggeredWatch> triggeredWatches) {
assert triggeredWatches != null; assert triggeredWatches != null;
int counter = 0; int counter = 0;
for (TriggeredWatch triggeredWatch : triggeredWatches) { for (TriggeredWatch triggeredWatch : triggeredWatches) {
Watch watch = watchStore.get(triggeredWatch.id().watchId()); Watch watch = watchStore.get(triggeredWatch.id().watchId());
if (watch == null) { if (watch == null) {
String message = "unable to find watch for record [" + triggeredWatch.id().watchId() + "]/[" + triggeredWatch.id() + "], perhaps it has been deleted, ignoring..."; String message = "unable to find watch for record [" + triggeredWatch.id().watchId() + "]/[" + triggeredWatch.id() + "], perhaps it has been deleted, ignoring...";
WatchRecord record = new WatchRecord(triggeredWatch.id(), triggeredWatch.triggerEvent(), message, ExecutionState.DELETED_WHILE_QUEUED); WatchRecord record = new WatchRecord(triggeredWatch.id(), triggeredWatch.triggerEvent(), message, ExecutionState.NOT_EXECUTED_WATCH_MISSING);
historyStore.put(record); historyStore.put(record);
triggeredWatchStore.delete(triggeredWatch.id()); triggeredWatchStore.delete(triggeredWatch.id());
} else { } else {
@ -361,47 +372,7 @@ public class ExecutionService extends AbstractComponent {
@Override @Override
public void run() { public void run() {
if (!started.get()) { execute(ctx);
logger.debug("can't initiate watch execution as execution service is not started, ignoring it...");
return;
}
logger.trace("executing [{}] [{}]", ctx.watch().id(), ctx.id());
WatchRecord record = null;
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id());
try {
currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
if (watchStore.get(ctx.watch().id()) == null) {
//Fail fast if we are trying to execute a deleted watch
String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring...";
record = ctx.abort(message, ExecutionState.DELETED_WHILE_QUEUED);
} else {
logger.debug("checking watch [{}]", ctx.id().watchId());
record = executeInner(ctx);
if (ctx.recordExecution()) {
watchStore.updateStatus(ctx.watch());
}
}
} catch (Exception e) {
String detailedMessage = ExceptionsHelper.detailedMessage(e);
logger.warn("failed to execute watch [{}], failure [{}]", ctx.id().value(), detailedMessage);
record = ctx.abort(detailedMessage, ExecutionState.FAILED);
} finally {
// The recordExecution doesn't need to check if it is in a started state here, because the
// ExecutionService doesn't stop before the WatchLockService stops
if (record != null && ctx.recordExecution()) {
try {
historyStore.put(record);
triggeredWatchStore.delete(ctx.id());
} catch (Exception e) {
logger.error("failed to update watch record [{}], failure [{}], record failure if any [{}]", ctx.id(), ExceptionsHelper.detailedMessage(e), record.message());
}
}
currentExecutions.remove(ctx.watch().id());
lock.release();
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
}
} }
} }

View File

@ -15,7 +15,7 @@ public enum ExecutionState {
THROTTLED, THROTTLED,
EXECUTED, EXECUTED,
FAILED, FAILED,
DELETED_WHILE_QUEUED; NOT_EXECUTED_WATCH_MISSING;
public String id() { public String id() {
return name().toLowerCase(Locale.ROOT); return name().toLowerCase(Locale.ROOT);

View File

@ -25,8 +25,9 @@ public class ManualExecutionContext extends WatchExecutionContext {
private final Map<String, ActionExecutionMode> actionModes; private final Map<String, ActionExecutionMode> actionModes;
private final boolean recordExecution; private final boolean recordExecution;
private final boolean knownWatch;
ManualExecutionContext(Watch watch, DateTime executionTime, ManualTriggerEvent triggerEvent, ManualExecutionContext(Watch watch, boolean knownWatch, DateTime executionTime, ManualTriggerEvent triggerEvent,
TimeValue defaultThrottlePeriod, Input.Result inputResult, Condition.Result conditionResult, TimeValue defaultThrottlePeriod, Input.Result inputResult, Condition.Result conditionResult,
Map<String, ActionExecutionMode> actionModes, boolean recordExecution) { Map<String, ActionExecutionMode> actionModes, boolean recordExecution) {
@ -34,6 +35,7 @@ public class ManualExecutionContext extends WatchExecutionContext {
this.actionModes = actionModes; this.actionModes = actionModes;
this.recordExecution = recordExecution; this.recordExecution = recordExecution;
this.knownWatch = knownWatch;
if (inputResult != null) { if (inputResult != null) {
onInputResult(inputResult); onInputResult(inputResult);
@ -57,6 +59,11 @@ public class ManualExecutionContext extends WatchExecutionContext {
} }
} }
@Override
public boolean knownWatch() {
return knownWatch;
}
@Override @Override
public final boolean simulateAction(String actionId) { public final boolean simulateAction(String actionId) {
ActionExecutionMode mode = actionModes.get(Builder.ALL); ActionExecutionMode mode = actionModes.get(Builder.ALL);
@ -80,8 +87,8 @@ public class ManualExecutionContext extends WatchExecutionContext {
return recordExecution; return recordExecution;
} }
public static Builder builder(Watch watch, ManualTriggerEvent event, TimeValue defaultThrottlePeriod) { public static Builder builder(Watch watch, boolean knownWatch, ManualTriggerEvent event, TimeValue defaultThrottlePeriod) {
return new Builder(watch, event, defaultThrottlePeriod); return new Builder(watch, knownWatch, event, defaultThrottlePeriod);
} }
public static class Builder { public static class Builder {
@ -89,6 +96,7 @@ public class ManualExecutionContext extends WatchExecutionContext {
static final String ALL = "_all"; static final String ALL = "_all";
private final Watch watch; private final Watch watch;
private final boolean knownWatch;
private final ManualTriggerEvent triggerEvent; private final ManualTriggerEvent triggerEvent;
private final TimeValue defaultThrottlePeriod; private final TimeValue defaultThrottlePeriod;
protected DateTime executionTime; protected DateTime executionTime;
@ -97,8 +105,9 @@ public class ManualExecutionContext extends WatchExecutionContext {
private Input.Result inputResult; private Input.Result inputResult;
private Condition.Result conditionResult; private Condition.Result conditionResult;
private Builder(Watch watch, ManualTriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { private Builder(Watch watch, boolean knownWatch, ManualTriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
this.watch = watch; this.watch = watch;
this.knownWatch = knownWatch;
assert triggerEvent != null; assert triggerEvent != null;
this.triggerEvent = triggerEvent; this.triggerEvent = triggerEvent;
this.defaultThrottlePeriod = defaultThrottlePeriod; this.defaultThrottlePeriod = defaultThrottlePeriod;
@ -140,7 +149,7 @@ public class ManualExecutionContext extends WatchExecutionContext {
if (executionTime == null) { if (executionTime == null) {
executionTime = DateTime.now(UTC); executionTime = DateTime.now(UTC);
} }
return new ManualExecutionContext(watch, executionTime, triggerEvent, defaultThrottlePeriod, inputResult, conditionResult, actionModes.build(), recordExecution); return new ManualExecutionContext(watch, knownWatch, executionTime, triggerEvent, defaultThrottlePeriod, inputResult, conditionResult, actionModes.build(), recordExecution);
} }
} }
} }

View File

@ -18,6 +18,11 @@ public class TriggeredExecutionContext extends WatchExecutionContext {
super(watch, executionTime, triggerEvent, defaultThrottlePeriod); super(watch, executionTime, triggerEvent, defaultThrottlePeriod);
} }
@Override
public boolean knownWatch() {
return true;
}
@Override @Override
public final boolean simulateAction(String actionId) { public final boolean simulateAction(String actionId) {
return false; return false;

View File

@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
@ -158,6 +159,27 @@ public class TriggeredWatchStore extends AbstractComponent {
} }
public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<List<Integer>> listener) throws TriggeredWatchException { public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<List<Integer>> listener) throws TriggeredWatchException {
if (triggeredWatches.isEmpty()) {
listener.onResponse(ImmutableList.<Integer>of());
return;
}
if (triggeredWatches.size() == 1) {
put(triggeredWatches.get(0), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean success) {
listener.onResponse(ImmutableList.of(0));
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
return;
}
ensureStarted(); ensureStarted();
try { try {
BulkRequest request = new BulkRequest(); BulkRequest request = new BulkRequest();

View File

@ -45,6 +45,8 @@ public abstract class WatchExecutionContext {
private long actualExecutionStartMs; private long actualExecutionStartMs;
private boolean sealed = false;
public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
this.id = new Wid(watch.id(), watch.nonce(), executionTime); this.id = new Wid(watch.id(), watch.nonce(), executionTime);
this.watch = watch; this.watch = watch;
@ -53,6 +55,16 @@ public abstract class WatchExecutionContext {
this.defaultThrottlePeriod = defaultThrottlePeriod; this.defaultThrottlePeriod = defaultThrottlePeriod;
} }
/**
* @return true if the watch associated with this context is known to watcher (i.e. it's stored
* in watcher. This plays a key role in how we handle execution. For example, if
* the watch is known, but then the watch is not there (perhaps deleted in between)
* we abort execution. It also plays a part (along with {@link #recordExecution()}
* in the decision of whether the watch record should be stored and if the watch
* status should be updated.
*/
public abstract boolean knownWatch();
/** /**
* @return true if this action should be simulated * @return true if this action should be simulated
*/ */
@ -107,6 +119,7 @@ public abstract class WatchExecutionContext {
transformedPayload = payload; transformedPayload = payload;
return transformedPayload; return transformedPayload;
} }
beforeWatchTransform();
this.transformResult = watch.transform().execute(this, payload); this.transformResult = watch.transform().execute(this, payload);
this.payload = transformResult.payload(); this.payload = transformResult.payload();
this.transformedPayload = this.payload; this.transformedPayload = this.payload;
@ -118,12 +131,16 @@ public abstract class WatchExecutionContext {
} }
public void beforeInput() { public void beforeInput() {
assert !sealed;
executionPhase = ExecutionPhase.INPUT; executionPhase = ExecutionPhase.INPUT;
} }
public void onInputResult(Input.Result inputResult) { public void onInputResult(Input.Result inputResult) {
assert !sealed;
this.inputResult = inputResult; this.inputResult = inputResult;
this.payload = inputResult.payload(); if (inputResult.status() == Input.Result.Status.SUCCESS) {
this.payload = inputResult.payload();
}
} }
public Input.Result inputResult() { public Input.Result inputResult() {
@ -131,10 +148,12 @@ public abstract class WatchExecutionContext {
} }
public void beforeCondition() { public void beforeCondition() {
assert !sealed;
executionPhase = ExecutionPhase.CONDITION; executionPhase = ExecutionPhase.CONDITION;
} }
public void onConditionResult(Condition.Result conditionResult) { public void onConditionResult(Condition.Result conditionResult) {
assert !sealed;
this.conditionResult = conditionResult; this.conditionResult = conditionResult;
if (recordExecution()) { if (recordExecution()) {
watch.status().onCheck(conditionResult.met(), executionTime); watch.status().onCheck(conditionResult.met(), executionTime);
@ -147,23 +166,21 @@ public abstract class WatchExecutionContext {
} }
public void beforeWatchTransform() { public void beforeWatchTransform() {
assert !sealed;
this.executionPhase = ExecutionPhase.WATCH_TRANSFORM; this.executionPhase = ExecutionPhase.WATCH_TRANSFORM;
} }
public void onTransformResult(Transform.Result transformResult) {
this.transformResult = transformResult;
this.payload = transformResult.payload();
}
public Transform.Result transformResult() { public Transform.Result transformResult() {
return transformResult; return transformResult;
} }
public void beforeAction() { public void beforeAction() {
assert !sealed;
executionPhase = ExecutionPhase.ACTIONS; executionPhase = ExecutionPhase.ACTIONS;
} }
public void onActionResult(ActionWrapper.Result result) { public void onActionResult(ActionWrapper.Result result) {
assert !sealed;
actionsResults.put(result.id(), result); actionsResults.put(result.id(), result);
if (recordExecution()) { if (recordExecution()) {
watch.status().onActionResult(result.id(), executionTime, result.action()); watch.status().onActionResult(result.id(), executionTime, result.action());
@ -174,21 +191,31 @@ public abstract class WatchExecutionContext {
return new ExecutableActions.Results(actionsResults); return new ExecutableActions.Results(actionsResults);
} }
public WatchRecord abortBeforeExecution(String message, ExecutionState state) {
sealed = true;
return new WatchRecord(id, triggerEvent, message, state);
}
public void start() { public void start() {
assert !sealed;
actualExecutionStartMs = System.currentTimeMillis(); actualExecutionStartMs = System.currentTimeMillis();
} }
public WatchRecord abortFailedExecution(String message) {
sealed = true;
long executionFinishMs = System.currentTimeMillis();
WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - actualExecutionStartMs);
return new WatchRecord(this, result, message);
}
public WatchRecord finish() { public WatchRecord finish() {
sealed = true;
executionPhase = ExecutionPhase.FINISHED; executionPhase = ExecutionPhase.FINISHED;
long executionFinishMs = System.currentTimeMillis(); long executionFinishMs = System.currentTimeMillis();
WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - actualExecutionStartMs); WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - actualExecutionStartMs);
return new WatchRecord(this, result); return new WatchRecord(this, result);
} }
public WatchRecord abort(String message, ExecutionState state) {
return new WatchRecord(id, triggerEvent, message, state);
}
public WatchExecutionSnapshot createSnapshot(Thread executionThread) { public WatchExecutionSnapshot createSnapshot(Thread executionThread) {
return new WatchExecutionSnapshot(this, executionThread.getStackTrace()); return new WatchExecutionSnapshot(this, executionThread.getStackTrace());
} }

View File

@ -27,8 +27,8 @@ public class WatchExecutionResult implements ToXContent {
private final DateTime executionTime; private final DateTime executionTime;
private final long executionDurationMs; private final long executionDurationMs;
private final Input.Result inputResult; private final @Nullable Input.Result inputResult;
private final Condition.Result conditionResult; private final @Nullable Condition.Result conditionResult;
private final @Nullable Transform.Result transformResult; private final @Nullable Transform.Result transformResult;
private final ExecutableActions.Results actionsResults; private final ExecutableActions.Results actionsResults;

View File

@ -10,7 +10,10 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.watcher.condition.Condition; import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.execution.*; import org.elasticsearch.watcher.execution.ExecutionState;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionResult;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.input.ExecutableInput; import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.watch.Watch; import org.elasticsearch.watcher.watch.Watch;
@ -31,6 +34,9 @@ public class WatchRecord implements ToXContent {
private final @Nullable String message; private final @Nullable String message;
private final @Nullable WatchExecutionResult executionResult; private final @Nullable WatchExecutionResult executionResult;
/**
* Called when the execution was aborted before it started
*/
public WatchRecord(Wid id, TriggerEvent triggerEvent, String message, ExecutionState state) { public WatchRecord(Wid id, TriggerEvent triggerEvent, String message, ExecutionState state) {
this.id = id; this.id = id;
this.triggerEvent = triggerEvent; this.triggerEvent = triggerEvent;
@ -42,6 +48,24 @@ public class WatchRecord implements ToXContent {
this.metadata = null; this.metadata = null;
} }
/**
* Called when the execution was aborted due to an error during execution (the given result should reflect
* were exactly the execution failed)
*/
public WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult, String message) {
this.id = context.id();
this.triggerEvent = context.triggerEvent();
this.condition = context.watch().condition().condition();
this.input = context.watch().input();
this.metadata = context.watch().metadata();
this.executionResult = executionResult;
this.message = message;
this.state = ExecutionState.FAILED;
}
/**
* Called when the execution finished.
*/
public WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) { public WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) {
this.id = context.id(); this.id = context.id();
this.triggerEvent = context.triggerEvent(); this.triggerEvent = context.triggerEvent();
@ -92,7 +116,7 @@ public class WatchRecord implements ToXContent {
return metadata; return metadata;
} }
public WatchExecutionResult execution() { public WatchExecutionResult result() {
return executionResult; return executionResult;
} }

View File

@ -39,7 +39,7 @@ public abstract class ExecutableInput<I extends Input, R extends Input.Result> i
/** /**
* Executes this input * Executes this input
*/ */
public abstract R execute(WatchExecutionContext ctx) throws IOException; public abstract R execute(WatchExecutionContext ctx);
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

View File

@ -5,12 +5,14 @@
*/ */
package org.elasticsearch.watcher.input; package org.elasticsearch.watcher.input;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException; import java.io.IOException;
import java.util.Locale;
/** /**
* *
@ -21,28 +23,64 @@ public interface Input extends ToXContent {
abstract class Result implements ToXContent { abstract class Result implements ToXContent {
public enum Status {
SUCCESS, FAILURE
}
protected final Status status;
protected final String type; protected final String type;
private final Payload payload; private final Payload payload;
private final String reason;
public Result(String type, Payload payload) { protected Result(String type, Payload payload) {
this.status = Status.SUCCESS;
this.type = type; this.type = type;
this.payload = payload; this.payload = payload;
this.reason = null;
}
protected Result(String type, Exception e) {
this.status = Status.FAILURE;
this.type = type;
this.reason = ExceptionsHelper.detailedMessage(e);
this.payload = null;
} }
public String type() { public String type() {
return type; return type;
} }
public Status status() {
return status;
}
public Payload payload() { public Payload payload() {
assert status == Status.SUCCESS;
return payload; return payload;
} }
public String reason() {
assert status == Status.FAILURE;
return reason;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field(Field.STATUS.getPreferredName(), status.name().toLowerCase(Locale.ROOT));
builder.field(Field.TYPE.getPreferredName(), type); builder.field(Field.TYPE.getPreferredName(), type);
builder.field(Field.PAYLOAD.getPreferredName(), payload, params); switch (status) {
case SUCCESS:
assert payload != null;
builder.field(Field.PAYLOAD.getPreferredName(), payload, params);
break;
case FAILURE:
assert reason != null;
builder.field(Field.REASON.getPreferredName(), reason);
break;
default:
assert false;
}
typeXContent(builder, params); typeXContent(builder, params);
return builder.endObject(); return builder.endObject();
} }
@ -57,7 +95,9 @@ public interface Input extends ToXContent {
} }
interface Field { interface Field {
ParseField STATUS = new ParseField("status");
ParseField TYPE = new ParseField("type"); ParseField TYPE = new ParseField("type");
ParseField PAYLOAD = new ParseField("payload"); ParseField PAYLOAD = new ParseField("payload");
ParseField REASON = new ParseField("reason");
} }
} }

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput; import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.support.Variables; import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils; import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.http.HttpClient; import org.elasticsearch.watcher.support.http.HttpClient;
@ -20,7 +21,6 @@ import org.elasticsearch.watcher.support.http.HttpResponse;
import org.elasticsearch.watcher.support.template.TemplateEngine; import org.elasticsearch.watcher.support.template.TemplateEngine;
import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.util.Map; import java.util.Map;
/** /**
@ -36,15 +36,23 @@ public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Re
this.templateEngine = templateEngine; this.templateEngine = templateEngine;
} }
@Override public HttpInput.Result execute(WatchExecutionContext ctx) {
public HttpInput.Result execute(WatchExecutionContext ctx) throws IOException { HttpRequest request = null;
Map<String, Object> model = Variables.createCtxModel(ctx, null); try {
HttpRequest request = input.getRequest().render(templateEngine, model); Map<String, Object> model = Variables.createCtxModel(ctx, null);
request = input.getRequest().render(templateEngine, model);
return doExecute(ctx, request);
} catch (Exception e) {
logger.error("failed to execute [{}] input for [{}]", e, HttpInput.TYPE, ctx.watch());
return new HttpInput.Result(request, e);
}
}
HttpInput.Result doExecute(WatchExecutionContext ctx, HttpRequest request) throws Exception {
HttpResponse response = client.execute(request); HttpResponse response = client.execute(request);
if (!response.hasContent()) { if (!response.hasContent()) {
return new HttpInput.Result(Payload.EMPTY, request, response.status()); return new HttpInput.Result(request, response.status(), Payload.EMPTY);
} }
XContentType contentType = response.xContentType(); XContentType contentType = response.xContentType();
@ -52,7 +60,6 @@ public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Re
if (contentType != input.getExpectedResponseXContentType().contentType()) { if (contentType != input.getExpectedResponseXContentType().contentType()) {
logger.warn("[{}] [{}] input expected content type [{}] but read [{}] from headers", type(), ctx.id(), input.getExpectedResponseXContentType(), contentType); logger.warn("[{}] [{}] input expected content type [{}] but read [{}] from headers", type(), ctx.id(), input.getExpectedResponseXContentType(), contentType);
} }
if (contentType == null) { if (contentType == null) {
contentType = input.getExpectedResponseXContentType().contentType(); contentType = input.getExpectedResponseXContentType().contentType();
} }
@ -68,7 +75,7 @@ public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Re
try { try {
parser = contentType.xContent().createParser(response.body()); parser = contentType.xContent().createParser(response.body());
} catch (Exception e) { } catch (Exception e) {
throw new HttpInputException("[{}] [{}] input could not parse response body [{}] it does not appear to be [{}]", type(), ctx.id(), response.body().toUtf8(), contentType.shortName()); throw new HttpInputException("could not parse response body [{}] it does not appear to be [{}]", type(), ctx.id(), response.body().toUtf8(), contentType.shortName());
} }
} }
@ -84,6 +91,6 @@ public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Re
payload = new Payload.Simple("_value", response.body().toUtf8()); payload = new Payload.Simple("_value", response.body().toUtf8());
} }
} }
return new HttpInput.Result(payload, request, response.status()); return new HttpInput.Result(request, response.status(), payload);
} }
} }

View File

@ -129,29 +129,40 @@ public class HttpInput implements Input {
public static class Result extends Input.Result { public static class Result extends Input.Result {
private final HttpRequest request; private final @Nullable HttpRequest request;
private final int status; private final int statusCode;
public Result(Payload payload, HttpRequest request, int status) { public Result(HttpRequest request, int statusCode, Payload payload) {
super(TYPE, payload); super(TYPE, payload);
this.request = request; this.request = request;
this.status = status; this.statusCode = statusCode;
}
public Result(@Nullable HttpRequest request, Exception e) {
super(TYPE, e);
this.request = request;
this.statusCode = -1;
} }
public HttpRequest request() { public HttpRequest request() {
return request; return request;
} }
public int status() { public int statusCode() {
return status; return statusCode;
} }
@Override @Override
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject(type) if (request == null) {
.field(Field.REQUEST.getPreferredName(), request, params) return builder;
.field(Field.STATUS.getPreferredName(), status) }
.endObject(); builder.startObject(type);
builder.field(Field.REQUEST.getPreferredName(), request, params);
if (statusCode > 0) {
builder.field(Field.STATUS_CODE.getPreferredName(), statusCode);
}
return builder.endObject();
} }
} }
@ -190,7 +201,7 @@ public class HttpInput implements Input {
interface Field extends Input.Field { interface Field extends Input.Field {
ParseField REQUEST = new ParseField("request"); ParseField REQUEST = new ParseField("request");
ParseField EXTRACT = new ParseField("extract"); ParseField EXTRACT = new ParseField("extract");
ParseField STATUS = new ParseField("status"); ParseField STATUS_CODE = new ParseField("status_code");
ParseField RESPONSE_CONTENT_TYPE = new ParseField("response_content_type"); ParseField RESPONSE_CONTENT_TYPE = new ParseField("response_content_type");
} }
} }

View File

@ -10,8 +10,6 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput; import org.elasticsearch.watcher.input.ExecutableInput;
import java.io.IOException;
/** /**
* *
*/ */
@ -22,7 +20,7 @@ public class ExecutableNoneInput extends ExecutableInput<NoneInput, NoneInput.Re
} }
@Override @Override
public NoneInput.Result execute(WatchExecutionContext ctx) throws IOException { public NoneInput.Result execute(WatchExecutionContext ctx) {
return NoneInput.Result.INSTANCE; return NoneInput.Result.INSTANCE;
} }

View File

@ -21,7 +21,6 @@ import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -40,23 +39,31 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
this.client = client; this.client = client;
} }
@Override public SearchInput.Result execute(WatchExecutionContext ctx) {
public SearchInput.Result execute(WatchExecutionContext ctx) throws IOException { SearchRequest request = null;
try {
request = WatcherUtils.createSearchRequestFromPrototype(input.getSearchRequest(), ctx, null);
return doExecute(ctx, request);
} catch (Exception e) {
logger.error("failed to execute [{}] input for [{}]", e, SearchInput.TYPE, ctx.watch());
return new SearchInput.Result(request, e);
}
}
SearchInput.Result doExecute(WatchExecutionContext ctx, SearchRequest request) throws Exception {
SearchRequest request = WatcherUtils.createSearchRequestFromPrototype(input.getSearchRequest(), ctx, null);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
BytesReference source = request.source() != null ? request.source() : request.templateSource(); BytesReference source = request.source() != null ? request.source() : request.templateSource();
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), XContentHelper.convertToJson(source, false, true)); logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), XContentHelper.convertToJson(source, false, true));
} }
// actionGet deals properly with InterruptedException
SearchResponse response = client.search(request); SearchResponse response = client.search(request);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}] found [{}] hits", ctx.id(), ctx.watch().id(), response.getHits().getTotalHits()); logger.debug("[{}] found [{}] hits", ctx.id(), response.getHits().getTotalHits());
for (SearchHit hit : response.getHits()) { for (SearchHit hit : response.getHits()) {
logger.debug("[{}] hit [{}]", ctx.id(), XContentHelper.toString(hit)); logger.debug("[{}] hit [{}]", ctx.id(), XContentHelper.toString(hit));
} }
} }
final Payload payload; final Payload payload;

View File

@ -126,19 +126,27 @@ public class SearchInput implements Input {
public static class Result extends Input.Result { public static class Result extends Input.Result {
private final SearchRequest request; private final @Nullable SearchRequest request;
public Result(SearchRequest request, Payload payload) { public Result(SearchRequest request, Payload payload) {
super(TYPE, payload); super(TYPE, payload);
this.request = request; this.request = request;
} }
public Result(@Nullable SearchRequest request, Exception e) {
super(TYPE, e);
this.request = request;
}
public SearchRequest executedRequest() { public SearchRequest executedRequest() {
return request; return request;
} }
@Override @Override
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
if (request == null) {
return builder;
}
builder.startObject(type); builder.startObject(type);
builder.field(Field.REQUEST.getPreferredName()); builder.field(Field.REQUEST.getPreferredName());
WatcherUtils.writeSearchRequest(request, builder, params); WatcherUtils.writeSearchRequest(request, builder, params);

View File

@ -9,8 +9,6 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput; import org.elasticsearch.watcher.input.ExecutableInput;
import java.io.IOException;
/** /**
* This class just defines a simple xcontent map as an input * This class just defines a simple xcontent map as an input
*/ */
@ -21,7 +19,7 @@ public class ExecutableSimpleInput extends ExecutableInput<SimpleInput, SimpleIn
} }
@Override @Override
public SimpleInput.Result execute(WatchExecutionContext ctx) throws IOException { public SimpleInput.Result execute(WatchExecutionContext ctx) {
return new SimpleInput.Result(input.getPayload()); return new SimpleInput.Result(input.getPayload());
} }
} }

View File

@ -89,7 +89,6 @@ public class LicenseService extends AbstractLifecycleComponent<LicenseService> {
@Override @Override
protected void doStop() throws ElasticsearchException { protected void doStop() throws ElasticsearchException {
} }
@Override @Override

View File

@ -17,6 +17,7 @@ import java.util.Map;
public final class Variables { public final class Variables {
public static final String CTX = "ctx"; public static final String CTX = "ctx";
public static final String ID = "id";
public static final String WATCH_ID = "watch_id"; public static final String WATCH_ID = "watch_id";
public static final String EXECUTION_TIME = "execution_time"; public static final String EXECUTION_TIME = "execution_time";
public static final String TRIGGER = "trigger"; public static final String TRIGGER = "trigger";
@ -25,6 +26,7 @@ public final class Variables {
public static Map<String, Object> createCtxModel(WatchExecutionContext ctx, Payload payload) { public static Map<String, Object> createCtxModel(WatchExecutionContext ctx, Payload payload) {
Map<String, Object> vars = new HashMap<>(); Map<String, Object> vars = new HashMap<>();
vars.put(ID, ctx.id().value());
vars.put(WATCH_ID, ctx.watch().id()); vars.put(WATCH_ID, ctx.watch().id());
vars.put(EXECUTION_TIME, ctx.executionTime()); vars.put(EXECUTION_TIME, ctx.executionTime());
vars.put(TRIGGER, ctx.triggerEvent().data()); vars.put(TRIGGER, ctx.triggerEvent().data());

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.support.http; package org.elasticsearch.watcher.support.http;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.base.Charsets; import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -83,6 +84,14 @@ public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
} }
public HttpResponse execute(HttpRequest request) throws IOException { public HttpResponse execute(HttpRequest request) throws IOException {
try {
return doExecute(request);
} catch (SocketTimeoutException ste) {
throw new ElasticsearchTimeoutException("failed to execute http request. timeout expired", ste);
}
}
public HttpResponse doExecute(HttpRequest request) throws IOException {
String queryString = null; String queryString = null;
if (request.params() != null && !request.params().isEmpty()) { if (request.params() != null && !request.params().isEmpty()) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();

View File

@ -28,6 +28,8 @@ import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.watcher.shield.ShieldIntegration; import org.elasticsearch.watcher.shield.ShieldIntegration;
import org.elasticsearch.watcher.support.init.InitializingService; import org.elasticsearch.watcher.support.init.InitializingService;
import java.util.concurrent.TimeUnit;
/** /**
* A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client * A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client
* needs to injected to be avoid circular dependencies issues. * needs to injected to be avoid circular dependencies issues.
@ -88,7 +90,7 @@ public class ClientProxy implements InitializingService.Initializable {
} }
public SearchResponse search(SearchRequest request) { public SearchResponse search(SearchRequest request) {
return client.search(preProcess(request)).actionGet(); return client.search(preProcess(request)).actionGet(5, TimeUnit.SECONDS);
} }
public SearchResponse searchScroll(String scrollId, TimeValue timeout) { public SearchResponse searchScroll(String scrollId, TimeValue timeout) {

View File

@ -87,7 +87,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
String triggerType = watch.trigger().type(); String triggerType = watch.trigger().type();
TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData()); TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData());
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod()); ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, true, new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
DateTime executionTime = clock.now(UTC); DateTime executionTime = clock.now(UTC);
ctxBuilder.executionTime(executionTime); ctxBuilder.executionTime(executionTime);

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.watcher.execution; package org.elasticsearch.watcher.watch;
import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.WatcherException;

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.WatcherException;
@ -144,23 +145,29 @@ public class WatchStore extends AbstractComponent {
*/ */
public void updateStatus(Watch watch) throws IOException { public void updateStatus(Watch watch) throws IOException {
ensureStarted(); ensureStarted();
if (!watch.status().dirty()) {
return;
}
// at the moment we store the status together with the watch, // at the moment we store the status together with the watch,
// so we just need to update the watch itself // so we just need to update the watch itself
// TODO: consider storing the status in a different documment (watch_status doc) (must smaller docs... faster for frequent updates) // TODO: consider storing the status in a different documment (watch_status doc) (must smaller docs... faster for frequent updates)
if (watch.status().dirty()) { XContentBuilder source = JsonXContent.contentBuilder().
XContentBuilder source = JsonXContent.contentBuilder(). startObject()
startObject() .field(Watch.Field.STATUS.getPreferredName(), watch.status(), ToXContent.EMPTY_PARAMS)
.field(Watch.Field.STATUS.getPreferredName(), watch.status(), ToXContent.EMPTY_PARAMS) .endObject();
.endObject(); UpdateRequest updateRequest = new UpdateRequest(INDEX, DOC_TYPE, watch.id());
UpdateRequest updateRequest = new UpdateRequest(INDEX, DOC_TYPE, watch.id()); updateRequest.listenerThreaded(false);
updateRequest.listenerThreaded(false); updateRequest.doc(source);
updateRequest.doc(source); updateRequest.version(watch.version());
updateRequest.version(watch.version()); try {
UpdateResponse response = client.update(updateRequest); UpdateResponse response = client.update(updateRequest);
watch.status().version(response.getVersion()); watch.status().version(response.getVersion());
watch.version(response.getVersion()); watch.version(response.getVersion());
watch.status().resetDirty(); watch.status().resetDirty();
// Don't need to update the watches, since we are working on an instance from it. // Don't need to update the watches, since we are working on an instance from it.
} catch (DocumentMissingException dme) {
throw new WatchMissingException("could not update watch [{}] as it could not be found", watch.id(), dme);
} }
} }

View File

@ -112,6 +112,10 @@
"type" : "object", "type" : "object",
"enabled" : false "enabled" : false
}, },
"status" : {
"type" : "string",
"index" : "not_analyzed"
},
"search": { "search": {
"type": "object", "type": "object",
"dynamic": true, "dynamic": true,

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.actions.email; package org.elasticsearch.watcher.actions.email;
import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.carrotsearch.randomizedtesting.annotations.Seed;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
@ -101,6 +102,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
Map<String, Object> expectedModel = ImmutableMap.<String, Object>builder() Map<String, Object> expectedModel = ImmutableMap.<String, Object>builder()
.put("ctx", ImmutableMap.<String, Object>builder() .put("ctx", ImmutableMap.<String, Object>builder()
.put("id", ctx.id().value())
.put("watch_id", "watch1") .put("watch_id", "watch1")
.put("payload", data) .put("payload", data)
.put("metadata", metadata) .put("metadata", metadata)

View File

@ -57,10 +57,15 @@ public class LoggingActionTests extends ElasticsearchTestCase {
public void testExecute() throws Exception { public void testExecute() throws Exception {
final DateTime now = DateTime.now(UTC); final DateTime now = DateTime.now(UTC);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id")
.time("_watch_id", now)
.buildMock();
final Map<String, Object> expectedModel = ImmutableMap.<String, Object>builder() final Map<String, Object> expectedModel = ImmutableMap.<String, Object>builder()
.put("ctx", ImmutableMap.builder() .put("ctx", ImmutableMap.builder()
.put("execution_time", now) .put("id", ctx.id().value())
.put("watch_id", "_watch_id") .put("watch_id", "_watch_id")
.put("execution_time", now)
.put("payload", ImmutableMap.of()) .put("payload", ImmutableMap.of())
.put("metadata", ImmutableMap.of()) .put("metadata", ImmutableMap.of())
.put("trigger", ImmutableMap.builder() .put("trigger", ImmutableMap.builder()
@ -76,9 +81,7 @@ public class LoggingActionTests extends ElasticsearchTestCase {
ExecutableLoggingAction executable = new ExecutableLoggingAction(action, logger, actionLogger, engine); ExecutableLoggingAction executable = new ExecutableLoggingAction(action, logger, actionLogger, engine);
when(engine.render(template, expectedModel)).thenReturn(text); when(engine.render(template, expectedModel)).thenReturn(text);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id")
.time("_watch_id", now)
.buildMock();
Action.Result result = executable.execute("_id", ctx, new Payload.Simple()); Action.Result result = executable.execute("_id", ctx, new Payload.Simple());
verifyLogger(actionLogger, level, text); verifyLogger(actionLogger, level, text);

View File

@ -44,6 +44,8 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.watcher.actions.ActionBuilders.webhookAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
@ -73,7 +75,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS));
WatchRecord watchRecord = executionService().execute(ctx); WatchRecord watchRecord = executionService().execute(ctx);
assertThat(watchRecord.execution().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.SIMULATED)); assertThat(watchRecord.result().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.SIMULATED));
if (timeWarped()) { if (timeWarped()) {
timeWarp().clock().fastForward(TimeValue.timeValueSeconds(1)); timeWarp().clock().fastForward(TimeValue.timeValueSeconds(1));
} }
@ -88,9 +90,9 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS));
watchRecord = executionService().execute(ctx); watchRecord = executionService().execute(ctx);
if (ack) { if (ack) {
assertThat(watchRecord.execution().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.THROTTLED)); assertThat(watchRecord.result().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.THROTTLED));
} else { } else {
assertThat(watchRecord.execution().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.SIMULATED)); assertThat(watchRecord.result().actionsResults().get("test_id").action().status(), equalTo(Action.Result.Status.SIMULATED));
} }
} }
@ -133,7 +135,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS));
WatchRecord watchRecord = executionService().execute(ctx); WatchRecord watchRecord = executionService().execute(ctx);
for (ActionWrapper.Result result : watchRecord.execution().actionsResults()) { for (ActionWrapper.Result result : watchRecord.result().actionsResults()) {
if (ackingActions.contains(result.id())) { if (ackingActions.contains(result.id())) {
assertThat(result.action().status(), equalTo(Action.Result.Status.THROTTLED)); assertThat(result.action().status(), equalTo(Action.Result.Status.THROTTLED));
} else { } else {
@ -162,12 +164,12 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS));
WatchRecord watchRecord = executionService().execute(ctx); WatchRecord watchRecord = executionService().execute(ctx);
long firstExecution = System.currentTimeMillis(); long firstExecution = System.currentTimeMillis();
for(ActionWrapper.Result actionResult : watchRecord.execution().actionsResults()) { for(ActionWrapper.Result actionResult : watchRecord.result().actionsResults()) {
assertThat(actionResult.action().status(), equalTo(Action.Result.Status.SIMULATED)); assertThat(actionResult.action().status(), equalTo(Action.Result.Status.SIMULATED));
} }
ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS));
watchRecord = executionService().execute(ctx); watchRecord = executionService().execute(ctx);
for(ActionWrapper.Result actionResult : watchRecord.execution().actionsResults()) { for(ActionWrapper.Result actionResult : watchRecord.result().actionsResults()) {
assertThat(actionResult.action().status(), equalTo(Action.Result.Status.THROTTLED)); assertThat(actionResult.action().status(), equalTo(Action.Result.Status.THROTTLED));
} }
@ -178,18 +180,14 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override
public void run() { public void run() {
try { ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS));
ManualExecutionContext ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); WatchRecord watchRecord = executionService().execute(ctx);
WatchRecord watchRecord = executionService().execute(ctx); for (ActionWrapper.Result actionResult : watchRecord.result().actionsResults()) {
for (ActionWrapper.Result actionResult : watchRecord.execution().actionsResults()) { if ("ten_sec_throttle".equals(actionResult.id())) {
if ("ten_sec_throttle".equals(actionResult.id())) { assertThat(actionResult.action().status(), equalTo(Action.Result.Status.SIMULATED));
assertThat(actionResult.action().status(), equalTo(Action.Result.Status.SIMULATED)); } else {
} else { assertThat(actionResult.action().status(), equalTo(Action.Result.Status.THROTTLED));
assertThat(actionResult.action().status(), equalTo(Action.Result.Status.THROTTLED));
}
} }
} catch (IOException ioe) {
throw new ElasticsearchException("failed to execute", ioe);
} }
} }
}, 11000 - (System.currentTimeMillis() - firstExecution), TimeUnit.MILLISECONDS); }, 11000 - (System.currentTimeMillis() - firstExecution), TimeUnit.MILLISECONDS);
@ -258,14 +256,13 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
}, 6, TimeUnit.SECONDS); }, 6, TimeUnit.SECONDS);
} }
@Test @Slow @Test @Slow @Repeat(iterations = 20)
public void testWatchThrottlePeriod() throws Exception { public void testWatchThrottlePeriod() throws Exception {
WatchSourceBuilder watchSourceBuilder = watchBuilder() WatchSourceBuilder watchSourceBuilder = watchBuilder()
.trigger(schedule(interval("60m"))) .trigger(schedule(interval("60m")))
.defaultThrottlePeriod(new TimeValue(1, TimeUnit.SECONDS)); .defaultThrottlePeriod(new TimeValue(1, TimeUnit.SECONDS));
AvailableAction availableAction = randomFrom(AvailableAction.values()); AvailableAction availableAction = randomFrom(AvailableAction.values());
final String actionType = availableAction.type();
watchSourceBuilder.addAction("default_global_throttle", availableAction.action()); watchSourceBuilder.addAction("default_global_throttle", availableAction.action());
PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchSourceBuilder)).actionGet(); PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchSourceBuilder)).actionGet();
@ -324,35 +321,35 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
@Test @Slow @Test @Slow
public void testFailingActionDoesGetThrottled() throws Exception { public void testFailingActionDoesGetThrottled() throws Exception {
TimeValue throttlePeriod = new TimeValue(60, TimeUnit.MINUTES); TimeValue throttlePeriod = new TimeValue(60, TimeUnit.MINUTES);
WatchSourceBuilder watchSourceBuilder = watchBuilder()
.trigger(new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(60, IntervalSchedule.Interval.Unit.MINUTES))))
.defaultThrottlePeriod(throttlePeriod);
watchSourceBuilder.addAction("logging", LoggingAction.builder(new Template.Builder.Inline("test out").build()));
watchSourceBuilder.addAction("failing_hook", WebhookAction.builder(HttpRequestTemplate.builder("unknown.foo", 80).build()));
PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchSourceBuilder)).actionGet(); PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(60, IntervalSchedule.Interval.Unit.MINUTES))))
.defaultThrottlePeriod(throttlePeriod)
.addAction("logging", loggingAction("test out"))
.addAction("failing_hook", webhookAction(HttpRequestTemplate.builder("unknown.foo", 80))))
.get();
assertThat(putWatchResponse.getVersion(), greaterThan(0L)); assertThat(putWatchResponse.getVersion(), greaterThan(0L));
refresh(); refresh();
ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)));
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, throttlePeriod); ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, throttlePeriod);
ctxBuilder.recordExecution(true); ctxBuilder.recordExecution(true);
ManualExecutionContext ctx = ctxBuilder.build(); ManualExecutionContext ctx = ctxBuilder.build();
WatchRecord watchRecord = executionService().execute(ctx); WatchRecord watchRecord = executionService().execute(ctx);
assertThat(watchRecord.execution().actionsResults().get("logging").action().status(), equalTo(Action.Result.Status.SUCCESS)); assertThat(watchRecord.result().actionsResults().get("logging").action().status(), equalTo(Action.Result.Status.SUCCESS));
assertThat(watchRecord.execution().actionsResults().get("failing_hook").action().status(), equalTo(Action.Result.Status.FAILURE)); assertThat(watchRecord.result().actionsResults().get("failing_hook").action().status(), equalTo(Action.Result.Status.FAILURE));
assertThat(watchRecord.state(), equalTo(ExecutionState.EXECUTED)); assertThat(watchRecord.state(), equalTo(ExecutionState.EXECUTED));
triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)));
ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, throttlePeriod); ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, throttlePeriod);
ctxBuilder.recordExecution(true); ctxBuilder.recordExecution(true);
ctx = ctxBuilder.build(); ctx = ctxBuilder.build();
watchRecord = executionService().execute(ctx); watchRecord = executionService().execute(ctx);
assertThat(watchRecord.execution().actionsResults().get("logging").action().status(), equalTo(Action.Result.Status.THROTTLED)); assertThat(watchRecord.result().actionsResults().get("logging").action().status(), equalTo(Action.Result.Status.THROTTLED));
assertThat(watchRecord.execution().actionsResults().get("failing_hook").action().status(), equalTo(Action.Result.Status.FAILURE)); assertThat(watchRecord.result().actionsResults().get("failing_hook").action().status(), equalTo(Action.Result.Status.FAILURE));
assertThat(watchRecord.state(), equalTo(ExecutionState.THROTTLED)); assertThat(watchRecord.state(), equalTo(ExecutionState.THROTTLED));
} }
@ -363,7 +360,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
private ManualExecutionContext getManualExecutionContext(TimeValue throttlePeriod) { private ManualExecutionContext getManualExecutionContext(TimeValue throttlePeriod) {
ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)));
return ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, throttlePeriod) return ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, throttlePeriod)
.executionTime(timeWarped() ? timeWarp().clock().nowUTC() : SystemClock.INSTANCE.nowUTC()) .executionTime(timeWarped() ? timeWarp().clock().nowUTC() : SystemClock.INSTANCE.nowUTC())
.allActionsMode(ActionExecutionMode.SIMULATE) .allActionsMode(ActionExecutionMode.SIMULATE)
.recordExecution(true) .recordExecution(true)

View File

@ -5,11 +5,15 @@
*/ */
package org.elasticsearch.watcher.execution; package org.elasticsearch.watcher.execution;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.actions.*; import org.elasticsearch.watcher.actions.*;
import org.elasticsearch.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.watcher.actions.throttler.Throttler;
import org.elasticsearch.watcher.condition.Condition; import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.condition.ExecutableCondition; import org.elasticsearch.watcher.condition.ExecutableCondition;
import org.elasticsearch.watcher.condition.always.AlwaysCondition; import org.elasticsearch.watcher.condition.always.AlwaysCondition;
@ -20,8 +24,6 @@ import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.input.Input; import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.clock.ClockMock; import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.watcher.actions.throttler.Throttler;
import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation; import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation;
import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.Transform; import org.elasticsearch.watcher.transform.Transform;
@ -31,6 +33,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
@ -46,28 +49,182 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
private ExecutableInput input; private ExecutableInput input;
private Input.Result inputResult; private Input.Result inputResult;
private WatchStore watchStore;
private TriggeredWatchStore triggeredWatchStore;
private HistoryStore historyStore;
private WatchLockService watchLockService;
private ExecutionService executionService; private ExecutionService executionService;
private Clock clock;
@Before @Before
public void init() throws Exception { public void init() throws Exception {
payload = mock(Payload.class); payload = mock(Payload.class);
input = mock(ExecutableInput.class); input = mock(ExecutableInput.class);
inputResult = mock(Input.Result.class); inputResult = mock(Input.Result.class);
when(inputResult.status()).thenReturn(Input.Result.Status.SUCCESS);
when(inputResult.payload()).thenReturn(payload); when(inputResult.payload()).thenReturn(payload);
when(input.execute(any(WatchExecutionContext.class))).thenReturn(inputResult); when(input.execute(any(WatchExecutionContext.class))).thenReturn(inputResult);
HistoryStore historyStore = mock(HistoryStore.class); watchStore = mock(WatchStore.class);
TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); triggeredWatchStore = mock(TriggeredWatchStore.class);
historyStore = mock(HistoryStore.class);
WatchExecutor executor = mock(WatchExecutor.class); WatchExecutor executor = mock(WatchExecutor.class);
WatchStore watchStore = mock(WatchStore.class); when(executor.queue()).thenReturn(new ArrayBlockingQueue<Runnable>(1));
WatchLockService watchLockService = mock(WatchLockService.class);
watchLockService = mock(WatchLockService.class);
WatcherSettingsValidation settingsValidator = mock(WatcherSettingsValidation.class); WatcherSettingsValidation settingsValidator = mock(WatcherSettingsValidation.class);
Clock clock = new ClockMock(); clock = new ClockMock();
executionService = new ExecutionService(ImmutableSettings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore, watchLockService, clock, settingsValidator); executionService = new ExecutionService(ImmutableSettings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore, watchLockService, clock, settingsValidator);
ClusterState clusterState = mock(ClusterState.class);
when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(ImmutableList.<TriggeredWatch>of());
executionService.start(clusterState);
} }
@Test @Test
public void testExecute() throws Exception { public void testExecute() throws Exception {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire("_id")).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
when(watchStore.get("_id")).thenReturn(watch);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC());
WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5));
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
// watch level transform
Transform.Result watchTransformResult = mock(Transform.Result.class);
when(watchTransformResult.payload()).thenReturn(payload);
ExecutableTransform watchTransform = mock(ExecutableTransform.class);
when(watchTransform.execute(context, payload)).thenReturn(watchTransformResult);
// action throttler
Throttler.Result throttleResult = mock(Throttler.Result.class);
when(throttleResult.throttle()).thenReturn(false);
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.payload()).thenReturn(payload);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult);
// the action
Action.Result actionResult = mock(Action.Result.class);
when(actionResult.type()).thenReturn("_action_type");
when(actionResult.status()).thenReturn(Action.Result.Status.SUCCESS);
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC())));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
when(watch.transform()).thenReturn(watchTransform);
when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus);
WatchRecord watchRecord = executionService.execute(context);
assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult));
assertThat(watchRecord.result().transformResult(), sameInstance(watchTransformResult));
ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action");
assertThat(result, notNullValue());
assertThat(result.id(), is("_action"));
assertThat(result.transform(), sameInstance(actionTransformResult));
assertThat(result.action(), sameInstance(actionResult));
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload);
verify(action, times(1)).execute("_action", context, payload);
}
@Test
public void testExecute_FailedInput() throws Exception {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire("_id")).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
when(watchStore.get("_id")).thenReturn(watch);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC());
WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5));
input = mock(ExecutableInput.class);
Input.Result inputResult = mock(Input.Result.class);
when(inputResult.status()).thenReturn(Input.Result.Status.FAILURE);
when(inputResult.reason()).thenReturn("_reason");
when(input.execute(context)).thenReturn(inputResult);
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
// watch level transform
Transform.Result watchTransformResult = mock(Transform.Result.class);
when(watchTransformResult.payload()).thenReturn(payload);
ExecutableTransform watchTransform = mock(ExecutableTransform.class);
when(watchTransform.execute(context, payload)).thenReturn(watchTransformResult);
// action throttler
Throttler.Result throttleResult = mock(Throttler.Result.class);
when(throttleResult.throttle()).thenReturn(false);
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.payload()).thenReturn(payload);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult);
// the action
Action.Result actionResult = mock(Action.Result.class);
when(actionResult.type()).thenReturn("_action_type");
when(actionResult.status()).thenReturn(Action.Result.Status.SUCCESS);
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC())));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
when(watch.transform()).thenReturn(watchTransform);
when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus);
WatchRecord watchRecord = executionService.execute(context);
assertThat(watchRecord.result().inputResult(), is(inputResult));
assertThat(watchRecord.result().conditionResult(), nullValue());
assertThat(watchRecord.result().transformResult(), nullValue());
assertThat(watchRecord.result().actionsResults(), notNullValue());
assertThat(watchRecord.result().actionsResults().count(), is(0));
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(input, times(1)).execute(context);
verify(condition, never()).execute(context);
verify(watchTransform, never()).execute(context, payload);
verify(action, never()).execute("_action", context, payload);
}
@Test
public void testExecuteInner() throws Exception {
DateTime now = DateTime.now(UTC); DateTime now = DateTime.now(UTC);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
@ -102,7 +259,6 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
ExecutableAction action = mock(ExecutableAction.class); ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult); when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
@ -115,9 +271,9 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
when(watch.status()).thenReturn(watchStatus); when(watch.status()).thenReturn(watchStatus);
WatchRecord watchRecord = executionService.executeInner(context); WatchRecord watchRecord = executionService.executeInner(context);
assertThat(watchRecord.execution().conditionResult(), sameInstance(conditionResult)); assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult));
assertThat(watchRecord.execution().transformResult(), sameInstance(watchTransformResult)); assertThat(watchRecord.result().transformResult(), sameInstance(watchTransformResult));
ActionWrapper.Result result = watchRecord.execution().actionsResults().get("_action"); ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action");
assertThat(result, notNullValue()); assertThat(result, notNullValue());
assertThat(result.id(), is("_action")); assertThat(result.id(), is("_action"));
assertThat(result.transform(), sameInstance(actionTransformResult)); assertThat(result.transform(), sameInstance(actionTransformResult));
@ -129,7 +285,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
} }
@Test @Test
public void testExecute_throttled() throws Exception { public void testExecuteInner_throttled() throws Exception {
DateTime now = DateTime.now(UTC); DateTime now = DateTime.now(UTC);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
@ -159,12 +315,12 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
when(watch.actions()).thenReturn(actions); when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus); when(watch.status()).thenReturn(watchStatus);
WatchRecord executionResult = executionService.executeInner(context); WatchRecord watchRecord = executionService.executeInner(context);
assertThat(executionResult.execution().inputResult(), sameInstance(inputResult)); assertThat(watchRecord.result().inputResult(), sameInstance(inputResult));
assertThat(executionResult.execution().conditionResult(), sameInstance(conditionResult)); assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult));
assertThat(executionResult.execution().transformResult(), nullValue()); assertThat(watchRecord.result().transformResult(), nullValue());
assertThat(executionResult.execution().actionsResults().count(), is(1)); assertThat(watchRecord.result().actionsResults().count(), is(1));
ActionWrapper.Result result = executionResult.execution().actionsResults().get("_action"); ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action");
assertThat(result, notNullValue()); assertThat(result, notNullValue());
assertThat(result.id(), is("_action")); assertThat(result.id(), is("_action"));
assertThat(result.transform(), nullValue()); assertThat(result.transform(), nullValue());
@ -178,7 +334,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
} }
@Test @Test
public void testExecute_conditionNotMet() throws Exception { public void testExecuteInner_conditionNotMet() throws Exception {
DateTime now = DateTime.now(UTC); DateTime now = DateTime.now(UTC);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
@ -206,11 +362,11 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
when(watch.actions()).thenReturn(actions); when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus); when(watch.status()).thenReturn(watchStatus);
WatchRecord executionResult = executionService.executeInner(context); WatchRecord watchRecord = executionService.executeInner(context);
assertThat(executionResult.execution().inputResult(), sameInstance(inputResult)); assertThat(watchRecord.result().inputResult(), sameInstance(inputResult));
assertThat(executionResult.execution().conditionResult(), sameInstance(conditionResult)); assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult));
assertThat(executionResult.execution().transformResult(), nullValue()); assertThat(watchRecord.result().transformResult(), nullValue());
assertThat(executionResult.execution().actionsResults().count(), is(0)); assertThat(watchRecord.result().actionsResults().count(), is(0));
verify(condition, times(1)).execute(context); verify(condition, times(1)).execute(context);
verify(watchTransform, never()).execute(context, payload); verify(watchTransform, never()).execute(context, payload);

View File

@ -5,11 +5,11 @@
*/ */
package org.elasticsearch.watcher.execution; package org.elasticsearch.watcher.execution;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.WatcherService; import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.actions.ActionStatus; import org.elasticsearch.watcher.actions.ActionStatus;
@ -60,7 +60,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
return false; return false;
} }
@Test //@Repeat(iterations = 10) @Test @Repeat(iterations = 10)
public void testExecuteWatch() throws Exception { public void testExecuteWatch() throws Exception {
ensureWatcherStarted(); ensureWatcherStarted();
boolean ignoreCondition = randomBoolean(); boolean ignoreCondition = randomBoolean();
@ -83,10 +83,10 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
refresh(); refresh();
assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true)); assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true));
//If we are persisting the state we need to use the exact watch that is in memory //If we are persisting the state we need to use the exact watch that is in memory
ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, timeValueSeconds(5)); ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, timeValueSeconds(5));
} else { } else {
parsedWatch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON)); parsedWatch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON));
ctxBuilder = ManualExecutionContext.builder(parsedWatch, triggerEvent, timeValueSeconds(5)); ctxBuilder = ManualExecutionContext.builder(parsedWatch, false, triggerEvent, timeValueSeconds(5));
} }
if (ignoreCondition) { if (ignoreCondition) {
@ -117,18 +117,18 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
assertThat("the expected count of history records should be [" + expectedCount + "]", newRecordCount, equalTo(expectedCount)); assertThat("the expected count of history records should be [" + expectedCount + "]", newRecordCount, equalTo(expectedCount));
if (ignoreCondition) { if (ignoreCondition) {
assertThat("The action should have run", watchRecord.execution().actionsResults().count(), equalTo(1)); assertThat("The action should have run", watchRecord.result().actionsResults().count(), equalTo(1));
} else if (!conditionAlwaysTrue) { } else if (!conditionAlwaysTrue) {
assertThat("The action should not have run", watchRecord.execution().actionsResults().count(), equalTo(0)); assertThat("The action should not have run", watchRecord.result().actionsResults().count(), equalTo(0));
} }
if ((ignoreCondition || conditionAlwaysTrue) && action == null) { if ((ignoreCondition || conditionAlwaysTrue) && action == null) {
assertThat("The action should have run non simulated", watchRecord.execution().actionsResults().get("log").action(), assertThat("The action should have run non simulated", watchRecord.result().actionsResults().get("log").action(),
not(instanceOf(LoggingAction.Result.Simulated.class)) ); not(instanceOf(LoggingAction.Result.Simulated.class)) );
} }
if ((ignoreCondition || conditionAlwaysTrue) && action != null ) { if ((ignoreCondition || conditionAlwaysTrue) && action != null ) {
assertThat("The action should have run simulated", watchRecord.execution().actionsResults().get("log").action(), instanceOf(LoggingAction.Result.Simulated.class)); assertThat("The action should have run simulated", watchRecord.result().actionsResults().get("log").action(), instanceOf(LoggingAction.Result.Simulated.class));
} }
Watch testWatch = watchService().getWatch("_id"); Watch testWatch = watchService().getWatch("_id");
@ -165,7 +165,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
map2.put("foo", map1); map2.put("foo", map1);
ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)));
ManualExecutionContext.Builder ctxBuilder1 = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, timeValueSeconds(5)); ManualExecutionContext.Builder ctxBuilder1 = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, timeValueSeconds(5));
ctxBuilder1.actionMode("_all", ActionExecutionMode.SIMULATE); ctxBuilder1.actionMode("_all", ActionExecutionMode.SIMULATE);
ctxBuilder1.withInput(new SimpleInput.Result(new Payload.Simple(map1))); ctxBuilder1.withInput(new SimpleInput.Result(new Payload.Simple(map1)));
@ -173,7 +173,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
WatchRecord watchRecord1 = executionService().execute(ctxBuilder1.build()); WatchRecord watchRecord1 = executionService().execute(ctxBuilder1.build());
ManualExecutionContext.Builder ctxBuilder2 = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent, timeValueSeconds(5)); ManualExecutionContext.Builder ctxBuilder2 = ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, timeValueSeconds(5));
ctxBuilder2.actionMode("_all", ActionExecutionMode.SIMULATE); ctxBuilder2.actionMode("_all", ActionExecutionMode.SIMULATE);
ctxBuilder2.withInput(new SimpleInput.Result(new Payload.Simple(map2))); ctxBuilder2.withInput(new SimpleInput.Result(new Payload.Simple(map2)));
@ -181,8 +181,8 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
WatchRecord watchRecord2 = executionService().execute(ctxBuilder2.build()); WatchRecord watchRecord2 = executionService().execute(ctxBuilder2.build());
assertThat(watchRecord1.execution().inputResult().payload().data().get("foo").toString(), equalTo("bar")); assertThat(watchRecord1.result().inputResult().payload().data().get("foo").toString(), equalTo("bar"));
assertThat(watchRecord2.execution().inputResult().payload().data().get("foo"), instanceOf(Map.class)); assertThat(watchRecord2.result().inputResult().payload().data().get("foo"), instanceOf(Map.class));
} }
@Test @Test
@ -245,13 +245,12 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
.addAction("log", loggingAction("foobar")); .addAction("log", loggingAction("foobar"));
Watch watch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON)); Watch watch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON));
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))), new TimeValue(1, TimeUnit.HOURS)); ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, false, new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))), new TimeValue(1, TimeUnit.HOURS));
WatchRecord record = executionService().execute(ctxBuilder.build()); WatchRecord record = executionService().execute(ctxBuilder.build());
assertThat(record.execution().executionDurationMs(), greaterThanOrEqualTo(100L)); assertThat(record.result().executionDurationMs(), greaterThanOrEqualTo(100L));
} }
@Test @Test @Slow
@Slow
public void testForceDeletionOfLongRunningWatch() throws Exception { public void testForceDeletionOfLongRunningWatch() throws Exception {
WatchSourceBuilder watchBuilder = watchBuilder() WatchSourceBuilder watchBuilder = watchBuilder()
.trigger(schedule(cron("0 0 0 1 * ? 2099"))) .trigger(schedule(cron("0 0 0 1 * ? 2099")))
@ -266,7 +265,6 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
refresh(); refresh();
assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true)); assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true));
CountDownLatch startLatch = new CountDownLatch(1); CountDownLatch startLatch = new CountDownLatch(1);
List<Thread> threads = new ArrayList<>(); List<Thread> threads = new ArrayList<>();
@ -308,7 +306,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
this.watchId = watchId; this.watchId = watchId;
this.startLatch = startLatch; this.startLatch = startLatch;
ManualTriggerEvent triggerEvent = new ManualTriggerEvent(watchId, new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); ManualTriggerEvent triggerEvent = new ManualTriggerEvent(watchId, new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)));
ctxBuilder = ManualExecutionContext.builder(watcherService.getWatch(watchId), triggerEvent, timeValueSeconds(5)); ctxBuilder = ManualExecutionContext.builder(watcherService.getWatch(watchId), true, triggerEvent, timeValueSeconds(5));
ctxBuilder.recordExecution(true); ctxBuilder.recordExecution(true);
ctxBuilder.actionMode("_all", ActionExecutionMode.FORCE_EXECUTE); ctxBuilder.actionMode("_all", ActionExecutionMode.FORCE_EXECUTE);
} }
@ -317,10 +315,9 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
public void run() { public void run() {
try { try {
startLatch.await(); startLatch.await();
executionService.execute(ctxBuilder.build()); WatchRecord record = executionService.execute(ctxBuilder.build());
fail("Execution of a deleted watch should fail but didn't"); assertThat(record, notNullValue());
} catch (WatchMissingException we) { assertThat(record.state(), is(ExecutionState.NOT_EXECUTED_WATCH_MISSING));
assertThat(we.getCause(), instanceOf(DocumentMissingException.class));
} catch (Throwable t) { } catch (Throwable t) {
throw new WatcherException("Failure mode execution of [{}] failed in an unexpected way", t, watchId); throw new WatcherException("Failure mode execution of [{}] failed in an unexpected way", t, watchId);
} }

View File

@ -107,10 +107,12 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testSearch_InlineTemplate() throws Exception { public void testSearch_InlineTemplate() throws Exception {
WatchExecutionContext ctx = createContext();
final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" + "\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"metadata\":null,\"watch_id\":\"test-watch\",\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}"; "\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"id\":\"" + ctx.id().value() + "\",\"metadata\":null,\"watch_id\":\"test-watch\",\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}";
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s"); params.put("seconds_param", "30s");
@ -125,13 +127,14 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.setTemplateSource(templateSource) .setTemplateSource(templateSource)
.request(); .request();
SearchInput.Result executedResult = executeSearchInput(request, ctx);
SearchInput.Result executedResult = executeSearchInput(request);
assertThat(areJsonEquivalent(executedResult.executedRequest().templateSource().toUtf8(), expectedQuery), is(true)); assertThat(areJsonEquivalent(executedResult.executedRequest().templateSource().toUtf8(), expectedQuery), is(true));
} }
@Test @Test
public void testSearch_IndexedTemplate() throws Exception { public void testSearch_IndexedTemplate() throws Exception {
WatchExecutionContext ctx = createContext();
PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache","test-template", TEMPLATE_QUERY).request(); PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache","test-template", TEMPLATE_QUERY).request();
assertThat(client().putIndexedScript(indexedScriptRequest).actionGet().isCreated(), is(true)); assertThat(client().putIndexedScript(indexedScriptRequest).actionGet().isCreated(), is(true));
@ -148,12 +151,14 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.setTemplateSource(templateSource) .setTemplateSource(templateSource)
.request(); .request();
SearchInput.Result executedResult = executeSearchInput(request); SearchInput.Result executedResult = executeSearchInput(request, ctx);
assertThat(executedResult.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"id\":\"test-template\"")); assertThat(executedResult.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"id\":\"test-template\""));
} }
@Test @Test
public void testSearch_OndiskTemplate() throws Exception { public void testSearch_OndiskTemplate() throws Exception {
WatchExecutionContext ctx = createContext();
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s"); params.put("seconds_param", "30s");
@ -167,7 +172,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.setTemplateSource(templateSource) .setTemplateSource(templateSource)
.request(); .request();
SearchInput.Result executedResult = executeSearchInput(request); SearchInput.Result executedResult = executeSearchInput(request, ctx);
assertThat(executedResult.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"file\":\"test_disk_template\"")); assertThat(executedResult.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"file\":\"test_disk_template\""));
} }
@ -243,15 +248,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
fail("expected a SearchInputException as search type SCAN should not be supported"); fail("expected a SearchInputException as search type SCAN should not be supported");
} }
private SearchInput.Result executeSearchInput(SearchRequest request) throws IOException { private WatchExecutionContext createContext() {
createIndex("test-search-index"); return new TriggeredExecutionContext(
ensureGreen("test-search-index");
SearchInput.Builder siBuilder = SearchInput.builder(request);
SearchInput si = siBuilder.build();
ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch", new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger), new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
@ -264,6 +262,16 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
new DateTime(60000, UTC), new DateTime(60000, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)), new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)),
timeValueSeconds(5)); timeValueSeconds(5));
}
private SearchInput.Result executeSearchInput(SearchRequest request, WatchExecutionContext ctx) throws IOException {
createIndex("test-search-index");
ensureGreen("test-search-index");
SearchInput.Builder siBuilder = SearchInput.builder(request);
SearchInput si = siBuilder.build();
ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()));
return searchInput.execute(ctx); return searchInput.execute(ctx);
} }

View File

@ -9,6 +9,7 @@ import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.test.WatcherTestUtils; import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
@ -18,6 +19,7 @@ import org.junit.Test;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.test.WatcherTestUtils.assertValue;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
/** /**
@ -33,25 +35,23 @@ public class VariablesTests extends ElasticsearchTestCase {
Payload payload = new Payload.Simple(ImmutableMap.<String, Object>builder().put("payload_key", "payload_value").build()); Payload payload = new Payload.Simple(ImmutableMap.<String, Object>builder().put("payload_key", "payload_value").build());
Map<String, Object> metatdata = ImmutableMap.<String, Object>builder().put("metadata_key", "metadata_value").build(); Map<String, Object> metatdata = ImmutableMap.<String, Object>builder().put("metadata_key", "metadata_value").build();
TriggerEvent event = new ScheduleTriggerEvent("_watch_id", triggeredTime, scheduledTime); TriggerEvent event = new ScheduleTriggerEvent("_watch_id", triggeredTime, scheduledTime);
WatchExecutionContext wec = WatcherTestUtils.mockExecutionContextBuilder("_watch_id") Wid wid = new Wid("_watch_id", 0, executionTime);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id")
.wid(wid)
.executionTime(executionTime) .executionTime(executionTime)
.triggerEvent(event) .triggerEvent(event)
.payload(payload) .payload(payload)
.metadata(metatdata) .metadata(metatdata)
.buildMock(); .buildMock();
Map<String, Object> model = Variables.createCtxModel(wec, payload); Map<String, Object> model = Variables.createCtxModel(ctx, payload);
assertThat(model, notNullValue()); assertThat(model, notNullValue());
assertThat(model, hasKey(Variables.CTX));
assertThat(model.get(Variables.CTX), instanceOf(Map.class));
assertThat(model.size(), is(1)); assertThat(model.size(), is(1));
assertValue(model, "ctx", instanceOf(Map.class));
Map<String, Object> ctx = (Map<String, Object>) model.get(Variables.CTX); assertValue(model, "ctx.id", is(wid.value()));
assertThat(ctx, hasEntry(Variables.WATCH_ID, (Object) "_watch_id")); assertValue(model, "ctx.execution_time", is(executionTime));
assertThat(ctx, hasEntry(Variables.EXECUTION_TIME, (Object) executionTime)); assertValue(model, "ctx.trigger", is(event.data()));
assertThat(ctx, hasEntry(Variables.TRIGGER, (Object) event.data())); assertValue(model, "ctx.payload", is(payload.data()));
assertThat(ctx, hasEntry(Variables.PAYLOAD, (Object) payload.data())); assertValue(model, "ctx.metadata", is(metatdata));
assertThat(ctx, hasEntry(Variables.METADATA, (Object) metatdata));
assertThat(ctx.size(), is(5));
} }
} }

View File

@ -43,6 +43,7 @@ import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.watcher.condition.script.ExecutableScriptCondition; import org.elasticsearch.watcher.condition.script.ExecutableScriptCondition;
import org.elasticsearch.watcher.condition.script.ScriptCondition; import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.input.search.ExecutableSearchInput; import org.elasticsearch.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput; import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput; import org.elasticsearch.watcher.input.simple.SimpleInput;
@ -59,6 +60,7 @@ import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.support.template.TemplateEngine; import org.elasticsearch.watcher.support.template.TemplateEngine;
import org.elasticsearch.watcher.support.template.xmustache.XMustacheScriptEngineService; import org.elasticsearch.watcher.support.template.xmustache.XMustacheScriptEngineService;
import org.elasticsearch.watcher.support.template.xmustache.XMustacheTemplateEngine; import org.elasticsearch.watcher.support.template.xmustache.XMustacheTemplateEngine;
import org.elasticsearch.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform; import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransform; import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.TriggerEvent;
@ -67,16 +69,19 @@ import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch; import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchStatus; import org.elasticsearch.watcher.watch.WatchStatus;
import org.hamcrest.Matcher;
import javax.mail.internet.AddressException; import javax.mail.internet.AddressException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.*; import java.util.*;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomInt;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom; import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -90,6 +95,10 @@ public final class WatcherTestUtils {
private WatcherTestUtils() { private WatcherTestUtils() {
} }
public static void assertValue(Map<String, Object> map, String path, Matcher<?> matcher) {
assertThat(ObjectPath.eval(path, map), (Matcher<Object>) matcher);
}
public static XContentParser xContentParser(XContentBuilder builder) throws IOException { public static XContentParser xContentParser(XContentBuilder builder) throws IOException {
return builder.contentType().xContent().createParser(builder.bytes()); return builder.contentType().xContent().createParser(builder.bytes());
} }
@ -120,17 +129,20 @@ public final class WatcherTestUtils {
} }
public static WatchExecutionContextMockBuilder mockExecutionContextBuilder(String watchId) { public static WatchExecutionContextMockBuilder mockExecutionContextBuilder(String watchId) {
return new WatchExecutionContextMockBuilder(watchId); return new WatchExecutionContextMockBuilder(watchId)
.wid(new Wid(watchId, randomInt(10), DateTime.now(UTC)));
} }
public static WatchExecutionContext mockExecutionContext(String watchId, Payload payload) { public static WatchExecutionContext mockExecutionContext(String watchId, Payload payload) {
return mockExecutionContextBuilder(watchId) return mockExecutionContextBuilder(watchId)
.wid(new Wid(watchId, randomInt(10), DateTime.now(UTC)))
.payload(payload) .payload(payload)
.buildMock(); .buildMock();
} }
public static WatchExecutionContext mockExecutionContext(String watchId, DateTime time, Payload payload) { public static WatchExecutionContext mockExecutionContext(String watchId, DateTime time, Payload payload) {
return mockExecutionContextBuilder(watchId) return mockExecutionContextBuilder(watchId)
.wid(new Wid(watchId, randomInt(10), DateTime.now(UTC)))
.payload(payload) .payload(payload)
.time(watchId, time) .time(watchId, time)
.buildMock(); .buildMock();
@ -138,6 +150,7 @@ public final class WatcherTestUtils {
public static WatchExecutionContext mockExecutionContext(String watchId, DateTime executionTime, TriggerEvent event, Payload payload) { public static WatchExecutionContext mockExecutionContext(String watchId, DateTime executionTime, TriggerEvent event, Payload payload) {
return mockExecutionContextBuilder(watchId) return mockExecutionContextBuilder(watchId)
.wid(new Wid(watchId, randomInt(10), DateTime.now(UTC)))
.payload(payload) .payload(payload)
.executionTime(executionTime) .executionTime(executionTime)
.triggerEvent(event) .triggerEvent(event)

View File

@ -208,7 +208,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").get(); SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").get();
assertHitCount(searchResponse, 1); assertHitCount(searchResponse, 1);
assertThat(searchResponse.getHits().getAt(0).id(), Matchers.equalTo(wid.value())); assertThat(searchResponse.getHits().getAt(0).id(), Matchers.equalTo(wid.value()));
assertThat(searchResponse.getHits().getAt(0).sourceAsMap().get(WatchRecord.Field.STATE.getPreferredName()).toString(), Matchers.equalTo(ExecutionState.DELETED_WHILE_QUEUED.toString())); assertThat(searchResponse.getHits().getAt(0).sourceAsMap().get(WatchRecord.Field.STATE.getPreferredName()).toString(), Matchers.equalTo(ExecutionState.NOT_EXECUTED_WATCH_MISSING.toString()));
} }

View File

@ -153,7 +153,7 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTests
.get(); .get();
assertThat(executeResponse, notNullValue()); assertThat(executeResponse, notNullValue());
contentSource = executeResponse.getRecordSource(); contentSource = executeResponse.getRecordSource();
value = contentSource.getValue("result.input.http.status"); value = contentSource.getValue("result.input.http.status_code");
assertThat(value, notNullValue()); assertThat(value, notNullValue());
assertThat(value, is((Object) 200)); assertThat(value, is((Object) 200));

View File

@ -253,6 +253,8 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testSearch_InlineTemplate() throws Exception { public void testSearch_InlineTemplate() throws Exception {
WatchExecutionContext ctx = createContext();
final String templateQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + final String templateQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" + "\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
@ -261,7 +263,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" + "\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"metadata\":null,\"watch_id\":\"test-watch\",\"payload\":{},\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}"; "\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"id\":\"" + ctx.id().value() + "\",\"metadata\":null,\"watch_id\":\"test-watch\",\"payload\":{},\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}";
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s"); params.put("seconds_param", "30s");
@ -276,13 +278,15 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.setTemplateSource(templateSource) .setTemplateSource(templateSource)
.request(); .request();
SearchTransform.Result executedResult = executeSearchTransform(request); SearchTransform.Result executedResult = executeSearchTransform(request, ctx);
assertThat(areJsonEquivalent(executedResult.executedRequest().templateSource().toUtf8(), expectedQuery), is(true)); assertThat(areJsonEquivalent(executedResult.executedRequest().templateSource().toUtf8(), expectedQuery), is(true));
} }
@Test @Test
public void testSearch_IndexedTemplate() throws Exception { public void testSearch_IndexedTemplate() throws Exception {
WatchExecutionContext ctx = createContext();
final String templateQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + final String templateQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" + "\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
@ -304,13 +308,15 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.setTemplateSource(templateSource) .setTemplateSource(templateSource)
.request(); .request();
SearchTransform.Result result = executeSearchTransform(request); SearchTransform.Result result = executeSearchTransform(request, ctx);
assertNotNull(result.executedRequest()); assertNotNull(result.executedRequest());
assertThat(result.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"id\":\"test-script\"")); assertThat(result.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"id\":\"test-script\""));
} }
@Test @Test
public void testSearch_OndiskTemplate() throws Exception { public void testSearch_OndiskTemplate() throws Exception {
WatchExecutionContext ctx = createContext();
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s"); params.put("seconds_param", "30s");
@ -324,7 +330,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.setTemplateSource(templateSource) .setTemplateSource(templateSource)
.request(); .request();
SearchTransform.Result result = executeSearchTransform(request); SearchTransform.Result result = executeSearchTransform(request, ctx);
assertNotNull(result.executedRequest()); assertNotNull(result.executedRequest());
assertThat(result.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"file\":\"test_disk_template\"")); assertThat(result.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"file\":\"test_disk_template\""));
} }
@ -332,11 +338,13 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testDifferentSearchType() throws Exception { public void testDifferentSearchType() throws Exception {
WatchExecutionContext ctx = createContext();
SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery( SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery(
matchQuery("event_type", "a"), matchQuery("event_type", "a"),
rangeFilter("_timestamp") rangeFilter("_timestamp")
.from("{{ctx.trigger.scheduled_time}}||-30s") .from("{{ctx.trigger.scheduled_time}}||-30s")
.to("{{ctx.trigger.triggered_time}}"))); .to("{{ctx.trigger.triggered_time}}")));
final SearchType searchType = getRandomSupportedSearchType(); final SearchType searchType = getRandomSupportedSearchType();
SearchRequest request = client() SearchRequest request = client()
@ -345,7 +353,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.request() .request()
.source(searchSourceBuilder); .source(searchSourceBuilder);
SearchTransform.Result result = executeSearchTransform(request); SearchTransform.Result result = executeSearchTransform(request, ctx);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0)); assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertThat(result.executedRequest(), notNullValue()); assertThat(result.executedRequest(), notNullValue());
@ -354,14 +362,8 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
assertThat(result.executedRequest().indicesOptions(), equalTo(request.indicesOptions())); assertThat(result.executedRequest().indicesOptions(), equalTo(request.indicesOptions()));
} }
private SearchTransform.Result executeSearchTransform(SearchRequest request) throws IOException { private WatchExecutionContext createContext() {
createIndex("test-search-index"); return new TriggeredExecutionContext(
ensureGreen("test-search-index");
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch", new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger), new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
@ -374,6 +376,14 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
new DateTime(60000, UTC), new DateTime(60000, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)), new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)),
timeValueSeconds(5)); timeValueSeconds(5));
}
private SearchTransform.Result executeSearchTransform(SearchRequest request, WatchExecutionContext ctx) throws IOException {
createIndex("test-search-index");
ensureGreen("test-search-index");
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
return executableSearchTransform.execute(ctx, Payload.Simple.EMPTY); return executableSearchTransform.execute(ctx, Payload.Simple.EMPTY);
} }

View File

@ -39,9 +39,9 @@ public class ForceDeleteWatchTests extends AbstractWatcherIntegrationTests {
@Test @Slow @TestLogging("_root:DEBUG") @Test @Slow @TestLogging("_root:DEBUG")
public void testForceDelete_LongRunningWatch() throws Exception { public void testForceDelete_LongRunningWatch() throws Exception {
PutWatchResponse putResponse = watcherClient().preparePutWatch("_name").setSource(watchBuilder() PutWatchResponse putResponse = watcherClient().preparePutWatch("_name").setSource(watchBuilder()
.trigger(schedule(interval("1s"))) .trigger(schedule(interval("3s")))
.condition(scriptCondition(Script.inline("sleep 5000; return true"))) .condition(scriptCondition(Script.inline("sleep 5000; return true")))
.addAction("_action1", loggingAction("{{ctx.watch_id}}"))) .addAction("_action1", loggingAction("executed action: {{ctx.id}}")))
.get(); .get();
assertThat(putResponse.getId(), equalTo("_name")); assertThat(putResponse.getId(), equalTo("_name"));
Thread.sleep(5000); Thread.sleep(5000);