Watcher: Allow to execute actions for each element in array (#41997)
This adds the ability to execute an action for each element that occurs in an array, for example you could sent a dedicated slack action for each search hit returned from a search. There is also a limit for the number of actions executed, which is hardcoded to 100 right now, to prevent having watches run forever. The watch history logs each action result and the total number of actions the were executed. Relates #34546
This commit is contained in:
parent
2a8f30eb9a
commit
9077c4402f
|
@ -192,6 +192,49 @@ of a watch during its execution:
|
|||
image::images/action-throttling.jpg[align="center"]
|
||||
|
||||
|
||||
[[action-foreach]]
|
||||
=== Running an action for each element in an array
|
||||
|
||||
You can use the `foreach` field in an action to trigger the configured action
|
||||
for every element within that array.
|
||||
|
||||
In order to protect from long running watches, after one hundred runs with an
|
||||
foreach loop the execution is gracefully stopped.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
PUT _watcher/watch/log_event_watch
|
||||
{
|
||||
"trigger" : {
|
||||
"schedule" : { "interval" : "5m" }
|
||||
},
|
||||
"input" : {
|
||||
"search" : {
|
||||
"request" : {
|
||||
"indices" : "log-events",
|
||||
"body" : {
|
||||
"query" : { "match" : { "status" : "error" } }
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"condition" : {
|
||||
"compare" : { "ctx.payload.hits.total" : { "gt" : 0 } }
|
||||
},
|
||||
"actions" : {
|
||||
"log_hits" : {
|
||||
"foreach" : "ctx.payload.hits.hits", <1>
|
||||
"logging" : {
|
||||
"text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
<1> The logging statement will be executed for each of the returned search hits.
|
||||
|
||||
[[action-conditions]]
|
||||
=== Adding conditions to actions
|
||||
|
||||
|
|
|
@ -7,13 +7,17 @@ package org.elasticsearch.xpack.core.watcher.actions;
|
|||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ObjectPath;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.script.JodaCompatibleZonedDateTime;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.throttler.Throttler;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.throttler.ThrottlerField;
|
||||
|
@ -30,12 +34,22 @@ import java.io.IOException;
|
|||
import java.time.Clock;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
||||
|
||||
public class ActionWrapper implements ToXContentObject {
|
||||
|
||||
private final int MAXIMUM_FOREACH_RUNS = 100;
|
||||
|
||||
private String id;
|
||||
@Nullable
|
||||
private final ExecutableCondition condition;
|
||||
|
@ -43,16 +57,20 @@ public class ActionWrapper implements ToXContentObject {
|
|||
private final ExecutableTransform<Transform, Transform.Result> transform;
|
||||
private final ActionThrottler throttler;
|
||||
private final ExecutableAction<? extends Action> action;
|
||||
@Nullable
|
||||
private String path;
|
||||
|
||||
public ActionWrapper(String id, ActionThrottler throttler,
|
||||
@Nullable ExecutableCondition condition,
|
||||
@Nullable ExecutableTransform<Transform, Transform.Result> transform,
|
||||
ExecutableAction<? extends Action> action) {
|
||||
ExecutableAction<? extends Action> action,
|
||||
@Nullable String path) {
|
||||
this.id = id;
|
||||
this.condition = condition;
|
||||
this.throttler = throttler;
|
||||
this.transform = transform;
|
||||
this.action = action;
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
public String id() {
|
||||
|
@ -140,16 +158,90 @@ public class ActionWrapper implements ToXContentObject {
|
|||
return new ActionWrapperResult(id, conditionResult, null, new Action.Result.FailureWithException(action.type(), e));
|
||||
}
|
||||
}
|
||||
try {
|
||||
Action.Result actionResult = action.execute(id, ctx, payload);
|
||||
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
|
||||
} catch (Exception e) {
|
||||
action.logger().error(
|
||||
if (Strings.isEmpty(path)) {
|
||||
try {
|
||||
Action.Result actionResult = action.execute(id, ctx, payload);
|
||||
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
|
||||
} catch (Exception e) {
|
||||
action.logger().error(
|
||||
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
|
||||
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
List<Action.Result> results = new ArrayList<>();
|
||||
Object object = ObjectPath.eval(path, toMap(ctx));
|
||||
int runs = 0;
|
||||
if (object instanceof Collection) {
|
||||
Collection collection = Collection.class.cast(object);
|
||||
if (collection.isEmpty()) {
|
||||
throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path);
|
||||
} else {
|
||||
for (Object o : collection) {
|
||||
if (runs >= MAXIMUM_FOREACH_RUNS) {
|
||||
break;
|
||||
}
|
||||
if (o instanceof Map) {
|
||||
results.add(action.execute(id, ctx, new Payload.Simple((Map<String, Object>) o)));
|
||||
} else {
|
||||
results.add(action.execute(id, ctx, new Payload.Simple("_value", o)));
|
||||
}
|
||||
runs++;
|
||||
}
|
||||
}
|
||||
} else if (object == null) {
|
||||
throw new ElasticsearchException("specified foreach object was null: [{}]", path);
|
||||
} else {
|
||||
throw new ElasticsearchException("specified foreach object was not a an array/collection: [{}]", path);
|
||||
}
|
||||
|
||||
// check if we have mixed results, then set to partial failure
|
||||
final Set<Action.Result.Status> statuses = results.stream().map(Action.Result::status).collect(Collectors.toSet());
|
||||
Action.Result.Status status;
|
||||
if (statuses.size() == 1) {
|
||||
status = statuses.iterator().next();
|
||||
} else {
|
||||
status = Action.Result.Status.PARTIAL_FAILURE;
|
||||
}
|
||||
|
||||
final int numberOfActionsExecuted = runs;
|
||||
return new ActionWrapperResult(id, conditionResult, transformResult,
|
||||
new Action.Result(action.type(), status) {
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("number_of_actions_executed", numberOfActionsExecuted);
|
||||
builder.startArray(WatchField.FOREACH.getPreferredName());
|
||||
for (Action.Result result : results) {
|
||||
builder.startObject();
|
||||
result.toXContent(builder, params);
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
action.logger().error(
|
||||
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
|
||||
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
|
||||
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Object> toMap(WatchExecutionContext ctx) {
|
||||
Map<String, Object> model = new HashMap<>();
|
||||
model.put("id", ctx.id().value());
|
||||
model.put("watch_id", ctx.id().watchId());
|
||||
model.put("execution_time", new JodaCompatibleZonedDateTime(ctx.executionTime().toInstant(), ZoneOffset.UTC));
|
||||
model.put("trigger", ctx.triggerEvent().data());
|
||||
model.put("metadata", ctx.watch().metadata());
|
||||
model.put("vars", ctx.vars());
|
||||
if (ctx.payload().data() != null) {
|
||||
model.put("payload", ctx.payload().data());
|
||||
}
|
||||
return Collections.singletonMap("ctx", model);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
@ -186,6 +278,9 @@ public class ActionWrapper implements ToXContentObject {
|
|||
.field(transform.type(), transform, params)
|
||||
.endObject();
|
||||
}
|
||||
if (Strings.isEmpty(path) == false) {
|
||||
builder.field(WatchField.FOREACH.getPreferredName(), path);
|
||||
}
|
||||
builder.field(action.type(), action, params);
|
||||
return builder.endObject();
|
||||
}
|
||||
|
@ -198,6 +293,7 @@ public class ActionWrapper implements ToXContentObject {
|
|||
ExecutableCondition condition = null;
|
||||
ExecutableTransform<Transform, Transform.Result> transform = null;
|
||||
TimeValue throttlePeriod = null;
|
||||
String path = null;
|
||||
ExecutableAction<? extends Action> action = null;
|
||||
|
||||
String currentFieldName = null;
|
||||
|
@ -208,6 +304,8 @@ public class ActionWrapper implements ToXContentObject {
|
|||
} else {
|
||||
if (WatchField.CONDITION.match(currentFieldName, parser.getDeprecationHandler())) {
|
||||
condition = actionRegistry.getConditionRegistry().parseExecutable(watchId, parser);
|
||||
} else if (WatchField.FOREACH.match(currentFieldName, parser.getDeprecationHandler())) {
|
||||
path = parser.text();
|
||||
} else if (Transform.TRANSFORM.match(currentFieldName, parser.getDeprecationHandler())) {
|
||||
transform = actionRegistry.getTransformRegistry().parse(watchId, parser);
|
||||
} else if (ThrottlerField.THROTTLE_PERIOD.match(currentFieldName, parser.getDeprecationHandler())) {
|
||||
|
@ -235,7 +333,7 @@ public class ActionWrapper implements ToXContentObject {
|
|||
}
|
||||
|
||||
ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
|
||||
return new ActionWrapper(actionId, throttler, condition, transform, action);
|
||||
return new ActionWrapper(actionId, throttler, condition, transform, action, path);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ public class WatchSourceBuilder implements ToXContentObject {
|
|||
}
|
||||
|
||||
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transform transform, Action action) {
|
||||
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform));
|
||||
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform, null));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -111,7 +111,13 @@ public class WatchSourceBuilder implements ToXContentObject {
|
|||
}
|
||||
|
||||
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, Action action) {
|
||||
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform));
|
||||
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, null));
|
||||
return this;
|
||||
}
|
||||
|
||||
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, String path,
|
||||
Action action) {
|
||||
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, path));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -186,16 +192,18 @@ public class WatchSourceBuilder implements ToXContentObject {
|
|||
static class TransformedAction implements ToXContentObject {
|
||||
|
||||
private final Action action;
|
||||
@Nullable private String path;
|
||||
@Nullable private final TimeValue throttlePeriod;
|
||||
@Nullable private final Condition condition;
|
||||
@Nullable private final Transform transform;
|
||||
|
||||
TransformedAction(String id, Action action, @Nullable TimeValue throttlePeriod,
|
||||
@Nullable Condition condition, @Nullable Transform transform) {
|
||||
@Nullable Condition condition, @Nullable Transform transform, @Nullable String path) {
|
||||
this.throttlePeriod = throttlePeriod;
|
||||
this.condition = condition;
|
||||
this.transform = transform;
|
||||
this.action = action;
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -215,6 +223,9 @@ public class WatchSourceBuilder implements ToXContentObject {
|
|||
.field(transform.type(), transform, params)
|
||||
.endObject();
|
||||
}
|
||||
if (path != null) {
|
||||
builder.field("foreach", path);
|
||||
}
|
||||
builder.field(action.type(), action, params);
|
||||
return builder.endObject();
|
||||
}
|
||||
|
|
|
@ -14,8 +14,9 @@ public final class WatcherIndexTemplateRegistryField {
|
|||
// version 7: add full exception stack traces for better debugging
|
||||
// version 8: fix slack attachment property not to be dynamic, causing field type issues
|
||||
// version 9: add a user field defining which user executed the watch
|
||||
// version 10: add support for foreach path in actions
|
||||
// Note: if you change this, also inform the kibana team around the watcher-ui
|
||||
public static final String INDEX_TEMPLATE_VERSION = "9";
|
||||
public static final String INDEX_TEMPLATE_VERSION = "10";
|
||||
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
|
||||
public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION;
|
||||
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
|
||||
|
|
|
@ -13,6 +13,7 @@ public final class WatchField {
|
|||
public static final ParseField CONDITION = new ParseField("condition");
|
||||
public static final ParseField ACTIONS = new ParseField("actions");
|
||||
public static final ParseField TRANSFORM = new ParseField("transform");
|
||||
public static final ParseField FOREACH = new ParseField("foreach");
|
||||
public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
|
||||
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
|
||||
public static final ParseField METADATA = new ParseField("metadata");
|
||||
|
|
|
@ -264,6 +264,13 @@
|
|||
"reason" : {
|
||||
"type" : "keyword"
|
||||
},
|
||||
"number_of_actions_executed": {
|
||||
"type": "integer"
|
||||
},
|
||||
"foreach" : {
|
||||
"type": "object",
|
||||
"enabled" : false
|
||||
},
|
||||
"email": {
|
||||
"type": "object",
|
||||
"dynamic": true,
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
---
|
||||
setup:
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: yellow
|
||||
|
||||
---
|
||||
"Test execute watch api with foreach action":
|
||||
- do:
|
||||
watcher.execute_watch:
|
||||
body: >
|
||||
{
|
||||
"watch" : {
|
||||
"trigger": {
|
||||
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
|
||||
},
|
||||
"input": {
|
||||
"simple" : {
|
||||
"hits" : {
|
||||
"hits" : [
|
||||
{ "key" : "first" },
|
||||
{ "key" : "second" },
|
||||
{ "key" : "third" }
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"actions": {
|
||||
"log_hits" : {
|
||||
"foreach" : "ctx.payload.hits.hits",
|
||||
"logging" : {
|
||||
"text" : "Logging {{ctx.payload.key}}"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- match: { watch_record.trigger_event.type: "manual" }
|
||||
- match: { watch_record.state: "executed" }
|
||||
- match: { watch_record.status.execution_state: "executed" }
|
||||
- match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging first" }
|
||||
- match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging second" }
|
||||
- match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging third" }
|
||||
|
||||
|
||||
---
|
||||
"Test execute watch api with foreach action using an array":
|
||||
- do:
|
||||
watcher.execute_watch:
|
||||
body: >
|
||||
{
|
||||
"watch" : {
|
||||
"trigger": {
|
||||
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
|
||||
},
|
||||
"input": {
|
||||
"simple" : {
|
||||
"values" : [1, 2, 3]
|
||||
}
|
||||
},
|
||||
"actions": {
|
||||
"log_hits" : {
|
||||
"foreach" : "ctx.payload.values",
|
||||
"logging" : {
|
||||
"text" : "Logging {{ctx.payload._value}}"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- match: { watch_record.trigger_event.type: "manual" }
|
||||
- match: { watch_record.state: "executed" }
|
||||
- match: { watch_record.status.execution_state: "executed" }
|
||||
- match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging 1" }
|
||||
- match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging 2" }
|
||||
- match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging 3" }
|
||||
|
||||
---
|
||||
"Test execute watch api with foreach action using an array of arrays":
|
||||
- do:
|
||||
watcher.execute_watch:
|
||||
body: >
|
||||
{
|
||||
"watch" : {
|
||||
"trigger": {
|
||||
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
|
||||
},
|
||||
"input": {
|
||||
"simple" : {
|
||||
"values" : [[1, 2], [2, 3], [3, 4]]
|
||||
}
|
||||
},
|
||||
"actions": {
|
||||
"log_hits" : {
|
||||
"foreach" : "ctx.payload.values",
|
||||
"logging" : {
|
||||
"text" : "Logging {{ctx.payload._value.1}}"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- match: { watch_record.trigger_event.type: "manual" }
|
||||
- match: { watch_record.state: "executed" }
|
||||
- match: { watch_record.status.execution_state: "executed" }
|
||||
- match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging 2" }
|
||||
- match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging 3" }
|
||||
- match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging 4" }
|
|
@ -5,6 +5,11 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.watcher.actions;
|
||||
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.Action;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.ActionStatus;
|
||||
|
@ -13,16 +18,31 @@ import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
|
|||
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction;
|
||||
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
|
||||
import org.elasticsearch.xpack.core.watcher.execution.Wid;
|
||||
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.Payload;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
|
||||
import org.elasticsearch.xpack.watcher.actions.logging.ExecutableLoggingAction;
|
||||
import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction;
|
||||
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
|
||||
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
|
||||
import org.elasticsearch.xpack.watcher.condition.NeverCondition;
|
||||
import org.elasticsearch.xpack.watcher.test.MockTextTemplateEngine;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
|
||||
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -34,7 +54,7 @@ public class ActionWrapperTests extends ESTestCase {
|
|||
private Watch watch = mock(Watch.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
private ExecutableAction<Action> executableAction = mock(ExecutableAction.class);
|
||||
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction);
|
||||
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null);
|
||||
|
||||
public void testThatUnmetActionConditionResetsAckStatus() throws Exception {
|
||||
WatchStatus watchStatus = new WatchStatus(now, Collections.singletonMap("_action", createActionStatus(State.ACKED)));
|
||||
|
@ -59,10 +79,135 @@ public class ActionWrapperTests extends ESTestCase {
|
|||
assertThat(watch.status().actionStatus("other").ackStatus().state(), is(otherState));
|
||||
}
|
||||
|
||||
public void testThatMultipleResultsCanBeReturned() throws Exception {
|
||||
final LoggingAction loggingAction = new LoggingAction(new TextTemplate("{{key}}"), null, null);
|
||||
final ExecutableAction<LoggingAction> executableAction =
|
||||
new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine());
|
||||
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
|
||||
"ctx.payload.my_path");
|
||||
|
||||
WatchExecutionContext ctx = mockExecutionContent(watch);
|
||||
|
||||
List<Map<String, String>> payloads = new ArrayList<>();
|
||||
payloads.add(Collections.singletonMap("key", "first"));
|
||||
payloads.add(Collections.singletonMap("key", "second"));
|
||||
payloads.add(Collections.singletonMap("key", "third"));
|
||||
Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", payloads));
|
||||
when(ctx.payload()).thenReturn(payload);
|
||||
|
||||
ActionWrapperResult result = wrapper.execute(ctx);
|
||||
assertThat(result.action().status(), is(Action.Result.Status.SUCCESS));
|
||||
// check that action toXContent contains all the results
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
final String json = Strings.toString(builder);
|
||||
final Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true);
|
||||
assertThat(map, hasKey("foreach"));
|
||||
assertThat(map.get("foreach"), instanceOf(List.class));
|
||||
List<Map<String, Object>> actions = (List) map.get("foreach");
|
||||
assertThat(actions, hasSize(3));
|
||||
}
|
||||
}
|
||||
|
||||
public void testThatSpecifiedPathIsNotCollection() {
|
||||
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
|
||||
"ctx.payload.my_path");
|
||||
WatchExecutionContext ctx = mockExecutionContent(watch);
|
||||
Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", "not a map"));
|
||||
when(ctx.payload()).thenReturn(payload);
|
||||
when(executableAction.logger()).thenReturn(logger);
|
||||
|
||||
ActionWrapperResult result = wrapper.execute(ctx);
|
||||
assertThat(result.action().status(), is(Action.Result.Status.FAILURE));
|
||||
assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class));
|
||||
Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action();
|
||||
assertThat(failureWithException.getException().getMessage(),
|
||||
is("specified foreach object was not a an array/collection: [ctx.payload.my_path]"));
|
||||
}
|
||||
|
||||
public void testEmptyCollection() {
|
||||
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
|
||||
"ctx.payload.my_path");
|
||||
WatchExecutionContext ctx = mockExecutionContent(watch);
|
||||
Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", Collections.emptyList()));
|
||||
when(ctx.payload()).thenReturn(payload);
|
||||
when(executableAction.logger()).thenReturn(logger);
|
||||
|
||||
ActionWrapperResult result = wrapper.execute(ctx);
|
||||
assertThat(result.action().status(), is(Action.Result.Status.FAILURE));
|
||||
assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class));
|
||||
Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action();
|
||||
assertThat(failureWithException.getException().getMessage(),
|
||||
is("foreach object [ctx.payload.my_path] was an empty list, could not run any action"));
|
||||
}
|
||||
|
||||
public void testPartialFailure() throws Exception {
|
||||
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
|
||||
"ctx.payload.my_path");
|
||||
WatchExecutionContext ctx = mockExecutionContent(watch);
|
||||
|
||||
List<Map<String, String>> payloads = new ArrayList<>();
|
||||
payloads.add(Collections.singletonMap("key", "first"));
|
||||
payloads.add(Collections.singletonMap("key", "second"));
|
||||
Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", payloads));
|
||||
when(ctx.payload()).thenReturn(payload);
|
||||
when(executableAction.logger()).thenReturn(logger);
|
||||
|
||||
final Action.Result firstResult = new LoggingAction.Result.Success("log_message");;
|
||||
final Payload firstPayload = new Payload.Simple(Collections.singletonMap("key", "first"));
|
||||
when(executableAction.execute(eq("_action"), eq(ctx), eq(firstPayload))).thenReturn(firstResult);
|
||||
|
||||
final Action.Result secondResult = new Action.Result.Failure("MY_TYPE", "second reason");
|
||||
final Payload secondPayload = new Payload.Simple(Collections.singletonMap("key", "second"));
|
||||
when(executableAction.execute(eq("_action"), eq(ctx), eq(secondPayload))).thenReturn(secondResult);
|
||||
|
||||
ActionWrapperResult result = wrapper.execute(ctx);
|
||||
assertThat(result.action().status(), is(Action.Result.Status.PARTIAL_FAILURE));
|
||||
}
|
||||
|
||||
public void testLimitOfNumberOfActionsExecuted() throws Exception {
|
||||
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
|
||||
"ctx.payload.my_path");
|
||||
WatchExecutionContext ctx = mockExecutionContent(watch);
|
||||
List<Map<String, String>> itemsPayload = new ArrayList<>();
|
||||
for (int i = 0; i < 101; i++) {
|
||||
final Action.Result actionResult = new LoggingAction.Result.Success("log_message " + i);;
|
||||
final Payload singleItemPayload = new Payload.Simple(Collections.singletonMap("key", String.valueOf(i)));
|
||||
itemsPayload.add(Collections.singletonMap("key", String.valueOf(i)));
|
||||
when(executableAction.execute(eq("_action"), eq(ctx), eq(singleItemPayload))).thenReturn(actionResult);
|
||||
}
|
||||
|
||||
Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", itemsPayload));
|
||||
when(ctx.payload()).thenReturn(payload);
|
||||
when(executableAction.logger()).thenReturn(logger);
|
||||
|
||||
ActionWrapperResult result = wrapper.execute(ctx);
|
||||
assertThat(result.action().status(), is(Action.Result.Status.SUCCESS));
|
||||
|
||||
// check that action toXContent contains all the results
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
final String json = Strings.toString(builder);
|
||||
final Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true);
|
||||
assertThat(map, hasKey("foreach"));
|
||||
assertThat(map.get("foreach"), instanceOf(List.class));
|
||||
List<Map<String, Object>> actions = (List) map.get("foreach");
|
||||
assertThat(actions, hasSize(100));
|
||||
assertThat(map, hasKey("number_of_actions_executed"));
|
||||
assertThat(map.get("number_of_actions_executed"), is(100));
|
||||
}
|
||||
}
|
||||
|
||||
private WatchExecutionContext mockExecutionContent(Watch watch) {
|
||||
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
|
||||
when(watch.id()).thenReturn("watchId");
|
||||
when(ctx.watch()).thenReturn(watch);
|
||||
final ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
|
||||
final Wid wid = new Wid("watchId", now);
|
||||
when(ctx.id()).thenReturn(wid);
|
||||
when(ctx.executionTime()).thenReturn(now);
|
||||
final TriggerEvent triggerEvent = new ScheduleTriggerEvent("watchId", now, now);
|
||||
when(ctx.triggerEvent()).thenReturn(triggerEvent);
|
||||
when(ctx.skipThrottling(eq("_action"))).thenReturn(true);
|
||||
return ctx;
|
||||
}
|
||||
|
|
|
@ -227,7 +227,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
when(action.type()).thenReturn("MY_AWESOME_TYPE");
|
||||
when(action.execute("_action", context, payload)).thenReturn(actionResult);
|
||||
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
|
||||
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
|
@ -313,7 +313,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
ExecutableAction action = mock(ExecutableAction.class);
|
||||
when(action.execute("_action", context, payload)).thenReturn(actionResult);
|
||||
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
when(watch.input()).thenReturn(input);
|
||||
|
@ -378,7 +378,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
ExecutableAction action = mock(ExecutableAction.class);
|
||||
when(action.execute("_action", context, payload)).thenReturn(actionResult);
|
||||
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
when(watch.input()).thenReturn(input);
|
||||
|
@ -442,7 +442,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
ExecutableAction action = mock(ExecutableAction.class);
|
||||
when(action.execute("_action", context, payload)).thenReturn(actionResult);
|
||||
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
when(watch.input()).thenReturn(input);
|
||||
|
@ -520,7 +520,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
when(action.logger()).thenReturn(logger);
|
||||
when(action.execute("_action", context, payload)).thenReturn(actionResult);
|
||||
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
|
||||
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
|
@ -600,7 +600,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
ExecutableAction action = mock(ExecutableAction.class);
|
||||
when(action.execute("_action", context, payload)).thenReturn(actionResult);
|
||||
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
|
||||
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
|
@ -649,7 +649,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
|
||||
ExecutableAction action = mock(ExecutableAction.class);
|
||||
when(action.type()).thenReturn("_type");
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
|
||||
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
|
@ -712,7 +712,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
|
||||
ExecutableAction action = mock(ExecutableAction.class);
|
||||
when(action.type()).thenReturn("_type");
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
|
||||
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
|
@ -769,7 +769,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
ExecutableAction action = mock(ExecutableAction.class);
|
||||
when(action.type()).thenReturn("_type");
|
||||
when(action.logger()).thenReturn(logger);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
|
||||
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
|
@ -817,7 +817,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
ExecutableCondition actionCondition = mock(ExecutableCondition.class);
|
||||
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
|
||||
ExecutableAction action = mock(ExecutableAction.class);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
|
||||
|
||||
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
|
||||
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
|
||||
|
@ -946,7 +946,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
when(action.type()).thenReturn("MY_AWESOME_TYPE");
|
||||
when(action.execute("_action", context, payload)).thenReturn(actionResult);
|
||||
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action);
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null);
|
||||
|
||||
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
|
|
|
@ -158,7 +158,7 @@ public final class WatcherTestUtils {
|
|||
httpRequest.path(new TextTemplate("/foobarbaz/{{ctx.watch_id}}"));
|
||||
httpRequest.body(new TextTemplate("{{ctx.watch_id}} executed with {{ctx.payload.response.hits.total_hits}} hits"));
|
||||
actions.add(new ActionWrapper("_webhook", null, null, null, new ExecutableWebhookAction(new WebhookAction(httpRequest.build()),
|
||||
logger, httpClient, engine)));
|
||||
logger, httpClient, engine), null));
|
||||
|
||||
|
||||
EmailTemplate email = EmailTemplate.builder().from("from@test.com").to("to@test.com").build();
|
||||
|
@ -166,7 +166,7 @@ public final class WatcherTestUtils {
|
|||
EmailAction action = new EmailAction(email, "testaccount", auth, Profile.STANDARD, null, null);
|
||||
ExecutableEmailAction executale = new ExecutableEmailAction(action, logger, emailService, engine,
|
||||
new HtmlSanitizer(Settings.EMPTY), Collections.emptyMap());
|
||||
actions.add(new ActionWrapper("_email", null, null, null, executale));
|
||||
actions.add(new ActionWrapper("_email", null, null, null, executale, null));
|
||||
|
||||
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
|
||||
Map<String, ActionStatus> statuses = new HashMap<>();
|
||||
|
|
|
@ -442,7 +442,7 @@ public class WatchTests extends ESTestCase {
|
|||
private WatchParser createWatchparser() throws Exception {
|
||||
LoggingAction loggingAction = new LoggingAction(new TextTemplate("foo"), null, null);
|
||||
List<ActionWrapper> actions = Collections.singletonList(new ActionWrapper("_logging_", randomThrottler(), null, null,
|
||||
new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine())));
|
||||
new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()), null));
|
||||
|
||||
ScheduleRegistry scheduleRegistry = registry(new IntervalSchedule(new IntervalSchedule.Interval(1,
|
||||
IntervalSchedule.Interval.Unit.SECONDS)));
|
||||
|
@ -585,7 +585,7 @@ public class WatchTests extends ESTestCase {
|
|||
randomFrom(DataAttachment.JSON, DataAttachment.YAML), EmailAttachments.EMPTY_ATTACHMENTS);
|
||||
list.add(new ActionWrapper("_email_" + randomAlphaOfLength(8), randomThrottler(),
|
||||
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
|
||||
new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, Collections.emptyMap())));
|
||||
new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, Collections.emptyMap()), null));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null;
|
||||
|
@ -596,7 +596,7 @@ public class WatchTests extends ESTestCase {
|
|||
list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
|
||||
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
|
||||
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
|
||||
TimeValue.timeValueSeconds(30))));
|
||||
TimeValue.timeValueSeconds(30)), null));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000))
|
||||
|
@ -606,7 +606,7 @@ public class WatchTests extends ESTestCase {
|
|||
WebhookAction action = new WebhookAction(httpRequest);
|
||||
list.add(new ActionWrapper("_webhook_" + randomAlphaOfLength(8), randomThrottler(),
|
||||
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
|
||||
new ExecutableWebhookAction(action, logger, httpClient, templateEngine)));
|
||||
new ExecutableWebhookAction(action, logger, httpClient, templateEngine), null));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import java.util.List;
|
|||
public final class XPackRestTestConstants {
|
||||
|
||||
// Watcher constants:
|
||||
public static final String INDEX_TEMPLATE_VERSION = "9";
|
||||
public static final String INDEX_TEMPLATE_VERSION = "10";
|
||||
public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION;
|
||||
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
|
||||
public static final String WATCHES_TEMPLATE_NAME = ".watches";
|
||||
|
|
Loading…
Reference in New Issue