[Watcher] Add Condition to Action

This adds a "condition" to every action (via the ActionWrapper) that prevents execution of the action if the condition fails. An action-level condition is only useful when there is more than one action, but nothing checks to ensure that it's only used in that scenario.

Original commit: elastic/x-pack-elasticsearch@704cfb1a86
This commit is contained in:
Chris Earle 2016-08-12 17:07:10 -04:00
parent 101d791ec4
commit 53d022a20a
12 changed files with 733 additions and 108 deletions

View File

@ -27,6 +27,7 @@ public interface Action extends ToXContent {
FAILURE,
PARTIAL_FAILURE,
THROTTLED,
CONDITION_FAILED,
SIMULATED;
@Override
@ -51,12 +52,17 @@ public interface Action extends ToXContent {
return status;
}
public static class Failure extends Result {
/**
* {@code StoppedResult} is a {@link Result} with a {@link #reason()}.
* <p>
* Any {@code StoppedResult} should provide a reason <em>why</em> it is stopped.
*/
public static class StoppedResult extends Result {
private final String reason;
public Failure(String type, String reason, Object... args) {
super(type, Status.FAILURE);
protected StoppedResult(String type, Status status, String reason, Object... args) {
super(type, status);
this.reason = LoggerMessageFormat.format(reason, args);
}
@ -68,25 +74,42 @@ public interface Action extends ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(Field.REASON.getPreferredName(), reason);
}
}
public static class Throttled extends Result {
/**
* {@code Failure} is a {@link StoppedResult} with a status of {@link Status#FAILURE} for actiosn that have failed unexpectedly
* (e.g., an exception was thrown in a place that wouldn't expect one, like transformation or an HTTP request).
*/
public static class Failure extends StoppedResult {
private final String reason;
public Failure(String type, String reason, Object... args) {
super(type, Status.FAILURE, reason, args);
}
}
/**
* {@code Throttled} is a {@link StoppedResult} with a status of {@link Status#THROTTLED} for actions that have been throttled.
*/
public static class Throttled extends StoppedResult {
public Throttled(String type, String reason) {
super(type, Status.THROTTLED);
this.reason = reason;
super(type, Status.THROTTLED, reason);
}
public String reason() {
return reason;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(Field.REASON.getPreferredName(), reason);
/**
* {@code ConditionFailed} is a {@link StoppedResult} with a status of {@link Status#FAILURE} for actions that have been skipped
* because the action's condition failed (either expected or unexpected).
*/
public static class ConditionFailed extends StoppedResult {
public ConditionFailed(String type, String reason, Object... args) {
super(type, Status.CONDITION_FAILED, reason, args);
}
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.condition.ConditionRegistry;
import org.elasticsearch.xpack.watcher.support.validation.Validation;
import org.elasticsearch.xpack.watcher.transform.TransformRegistry;
@ -23,14 +24,18 @@ import java.util.Map;
public class ActionRegistry {
private final Map<String, ActionFactory> parsers;
private final ConditionRegistry conditionRegistry;
private final TransformRegistry transformRegistry;
private final Clock clock;
private final XPackLicenseState licenseState;
@Inject
public ActionRegistry(Map<String, ActionFactory> parsers, TransformRegistry transformRegistry, Clock clock,
public ActionRegistry(Map<String, ActionFactory> parsers,
ConditionRegistry conditionRegistry, TransformRegistry transformRegistry,
Clock clock,
XPackLicenseState licenseState) {
this.parsers = parsers;
this.conditionRegistry = conditionRegistry;
this.transformRegistry = transformRegistry;
this.clock = clock;
this.licenseState = licenseState;
@ -57,8 +62,7 @@ public class ActionRegistry {
throw new ElasticsearchParseException("could not parse action [{}] for watch [{}]. {}", id, watchId, error);
}
} else if (token == XContentParser.Token.START_OBJECT && id != null) {
ActionWrapper action = ActionWrapper.parse(watchId, id, parser, this, transformRegistry, clock, licenseState);
actions.add(action);
actions.add(ActionWrapper.parse(watchId, id, parser, this, conditionRegistry, transformRegistry, clock, licenseState));
}
}
return new ExecutableActions(actions);

View File

@ -17,6 +17,9 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.xpack.watcher.actions.throttler.Throttler;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.condition.ConditionRegistry;
import org.elasticsearch.xpack.watcher.condition.ExecutableCondition;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.support.clock.Clock;
@ -24,6 +27,7 @@ import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.transform.TransformRegistry;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
@ -33,16 +37,23 @@ import java.io.IOException;
public class ActionWrapper implements ToXContent {
private String id;
@Nullable private final ExecutableTransform transform;
@Nullable
private final ExecutableCondition condition;
@Nullable
private final ExecutableTransform transform;
private final ActionThrottler throttler;
private final ExecutableAction action;
public ActionWrapper(String id, ExecutableAction action) {
this(id, null, null, action);
this(id, null, null, null, action);
}
public ActionWrapper(String id, ActionThrottler throttler, @Nullable ExecutableTransform transform, ExecutableAction action) {
public ActionWrapper(String id, ActionThrottler throttler,
@Nullable ExecutableCondition condition,
@Nullable ExecutableTransform transform,
ExecutableAction action) {
this.id = id;
this.condition = condition;
this.throttler = throttler;
this.transform = transform;
this.action = action;
@ -52,6 +63,10 @@ public class ActionWrapper implements ToXContent {
return id;
}
public ExecutableCondition condition() {
return condition;
}
public ExecutableTransform transform() {
return transform;
}
@ -64,7 +79,21 @@ public class ActionWrapper implements ToXContent {
return action;
}
public ActionWrapper.Result execute(WatchExecutionContext ctx) throws IOException {
/**
* Execute the current {@link #action()}.
* <p>
* This executes in the order of:
* <ol>
* <li>Throttling</li>
* <li>Conditional Check</li>
* <li>Transformation</li>
* <li>Action</li>
* </ol>
*
* @param ctx The current watch's context
* @return Never {@code null}
*/
public ActionWrapper.Result execute(WatchExecutionContext ctx) {
ActionWrapper.Result result = ctx.actionsResults().get(id);
if (result != null) {
return result;
@ -75,6 +104,20 @@ public class ActionWrapper implements ToXContent {
return new ActionWrapper.Result(id, new Action.Result.Throttled(action.type(), throttleResult.reason()));
}
}
Condition.Result conditionResult = null;
if (condition != null) {
try {
conditionResult = condition.execute(ctx);
if (conditionResult.met() == false) {
return new ActionWrapper.Result(id, conditionResult, null,
new Action.Result.ConditionFailed(action.type(), "condition not met. skipping"));
}
} catch (RuntimeException e) {
action.logger().error("failed to execute action [{}/{}]. failed to execute condition", e, ctx.watch().id(), id);
return new ActionWrapper.Result(id, new Action.Result.ConditionFailed(action.type(),
"condition failed. skipping: {}", e.getMessage()));
}
}
Payload payload = ctx.payload();
Transform.Result transformResult = null;
if (transform != null) {
@ -84,18 +127,19 @@ public class ActionWrapper implements ToXContent {
action.logger().error("failed to execute action [{}/{}]. failed to transform payload. {}", ctx.watch().id(), id,
transformResult.reason());
String msg = "Failed to transform payload";
return new ActionWrapper.Result(id, transformResult, new Action.Result.Failure(action.type(), msg));
return new ActionWrapper.Result(id, conditionResult, transformResult, new Action.Result.Failure(action.type(), msg));
}
payload = transformResult.payload();
} catch (Exception e) {
action.logger().error("failed to execute action [{}/{}]. failed to transform payload.", e, ctx.watch().id(), id);
return new ActionWrapper.Result(id, new Action.Result.Failure(action.type(), "Failed to transform payload. error: " +
return new ActionWrapper.Result(id, conditionResult, null,
new Action.Result.Failure(action.type(), "Failed to transform payload. error: {}",
ExceptionsHelper.detailedMessage(e)));
}
}
try {
Action.Result actionResult = action.execute(id, ctx, payload);
return new ActionWrapper.Result(id, transformResult, actionResult);
return new ActionWrapper.Result(id, conditionResult, transformResult, actionResult);
} catch (Exception e) {
action.logger().error("failed to execute action [{}/{}]", e, ctx.watch().id(), id);
return new ActionWrapper.Result(id, new Action.Result.Failure(action.type(), ExceptionsHelper.detailedMessage(e)));
@ -110,6 +154,7 @@ public class ActionWrapper implements ToXContent {
ActionWrapper that = (ActionWrapper) o;
if (!id.equals(that.id)) return false;
if (condition != null ? !condition.equals(that.condition) : that.condition != null) return false;
if (transform != null ? !transform.equals(that.transform) : that.transform != null) return false;
return action.equals(that.action);
}
@ -117,6 +162,7 @@ public class ActionWrapper implements ToXContent {
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + (condition != null ? condition.hashCode() : 0);
result = 31 * result + (transform != null ? transform.hashCode() : 0);
result = 31 * result + action.hashCode();
return result;
@ -129,6 +175,11 @@ public class ActionWrapper implements ToXContent {
if (throttlePeriod != null) {
builder.field(Throttler.Field.THROTTLE_PERIOD.getPreferredName(), throttlePeriod);
}
if (condition != null) {
builder.startObject(Watch.Field.CONDITION.getPreferredName())
.field(condition.type(), condition, params)
.endObject();
}
if (transform != null) {
builder.startObject(Transform.Field.TRANSFORM.getPreferredName())
.field(transform.type(), transform, params)
@ -139,11 +190,12 @@ public class ActionWrapper implements ToXContent {
}
static ActionWrapper parse(String watchId, String actionId, XContentParser parser,
ActionRegistry actionRegistry, TransformRegistry transformRegistry,
ActionRegistry actionRegistry, ConditionRegistry conditionRegistry, TransformRegistry transformRegistry,
Clock clock, XPackLicenseState licenseState) throws IOException {
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
ExecutableCondition condition = null;
ExecutableTransform transform = null;
TimeValue throttlePeriod = null;
ExecutableAction action = null;
@ -154,7 +206,9 @@ public class ActionWrapper implements ToXContent {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else {
if (ParseFieldMatcher.STRICT.match(currentFieldName, Transform.Field.TRANSFORM)) {
if (ParseFieldMatcher.STRICT.match(currentFieldName, Watch.Field.CONDITION)) {
condition = conditionRegistry.parseExecutable(watchId, parser);
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Transform.Field.TRANSFORM)) {
transform = transformRegistry.parse(watchId, parser);
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Throttler.Field.THROTTLE_PERIOD)) {
try {
@ -179,21 +233,25 @@ public class ActionWrapper implements ToXContent {
}
ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
return new ActionWrapper(actionId, throttler, transform, action);
return new ActionWrapper(actionId, throttler, condition, transform, action);
}
public static class Result implements ToXContent {
private final String id;
@Nullable private final Transform.Result transform;
@Nullable
private final Condition.Result condition;
@Nullable
private final Transform.Result transform;
private final Action.Result action;
public Result(String id, Action.Result action) {
this(id, null, action);
this(id, null, null, action);
}
public Result(String id, @Nullable Transform.Result transform, Action.Result action) {
public Result(String id, @Nullable Condition.Result condition, @Nullable Transform.Result transform, Action.Result action) {
this.id = id;
this.condition = condition;
this.transform = transform;
this.action = action;
}
@ -202,6 +260,10 @@ public class ActionWrapper implements ToXContent {
return id;
}
public Condition.Result condition() {
return condition;
}
public Transform.Result transform() {
return transform;
}
@ -218,6 +280,7 @@ public class ActionWrapper implements ToXContent {
Result result = (Result) o;
if (!id.equals(result.id)) return false;
if (condition != null ? !condition.equals(result.condition) : result.condition != null) return false;
if (transform != null ? !transform.equals(result.transform) : result.transform != null) return false;
return action.equals(result.action);
}
@ -225,6 +288,7 @@ public class ActionWrapper implements ToXContent {
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + (condition != null ? condition.hashCode() : 0);
result = 31 * result + (transform != null ? transform.hashCode() : 0);
result = 31 * result + action.hashCode();
return result;
@ -235,7 +299,10 @@ public class ActionWrapper implements ToXContent {
builder.startObject();
builder.field(Field.ID.getPreferredName(), id);
builder.field(Field.TYPE.getPreferredName(), action.type());
builder.field(Field.STATUS.getPreferredName(), action.status, params);
builder.field(Field.STATUS.getPreferredName(), action.status(), params);
if (condition != null) {
builder.field(Watch.Field.CONDITION.getPreferredName(), condition, params);
}
if (transform != null) {
builder.field(Transform.Field.TRANSFORM.getPreferredName(), transform, params);
}

View File

@ -97,12 +97,26 @@ public class WatchSourceBuilder implements ToXContent {
return addAction(id, null, transform.build(), action.build());
}
public WatchSourceBuilder addAction(String id, Condition.Builder condition, Action.Builder action) {
return addAction(id, null, condition.build(), null, action.build());
}
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transform.Builder transform, Action.Builder action) {
return addAction(id, throttlePeriod, transform.build(), action.build());
}
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transform transform, Action action) {
actions.put(id, new TransformedAction(id, action, throttlePeriod, transform));
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform));
return this;
}
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition.Builder condition, Transform.Builder transform,
Action.Builder action) {
return addAction(id, throttlePeriod, condition.build(), transform.build(), action.build());
}
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, Action action) {
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform));
return this;
}
@ -173,11 +187,14 @@ public class WatchSourceBuilder implements ToXContent {
private final String id;
private final Action action;
@Nullable private final TimeValue throttlePeriod;
@Nullable private final Condition condition;
@Nullable private final Transform transform;
public TransformedAction(String id, Action action, @Nullable TimeValue throttlePeriod, @Nullable Transform transform) {
public TransformedAction(String id, Action action, @Nullable TimeValue throttlePeriod,
@Nullable Condition condition, @Nullable Transform transform) {
this.id = id;
this.throttlePeriod = throttlePeriod;
this.condition = condition;
this.transform = transform;
this.action = action;
}
@ -188,6 +205,11 @@ public class WatchSourceBuilder implements ToXContent {
if (throttlePeriod != null) {
builder.field(Throttler.Field.THROTTLE_PERIOD.getPreferredName(), throttlePeriod);
}
if (condition != null) {
builder.startObject(Watch.Field.CONDITION.getPreferredName())
.field(condition.type(), condition, params)
.endObject();
}
if (transform != null) {
builder.startObject(Transform.Field.TRANSFORM.getPreferredName())
.field(transform.type(), transform, params)

View File

@ -31,6 +31,7 @@ public interface Condition extends ToXContent {
protected final boolean met;
public Result(String type, boolean met) {
// TODO: FAILURE status is never used, but a some code assumes that it is used
this.status = Status.SUCCESS;
this.type = type;
this.met = met;
@ -46,7 +47,6 @@ public interface Condition extends ToXContent {
}
public boolean met() {
assert status == Status.SUCCESS;
return met;
}

View File

@ -17,7 +17,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.WatcherFeatureSet;
import org.elasticsearch.xpack.common.stats.Counters;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.watcher.condition.Condition;
@ -32,7 +31,6 @@ import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -349,7 +347,7 @@ public class ExecutionService extends AbstractComponent {
}
}
WatchRecord executeInner(WatchExecutionContext ctx) throws IOException {
WatchRecord executeInner(WatchExecutionContext ctx) {
ctx.start();
Watch watch = ctx.watch();

View File

@ -14,11 +14,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.suggest.Suggesters;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.support.clock.Clock;
@ -67,7 +68,6 @@ public class ExecutionServiceTests extends ESTestCase {
private Input.Result inputResult;
private WatchStore watchStore;
private TriggeredWatchStore triggeredWatchStore;
private HistoryStore historyStore;
private WatchLockService watchLockService;
private ExecutionService executionService;
@ -75,6 +75,8 @@ public class ExecutionServiceTests extends ESTestCase {
@Before
public void init() throws Exception {
TriggeredWatchStore triggeredWatchStore;
payload = mock(Payload.class);
input = mock(ExecutableInput.class);
inputResult = mock(Input.Result.class);
@ -87,7 +89,7 @@ public class ExecutionServiceTests extends ESTestCase {
historyStore = mock(HistoryStore.class);
WatchExecutor executor = mock(WatchExecutor.class);
when(executor.queue()).thenReturn(new ArrayBlockingQueue<Runnable>(1));
when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1));
watchLockService = mock(WatchLockService.class);
clock = new ClockMock();
@ -95,7 +97,7 @@ public class ExecutionServiceTests extends ESTestCase {
watchLockService, clock);
ClusterState clusterState = mock(ClusterState.class);
when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<TriggeredWatch>());
when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>());
executionService.start(clusterState);
}
@ -127,11 +129,27 @@ public class ExecutionServiceTests extends ESTestCase {
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level conditional
ExecutableCondition actionCondition = null;
Condition.Result actionConditionResult = null;
if (randomBoolean()) {
Tuple<ExecutableCondition, Condition.Result> pair = whenCondition(context);
actionCondition = pair.v1();
actionConditionResult = pair.v2();
}
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.payload()).thenReturn(payload);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult);
ExecutableTransform actionTransform = null;
Transform.Result actionTransformResult = null;
if (randomBoolean()) {
Tuple<ExecutableTransform, Transform.Result> pair = whenTransform(context);
actionTransform = pair.v1();
actionTransformResult = pair.v2();
}
// the action
Action.Result actionResult = mock(Action.Result.class);
@ -141,7 +159,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(action.type()).thenReturn("MY_AWESOME_TYPE");
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
@ -158,6 +176,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action");
assertThat(result, notNullValue());
assertThat(result.id(), is("_action"));
assertThat(result.condition(), sameInstance(actionConditionResult));
assertThat(result.transform(), sameInstance(actionTransformResult));
assertThat(result.action(), sameInstance(actionResult));
@ -208,11 +227,10 @@ public class ExecutionServiceTests extends ESTestCase {
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.payload()).thenReturn(payload);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult);
// action level condition (unused)
ExecutableCondition actionCondition = randomBoolean() ? mock(ExecutableCondition.class) : null;
// action level transform (unused)
ExecutableTransform actionTransform = randomBoolean() ? mock(ExecutableTransform.class) : null;
// the action
Action.Result actionResult = mock(Action.Result.class);
@ -221,7 +239,7 @@ public class ExecutionServiceTests extends ESTestCase {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
@ -276,11 +294,10 @@ public class ExecutionServiceTests extends ESTestCase {
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.payload()).thenReturn(payload);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult);
// action level condition (unused)
ExecutableCondition actionCondition = randomBoolean() ? mock(ExecutableCondition.class) : null;
// action level transform (unused)
ExecutableTransform actionTransform = randomBoolean() ? mock(ExecutableTransform.class) : null;
// the action
Action.Result actionResult = mock(Action.Result.class);
@ -289,7 +306,7 @@ public class ExecutionServiceTests extends ESTestCase {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
@ -343,11 +360,10 @@ public class ExecutionServiceTests extends ESTestCase {
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.payload()).thenReturn(payload);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult);
// action level condition (unused)
ExecutableCondition actionCondition = randomBoolean() ? mock(ExecutableCondition.class) : null;
// action level transform (unused)
ExecutableTransform actionTransform = randomBoolean() ? mock(ExecutableTransform.class) : null;
// the action
Action.Result actionResult = mock(Action.Result.class);
@ -356,7 +372,7 @@ public class ExecutionServiceTests extends ESTestCase {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
@ -410,6 +426,17 @@ public class ExecutionServiceTests extends ESTestCase {
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level condition
ExecutableCondition actionCondition = null;
Condition.Result actionConditionResult = null;
if (randomBoolean()) {
Tuple<ExecutableCondition, Condition.Result> pair = whenCondition(context);
actionCondition = pair.v1();
actionConditionResult = pair.v2();
}
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.status()).thenReturn(Transform.Result.Status.FAILURE);
@ -425,7 +452,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(action.logger()).thenReturn(logger);
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
@ -442,6 +469,7 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().transformResult(), is(watchTransformResult));
assertThat(watchRecord.result().actionsResults(), notNullValue());
assertThat(watchRecord.result().actionsResults().count(), is(1));
assertThat(watchRecord.result().actionsResults().get("_action").condition(), is(actionConditionResult));
assertThat(watchRecord.result().actionsResults().get("_action").transform(), is(actionTransformResult));
assertThat(watchRecord.result().actionsResults().get("_action").action().status(), is(Action.Result.Status.FAILURE));
@ -476,11 +504,27 @@ public class ExecutionServiceTests extends ESTestCase {
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level conditional
ExecutableCondition actionCondition = null;
Condition.Result actionConditionResult = null;
if (randomBoolean()) {
Tuple<ExecutableCondition, Condition.Result> pair = whenCondition(context);
actionCondition = pair.v1();
actionConditionResult = pair.v2();
}
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.payload()).thenReturn(payload);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult);
ExecutableTransform actionTransform = null;
Transform.Result actionTransformResult = null;
if (randomBoolean()) {
Tuple<ExecutableTransform, Transform.Result> pair = whenTransform(context);
actionTransform = pair.v1();
actionTransformResult = pair.v2();
}
// the action
Action.Result actionResult = mock(Action.Result.class);
@ -489,7 +533,7 @@ public class ExecutionServiceTests extends ESTestCase {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
@ -506,6 +550,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action");
assertThat(result, notNullValue());
assertThat(result.id(), is("_action"));
assertThat(result.condition(), sameInstance(actionConditionResult));
assertThat(result.transform(), sameInstance(actionTransformResult));
assertThat(result.action(), sameInstance(actionResult));
@ -524,17 +569,20 @@ public class ExecutionServiceTests extends ESTestCase {
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
// action throttler
Throttler.Result throttleResult = mock(Throttler.Result.class);
when(throttleResult.throttle()).thenReturn(true);
when(throttleResult.reason()).thenReturn("_throttle_reason");
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
ExecutableTransform transform = mock(ExecutableTransform.class);
// unused with throttle
ExecutableCondition actionCondition = mock(ExecutableCondition.class);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, transform, action);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
@ -552,6 +600,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action");
assertThat(result, notNullValue());
assertThat(result.id(), is("_action"));
assertThat(result.condition(), nullValue());
assertThat(result.transform(), nullValue());
assertThat(result.action(), instanceOf(Action.Result.Throttled.class));
Action.Result.Throttled throttled = (Action.Result.Throttled) result.action();
@ -559,7 +608,8 @@ public class ExecutionServiceTests extends ESTestCase {
verify(condition, times(1)).execute(context);
verify(throttler, times(1)).throttle("_action", context);
verify(transform, never()).execute(context, payload);
verify(actionCondition, never()).execute(context);
verify(actionTransform, never()).execute(context, payload);
}
public void testExecuteInnerConditionNotMet() throws Exception {
@ -568,6 +618,126 @@ public class ExecutionServiceTests extends ESTestCase {
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
// action throttler
Throttler.Result throttleResult = mock(Throttler.Result.class);
when(throttleResult.throttle()).thenReturn(false);
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action condition (always fails)
Condition.Result actionConditionResult = mock(Condition.Result.class);
// note: sometimes it can be met _with_ success
if (randomBoolean()) {
when(actionConditionResult.status()).thenReturn(Condition.Result.Status.SUCCESS);
} else {
when(actionConditionResult.status()).thenReturn(Condition.Result.Status.FAILURE);
}
when(actionConditionResult.met()).thenReturn(false);
ExecutableCondition actionCondition = mock(ExecutableCondition.class);
when(actionCondition.execute(context)).thenReturn(actionConditionResult);
// unused with failed condition
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus);
WatchRecord watchRecord = executionService.executeInner(context);
assertThat(watchRecord.result().inputResult(), sameInstance(inputResult));
assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult));
assertThat(watchRecord.result().transformResult(), nullValue());
assertThat(watchRecord.result().actionsResults().count(), is(1));
ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action");
assertThat(result, notNullValue());
assertThat(result.id(), is("_action"));
assertThat(result.condition(), sameInstance(actionConditionResult));
assertThat(result.transform(), nullValue());
assertThat(result.action(), instanceOf(Action.Result.ConditionFailed.class));
Action.Result.ConditionFailed conditionFailed = (Action.Result.ConditionFailed) result.action();
assertThat(conditionFailed.reason(), is("condition not met. skipping"));
verify(condition, times(1)).execute(context);
verify(throttler, times(1)).throttle("_action", context);
verify(actionCondition, times(1)).execute(context);
verify(actionTransform, never()).execute(context, payload);
}
public void testExecuteInnerConditionNotMetDueToException() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn(getTestName());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
// action throttler
Throttler.Result throttleResult = mock(Throttler.Result.class);
when(throttleResult.throttle()).thenReturn(false);
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action condition (always fails)
ExecutableCondition actionCondition = mock(ExecutableCondition.class);
when(actionCondition.execute(context)).thenThrow(new IllegalArgumentException("[expected] failed for test"));
// unused with failed condition
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
when(action.logger()).thenReturn(logger);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus);
WatchRecord watchRecord = executionService.executeInner(context);
assertThat(watchRecord.result().inputResult(), sameInstance(inputResult));
assertThat(watchRecord.result().conditionResult(), sameInstance(conditionResult));
assertThat(watchRecord.result().transformResult(), nullValue());
assertThat(watchRecord.result().actionsResults().count(), is(1));
ActionWrapper.Result result = watchRecord.result().actionsResults().get("_action");
assertThat(result, notNullValue());
assertThat(result.id(), is("_action"));
assertThat(result.condition(), nullValue());
assertThat(result.transform(), nullValue());
assertThat(result.action(), instanceOf(Action.Result.ConditionFailed.class));
Action.Result.ConditionFailed conditionFailed = (Action.Result.ConditionFailed) result.action();
assertThat(conditionFailed.reason(), is("condition failed. skipping: [expected] failed for test"));
verify(condition, times(1)).execute(context);
verify(throttler, times(1)).throttle("_action", context);
verify(actionCondition, times(1)).execute(context);
verify(actionTransform, never()).execute(context, payload);
}
public void testExecuteConditionNotMet() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
Condition.Result conditionResult = NeverCondition.Result.INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
@ -577,9 +747,10 @@ public class ExecutionServiceTests extends ESTestCase {
// action throttler
ActionThrottler throttler = mock(ActionThrottler.class);
ExecutableCondition actionCondition = mock(ExecutableCondition.class);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
ExecutableAction action = mock(ExecutableAction.class);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
@ -602,4 +773,23 @@ public class ExecutionServiceTests extends ESTestCase {
verify(actionTransform, never()).execute(context, payload);
verify(action, never()).execute("_action", context, payload);
}
private Tuple<ExecutableCondition, Condition.Result> whenCondition(final WatchExecutionContext context) {
Condition.Result conditionResult = mock(Condition.Result.class);
when(conditionResult.met()).thenReturn(true);
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.execute(context)).thenReturn(conditionResult);
return new Tuple<>(condition, conditionResult);
}
private Tuple<ExecutableTransform, Transform.Result> whenTransform(final WatchExecutionContext context) {
Transform.Result transformResult = mock(Transform.Result.class);
when(transformResult.payload()).thenReturn(payload);
ExecutableTransform transform = mock(ExecutableTransform.class);
when(transform.execute(context, payload)).thenReturn(transformResult);
return new Tuple<>(transform, transformResult);
}
}

View File

@ -0,0 +1,275 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.history;
import com.google.common.collect.Lists;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.condition.compare.CompareCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.input.Input;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.compareCondition;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.neverCondition;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
/**
* This test makes sure per-action conditions are honored.
*/
public class HistoryActionConditionTests extends AbstractWatcherIntegrationTestCase {
private final Input input = simpleInput("key", 15).build();
private final Condition.Builder scriptConditionPasses = mockScriptCondition("return true;");
private final Condition.Builder compareConditionPasses = compareCondition("ctx.payload.key", CompareCondition.Op.GTE, 15);
private final Condition.Builder conditionPasses = randomFrom(alwaysCondition(), scriptConditionPasses, compareConditionPasses);
private final Condition.Builder scriptConditionFails = mockScriptCondition("return false;");
private final Condition.Builder compareConditionFails = compareCondition("ctx.payload.key", CompareCondition.Op.LT, 15);
private final Condition.Builder conditionFails = randomFrom(neverCondition(), scriptConditionFails, compareConditionFails);
@Override
protected List<Class<? extends Plugin>> pluginTypes() {
List<Class<? extends Plugin>> types = super.pluginTypes();
types.add(CustomScriptPlugin.class);
return types;
}
public static class CustomScriptPlugin extends MockScriptPlugin {
@Override
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
scripts.put("return true;", vars -> true);
scripts.put("return false;", vars -> false);
scripts.put("throw new IllegalStateException('failed');", vars -> {
throw new IllegalStateException("[expected] failed hard");
});
return scripts;
}
}
@Override
protected boolean timeWarped() {
return true; // just to have better control over the triggers
}
@Override
protected boolean enableSecurity() {
return false; // remove security noise from this test
}
/**
* A hard failure is where an exception is thrown by the script condition.
*/
@SuppressWarnings("unchecked")
public void testActionConditionWithHardFailures() throws Exception {
final String id = "testActionConditionWithHardFailures";
final Condition.Builder scriptConditionFailsHard = mockScriptCondition("throw new IllegalStateException('failed');");
final List<Condition.Builder> actionConditionsWithFailure =
Lists.newArrayList(scriptConditionFailsHard, conditionPasses, alwaysCondition());
Collections.shuffle(actionConditionsWithFailure, random());
final int failedIndex = actionConditionsWithFailure.indexOf(scriptConditionFailsHard);
putAndTriggerWatch(id, input, actionConditionsWithFailure.toArray(new Condition.Builder[actionConditionsWithFailure.size()]));
flush();
assertWatchWithMinimumActionsCount(id, ExecutionState.EXECUTED, 1);
// only one action should have failed via condition
final SearchResponse response = searchHistory(SearchSourceBuilder.searchSource().query(termQuery("watch_id", id)));
assertThat(response.getHits().getTotalHits(), is(1L));
final SearchHit hit = response.getHits().getAt(0);
final List<Object> actions = getActionsFromHit(hit.getSource());
for (int i = 0; i < actionConditionsWithFailure.size(); ++i) {
final Map<String, Object> action = (Map<String, Object>)actions.get(i);
final Map<String, Object> condition = (Map<String, Object>)action.get("condition");
final Map<String, Object> logging = (Map<String, Object>)action.get("logging");
assertThat(action.get("id"), is("action" + i));
if (i == failedIndex) {
assertThat(action.get("status"), is("condition_failed"));
assertThat(action.get("reason"), is("condition failed. skipping: [expected] failed hard"));
assertThat(condition, nullValue());
assertThat(logging, nullValue());
} else {
assertThat(condition.get("type"), is(actionConditionsWithFailure.get(i).build().type()));
assertThat(action.get("status"), is("success"));
assertThat(condition.get("met"), is(true));
assertThat(action.get("reason"), nullValue());
assertThat(logging.get("logged_text"), is(Integer.toString(i)));
}
}
}
@SuppressWarnings("unchecked")
public void testActionConditionWithFailures() throws Exception {
final String id = "testActionConditionWithFailures";
final List<Condition.Builder> actionConditionsWithFailure = Lists.newArrayList(conditionFails, conditionPasses, alwaysCondition());
Collections.shuffle(actionConditionsWithFailure, random());
final int failedIndex = actionConditionsWithFailure.indexOf(conditionFails);
putAndTriggerWatch(id, input, actionConditionsWithFailure.toArray(new Condition.Builder[actionConditionsWithFailure.size()]));
flush();
assertWatchWithMinimumActionsCount(id, ExecutionState.EXECUTED, 1);
// only one action should have failed via condition
final SearchResponse response = searchHistory(SearchSourceBuilder.searchSource().query(termQuery("watch_id", id)));
assertThat(response.getHits().getTotalHits(), is(1L));
final SearchHit hit = response.getHits().getAt(0);
final List<Object> actions = getActionsFromHit(hit.getSource());
for (int i = 0; i < actionConditionsWithFailure.size(); ++i) {
final Map<String, Object> action = (Map<String, Object>)actions.get(i);
final Map<String, Object> condition = (Map<String, Object>)action.get("condition");
final Map<String, Object> logging = (Map<String, Object>)action.get("logging");
assertThat(action.get("id"), is("action" + i));
assertThat(condition.get("type"), is(actionConditionsWithFailure.get(i).build().type()));
if (i == failedIndex) {
assertThat(action.get("status"), is("condition_failed"));
assertThat(condition.get("met"), is(false));
assertThat(action.get("reason"), is("condition not met. skipping"));
assertThat(logging, nullValue());
} else {
assertThat(action.get("status"), is("success"));
assertThat(condition.get("met"), is(true));
assertThat(action.get("reason"), nullValue());
assertThat(logging.get("logged_text"), is(Integer.toString(i)));
}
}
}
@SuppressWarnings("unchecked")
public void testActionCondition() throws Exception {
final String id = "testActionCondition";
final List<Condition.Builder> actionConditions = Lists.newArrayList(conditionPasses);
if (randomBoolean()) {
actionConditions.add(alwaysCondition());
}
Collections.shuffle(actionConditions, random());
putAndTriggerWatch(id, input, actionConditions.toArray(new Condition.Builder[actionConditions.size()]));
flush();
assertWatchWithMinimumActionsCount(id, ExecutionState.EXECUTED, 1);
// all actions should be successful
final SearchResponse response = searchHistory(SearchSourceBuilder.searchSource().query(termQuery("watch_id", id)));
assertThat(response.getHits().getTotalHits(), is(1L));
final SearchHit hit = response.getHits().getAt(0);
final List<Object> actions = getActionsFromHit(hit.getSource());
for (int i = 0; i < actionConditions.size(); ++i) {
final Map<String, Object> action = (Map<String, Object>)actions.get(i);
final Map<String, Object> condition = (Map<String, Object>)action.get("condition");
final Map<String, Object> logging = (Map<String, Object>)action.get("logging");
assertThat(action.get("id"), is("action" + i));
assertThat(action.get("status"), is("success"));
assertThat(condition.get("type"), is(actionConditions.get(i).build().type()));
assertThat(condition.get("met"), is(true));
assertThat(action.get("reason"), nullValue());
assertThat(logging.get("logged_text"), is(Integer.toString(i)));
}
}
/**
* Get the "actions" from the Watch History hit.
*
* @param source The hit's source.
* @return The list of "actions"
*/
@SuppressWarnings("unchecked")
private List<Object> getActionsFromHit(final Map<String, Object> source) {
final Map<String, Object> result = (Map<String, Object>)source.get("result");
return (List<Object>)result.get("actions");
}
/**
* Create a Watch with the specified {@code id} and {@code input}.
* <p>
* The {@code actionConditions} are
*
* @param id The ID of the Watch
* @param input The input to use for the Watch
* @param actionConditions The conditions to add to the Watch
*/
private void putAndTriggerWatch(final String id, final Input input, final Condition.Builder... actionConditions) {
WatchSourceBuilder source = watchBuilder().trigger(schedule(interval("5s"))).input(input).condition(alwaysCondition());
for (int i = 0; i < actionConditions.length; ++i) {
source.addAction("action" + i, actionConditions[i], loggingAction(Integer.toString(i)));
}
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(id).setSource(source).get();
assertThat(putWatchResponse.isCreated(), is(true));
timeWarp().scheduler().trigger(id);
}
/**
* Create an inline script using the {@link CustomScriptPlugin}.
*
* @param inlineScript The script to "compile" and run
* @return Never {@code null}
*/
private static Condition.Builder mockScriptCondition(String inlineScript) {
WatcherScript.Builder builder = new WatcherScript.Builder.Inline(inlineScript);
builder.lang(MockScriptPlugin.NAME);
return scriptCondition(builder);
}
}

View File

@ -79,8 +79,6 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTestC
webServer.shutdown();
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (encryptSensitiveData == null) {
@ -213,8 +211,11 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTestC
.setTriggerEvent(triggerEvent)
.get();
assertThat(executeResponse, notNullValue());
contentSource = executeResponse.getRecordSource();
assertThat(contentSource.getValue("result.actions.0.status"), is("success"));
value = contentSource.getValue("result.actions.0.webhook.response.status");
assertThat(value, notNullValue());
assertThat(value, instanceOf(Number.class));

View File

@ -183,12 +183,12 @@ public class WatchTests extends ESTestCase {
InputRegistry inputRegistry = registry(input);
ExecutableCondition condition = randomCondition();
ConditionRegistry conditionRegistry = registry(condition);
ConditionRegistry conditionRegistry = conditionRegistry();
ExecutableTransform transform = randomTransform();
ExecutableActions actions = randomActions();
ActionRegistry actionRegistry = registry(actions, transformRegistry);
ActionRegistry actionRegistry = registry(actions, conditionRegistry, transformRegistry);
Map<String, Object> metadata = singletonMap("_key", "_val");
@ -227,15 +227,14 @@ public class WatchTests extends ESTestCase {
ScheduleRegistry scheduleRegistry = registry(randomSchedule());
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, clock);
TriggerService triggerService = new TriggerService(Settings.EMPTY, singleton(triggerEngine));
ExecutableCondition condition = randomCondition();
ConditionRegistry conditionRegistry = registry(condition);
ConditionRegistry conditionRegistry = conditionRegistry();
ExecutableInput input = randomInput();
InputRegistry inputRegistry = registry(input);
TransformRegistry transformRegistry = transformRegistry();
ExecutableActions actions = randomActions();
ActionRegistry actionRegistry = registry(actions, transformRegistry);
ActionRegistry actionRegistry = registry(actions,conditionRegistry, transformRegistry);
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()
@ -258,11 +257,11 @@ public class WatchTests extends ESTestCase {
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, SystemClock.INSTANCE);
TriggerService triggerService = new TriggerService(Settings.EMPTY, singleton(triggerEngine));
ConditionRegistry conditionRegistry = registry(new ExecutableAlwaysCondition(logger));
ConditionRegistry conditionRegistry = conditionRegistry();
InputRegistry inputRegistry = registry(new ExecutableNoneInput(logger));
TransformRegistry transformRegistry = transformRegistry();
ExecutableActions actions = new ExecutableActions(Collections.emptyList());
ActionRegistry actionRegistry = registry(actions, transformRegistry);
ActionRegistry actionRegistry = registry(actions, conditionRegistry, transformRegistry);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
@ -377,23 +376,14 @@ public class WatchTests extends ESTestCase {
}
}
private ConditionRegistry registry(ExecutableCondition condition) {
private ConditionRegistry conditionRegistry() {
Map<String, ConditionFactory> parsers = new HashMap<>();
switch (condition.type()) {
case ScriptCondition.TYPE:
parsers.put(ScriptCondition.TYPE, new ScriptConditionFactory(settings, scriptService));
return new ConditionRegistry(parsers);
case CompareCondition.TYPE:
parsers.put(CompareCondition.TYPE, new CompareConditionFactory(settings, SystemClock.INSTANCE));
return new ConditionRegistry(parsers);
case ArrayCompareCondition.TYPE:
parsers.put(ArrayCompareCondition.TYPE, new ArrayCompareConditionFactory(settings, SystemClock.INSTANCE));
return new ConditionRegistry(parsers);
default:
parsers.put(AlwaysCondition.TYPE, new AlwaysConditionFactory(settings));
return new ConditionRegistry(parsers);
}
}
private ExecutableTransform randomTransform() {
String type = randomFrom(ScriptTransform.TYPE, SearchTransform.TYPE, ChainTransform.TYPE);
@ -429,24 +419,22 @@ public class WatchTests extends ESTestCase {
Map<String, TransformFactory> factories = new HashMap<>();
factories.put(ScriptTransform.TYPE, new ScriptTransformFactory(settings, scriptService));
factories.put(SearchTransform.TYPE, new SearchTransformFactory(settings, client, searchParsers, scriptService));
TransformRegistry registry = new TransformRegistry(Settings.EMPTY, unmodifiableMap(factories));
return registry;
return new TransformRegistry(Settings.EMPTY, unmodifiableMap(factories));
}
private ExecutableActions randomActions() {
List<ActionWrapper> list = new ArrayList<>();
if (randomBoolean()) {
ExecutableTransform transform = randomTransform();
EmailAction action = new EmailAction(EmailTemplate.builder().build(), null, null, Profile.STANDARD,
randomFrom(DataAttachment.JSON, DataAttachment.YAML), EmailAttachments.EMPTY_ATTACHMENTS);
list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), randomThrottler(), transform,
list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), randomThrottler(), randomCondition(), randomTransform(),
new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, Collections.emptyMap())));
}
if (randomBoolean()) {
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.UTC : null;
TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(30) : null;
IndexAction action = new IndexAction("_index", "_type", null, timeout, timeZone);
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(),
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomCondition(), randomTransform(),
new ExecutableIndexAction(action, logger, client, null)));
}
if (randomBoolean()) {
@ -455,13 +443,13 @@ public class WatchTests extends ESTestCase {
.path(TextTemplate.inline("_url").build())
.build();
WebhookAction action = new WebhookAction(httpRequest);
list.add(new ActionWrapper("_webhook_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(),
list.add(new ActionWrapper("_webhook_" + randomAsciiOfLength(8), randomThrottler(), randomCondition(), randomTransform(),
new ExecutableWebhookAction(action, logger, httpClient, templateEngine)));
}
return new ExecutableActions(list);
}
private ActionRegistry registry(ExecutableActions actions, TransformRegistry transformRegistry) {
private ActionRegistry registry(ExecutableActions actions, ConditionRegistry conditionRegistry, TransformRegistry transformRegistry) {
Map<String, ActionFactory> parsers = new HashMap<>();
for (ActionWrapper action : actions) {
switch (action.action().type()) {
@ -478,7 +466,7 @@ public class WatchTests extends ESTestCase {
break;
}
}
return new ActionRegistry(unmodifiableMap(parsers), transformRegistry, SystemClock.INSTANCE, licenseState);
return new ActionRegistry(unmodifiableMap(parsers), conditionRegistry, transformRegistry, SystemClock.INSTANCE, licenseState);
}
private ActionThrottler randomThrottler() {

View File

@ -0,0 +1,60 @@
---
setup:
- do:
cluster.health:
wait_for_status: yellow
---
teardown:
- do:
xpack.watcher.delete_watch:
id: "my_watch1"
ignore: 404
---
"Test put watch api with action level condition":
- do:
xpack.watcher.put_watch:
id: "my_watch1"
master_timeout: "40s"
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"payload": {
"value": 15
}
}
},
"condition": {
"always": {}
},
"actions": {
"test_index": {
"condition": {
"ctx.payload.value": {
"gt": 10
}
},
"index": {
"index": "test",
"doc_type": "test2"
}
}
}
}
- match: { _id: "my_watch1" }
- do:
xpack.watcher.get_watch:
id: "my_watch1"
- match: { found : true}
- match: { _id: "my_watch1" }
- match: { watch.actions.test_index.condition.ctx.payload.value.gt: 10 }