diff --git a/x-pack/docs/en/watcher/actions.asciidoc b/x-pack/docs/en/watcher/actions.asciidoc index 34bdd2513a4..7f5cd3218b5 100644 --- a/x-pack/docs/en/watcher/actions.asciidoc +++ b/x-pack/docs/en/watcher/actions.asciidoc @@ -192,6 +192,49 @@ of a watch during its execution: image::images/action-throttling.jpg[align="center"] +[[action-foreach]] +=== Running an action for each element in an array + +You can use the `foreach` field in an action to trigger the configured action +for every element within that array. + +In order to protect from long running watches, after one hundred runs with an +foreach loop the execution is gracefully stopped. + +[source,js] +-------------------------------------------------- +PUT _watcher/watch/log_event_watch +{ + "trigger" : { + "schedule" : { "interval" : "5m" } + }, + "input" : { + "search" : { + "request" : { + "indices" : "log-events", + "body" : { + "query" : { "match" : { "status" : "error" } } + } + } + } + }, + "condition" : { + "compare" : { "ctx.payload.hits.total" : { "gt" : 0 } } + }, + "actions" : { + "log_hits" : { + "foreach" : "ctx.payload.hits.hits", <1> + "logging" : { + "text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}" + } + } + } +} +-------------------------------------------------- +// CONSOLE + +<1> The logging statement will be executed for each of the returned search hits. + [[action-conditions]] === Adding conditions to actions diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java index eec08453498..bc314a623e1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java @@ -7,13 +7,17 @@ package org.elasticsearch.xpack.core.watcher.actions; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.script.JodaCompatibleZonedDateTime; import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler; import org.elasticsearch.xpack.core.watcher.actions.throttler.Throttler; import org.elasticsearch.xpack.core.watcher.actions.throttler.ThrottlerField; @@ -30,12 +34,22 @@ import java.io.IOException; import java.time.Clock; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; public class ActionWrapper implements ToXContentObject { + private final int MAXIMUM_FOREACH_RUNS = 100; + private String id; @Nullable private final ExecutableCondition condition; @@ -43,16 +57,20 @@ public class ActionWrapper implements ToXContentObject { private final ExecutableTransform transform; private final ActionThrottler throttler; private final ExecutableAction action; + @Nullable + private String path; public ActionWrapper(String id, ActionThrottler throttler, @Nullable ExecutableCondition condition, @Nullable ExecutableTransform transform, - ExecutableAction action) { + ExecutableAction action, + @Nullable String path) { this.id = id; this.condition = condition; this.throttler = throttler; this.transform = transform; this.action = action; + this.path = path; } public String id() { @@ -140,16 +158,90 @@ public class ActionWrapper implements ToXContentObject { return new ActionWrapperResult(id, conditionResult, null, new Action.Result.FailureWithException(action.type(), e)); } } - try { - Action.Result actionResult = action.execute(id, ctx, payload); - return new ActionWrapperResult(id, conditionResult, transformResult, actionResult); - } catch (Exception e) { - action.logger().error( + if (Strings.isEmpty(path)) { + try { + Action.Result actionResult = action.execute(id, ctx, payload); + return new ActionWrapperResult(id, conditionResult, transformResult, actionResult); + } catch (Exception e) { + action.logger().error( + (Supplier) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e); + return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e)); + } + } else { + try { + List results = new ArrayList<>(); + Object object = ObjectPath.eval(path, toMap(ctx)); + int runs = 0; + if (object instanceof Collection) { + Collection collection = Collection.class.cast(object); + if (collection.isEmpty()) { + throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path); + } else { + for (Object o : collection) { + if (runs >= MAXIMUM_FOREACH_RUNS) { + break; + } + if (o instanceof Map) { + results.add(action.execute(id, ctx, new Payload.Simple((Map) o))); + } else { + results.add(action.execute(id, ctx, new Payload.Simple("_value", o))); + } + runs++; + } + } + } else if (object == null) { + throw new ElasticsearchException("specified foreach object was null: [{}]", path); + } else { + throw new ElasticsearchException("specified foreach object was not a an array/collection: [{}]", path); + } + + // check if we have mixed results, then set to partial failure + final Set statuses = results.stream().map(Action.Result::status).collect(Collectors.toSet()); + Action.Result.Status status; + if (statuses.size() == 1) { + status = statuses.iterator().next(); + } else { + status = Action.Result.Status.PARTIAL_FAILURE; + } + + final int numberOfActionsExecuted = runs; + return new ActionWrapperResult(id, conditionResult, transformResult, + new Action.Result(action.type(), status) { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("number_of_actions_executed", numberOfActionsExecuted); + builder.startArray(WatchField.FOREACH.getPreferredName()); + for (Action.Result result : results) { + builder.startObject(); + result.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + return builder; + } + }); + } catch (Exception e) { + action.logger().error( (Supplier) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e); - return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e)); + return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e)); + } } } + private Map toMap(WatchExecutionContext ctx) { + Map model = new HashMap<>(); + model.put("id", ctx.id().value()); + model.put("watch_id", ctx.id().watchId()); + model.put("execution_time", new JodaCompatibleZonedDateTime(ctx.executionTime().toInstant(), ZoneOffset.UTC)); + model.put("trigger", ctx.triggerEvent().data()); + model.put("metadata", ctx.watch().metadata()); + model.put("vars", ctx.vars()); + if (ctx.payload().data() != null) { + model.put("payload", ctx.payload().data()); + } + return Collections.singletonMap("ctx", model); + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -186,6 +278,9 @@ public class ActionWrapper implements ToXContentObject { .field(transform.type(), transform, params) .endObject(); } + if (Strings.isEmpty(path) == false) { + builder.field(WatchField.FOREACH.getPreferredName(), path); + } builder.field(action.type(), action, params); return builder.endObject(); } @@ -198,6 +293,7 @@ public class ActionWrapper implements ToXContentObject { ExecutableCondition condition = null; ExecutableTransform transform = null; TimeValue throttlePeriod = null; + String path = null; ExecutableAction action = null; String currentFieldName = null; @@ -208,6 +304,8 @@ public class ActionWrapper implements ToXContentObject { } else { if (WatchField.CONDITION.match(currentFieldName, parser.getDeprecationHandler())) { condition = actionRegistry.getConditionRegistry().parseExecutable(watchId, parser); + } else if (WatchField.FOREACH.match(currentFieldName, parser.getDeprecationHandler())) { + path = parser.text(); } else if (Transform.TRANSFORM.match(currentFieldName, parser.getDeprecationHandler())) { transform = actionRegistry.getTransformRegistry().parse(watchId, parser); } else if (ThrottlerField.THROTTLE_PERIOD.match(currentFieldName, parser.getDeprecationHandler())) { @@ -235,7 +333,7 @@ public class ActionWrapper implements ToXContentObject { } ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState); - return new ActionWrapper(actionId, throttler, condition, transform, action); + return new ActionWrapper(actionId, throttler, condition, transform, action, path); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java index 001a430ddb1..12eb6a12ee2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java @@ -101,7 +101,7 @@ public class WatchSourceBuilder implements ToXContentObject { } public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transform transform, Action action) { - actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform)); + actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform, null)); return this; } @@ -111,7 +111,13 @@ public class WatchSourceBuilder implements ToXContentObject { } public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, Action action) { - actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform)); + actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, null)); + return this; + } + + public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, String path, + Action action) { + actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, path)); return this; } @@ -186,16 +192,18 @@ public class WatchSourceBuilder implements ToXContentObject { static class TransformedAction implements ToXContentObject { private final Action action; + @Nullable private String path; @Nullable private final TimeValue throttlePeriod; @Nullable private final Condition condition; @Nullable private final Transform transform; TransformedAction(String id, Action action, @Nullable TimeValue throttlePeriod, - @Nullable Condition condition, @Nullable Transform transform) { + @Nullable Condition condition, @Nullable Transform transform, @Nullable String path) { this.throttlePeriod = throttlePeriod; this.condition = condition; this.transform = transform; this.action = action; + this.path = path; } @Override @@ -215,6 +223,9 @@ public class WatchSourceBuilder implements ToXContentObject { .field(transform.type(), transform, params) .endObject(); } + if (path != null) { + builder.field("foreach", path); + } builder.field(action.type(), action, params); return builder.endObject(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java index 4007b06ee7e..821ea9a4335 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java @@ -14,8 +14,9 @@ public final class WatcherIndexTemplateRegistryField { // version 7: add full exception stack traces for better debugging // version 8: fix slack attachment property not to be dynamic, causing field type issues // version 9: add a user field defining which user executed the watch + // version 10: add support for foreach path in actions // Note: if you change this, also inform the kibana team around the watcher-ui - public static final String INDEX_TEMPLATE_VERSION = "9"; + public static final String INDEX_TEMPLATE_VERSION = "10"; public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION; public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION; public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java index 6f6a1955927..1bcb62447bf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java @@ -13,6 +13,7 @@ public final class WatchField { public static final ParseField CONDITION = new ParseField("condition"); public static final ParseField ACTIONS = new ParseField("actions"); public static final ParseField TRANSFORM = new ParseField("transform"); + public static final ParseField FOREACH = new ParseField("foreach"); public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis"); public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period"); public static final ParseField METADATA = new ParseField("metadata"); diff --git a/x-pack/plugin/core/src/main/resources/watch-history.json b/x-pack/plugin/core/src/main/resources/watch-history.json index 9c5919f13a1..8b6bc435d2e 100644 --- a/x-pack/plugin/core/src/main/resources/watch-history.json +++ b/x-pack/plugin/core/src/main/resources/watch-history.json @@ -264,6 +264,13 @@ "reason" : { "type" : "keyword" }, + "number_of_actions_executed": { + "type": "integer" + }, + "foreach" : { + "type": "object", + "enabled" : false + }, "email": { "type": "object", "dynamic": true, diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml new file mode 100644 index 00000000000..0c5ec6fbcba --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml @@ -0,0 +1,111 @@ +--- +setup: + - do: + cluster.health: + wait_for_status: yellow + +--- +"Test execute watch api with foreach action": + - do: + watcher.execute_watch: + body: > + { + "watch" : { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "simple" : { + "hits" : { + "hits" : [ + { "key" : "first" }, + { "key" : "second" }, + { "key" : "third" } + ] + } + } + }, + "actions": { + "log_hits" : { + "foreach" : "ctx.payload.hits.hits", + "logging" : { + "text" : "Logging {{ctx.payload.key}}" + } + } + } + } + } + + - match: { watch_record.trigger_event.type: "manual" } + - match: { watch_record.state: "executed" } + - match: { watch_record.status.execution_state: "executed" } + - match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging first" } + - match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging second" } + - match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging third" } + + +--- +"Test execute watch api with foreach action using an array": + - do: + watcher.execute_watch: + body: > + { + "watch" : { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "simple" : { + "values" : [1, 2, 3] + } + }, + "actions": { + "log_hits" : { + "foreach" : "ctx.payload.values", + "logging" : { + "text" : "Logging {{ctx.payload._value}}" + } + } + } + } + } + + - match: { watch_record.trigger_event.type: "manual" } + - match: { watch_record.state: "executed" } + - match: { watch_record.status.execution_state: "executed" } + - match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging 1" } + - match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging 2" } + - match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging 3" } + +--- +"Test execute watch api with foreach action using an array of arrays": + - do: + watcher.execute_watch: + body: > + { + "watch" : { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "simple" : { + "values" : [[1, 2], [2, 3], [3, 4]] + } + }, + "actions": { + "log_hits" : { + "foreach" : "ctx.payload.values", + "logging" : { + "text" : "Logging {{ctx.payload._value.1}}" + } + } + } + } + } + + - match: { watch_record.trigger_event.type: "manual" } + - match: { watch_record.state: "executed" } + - match: { watch_record.status.execution_state: "executed" } + - match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging 2" } + - match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging 3" } + - match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging 4" } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java index e3a41062cae..304c26eefbf 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java @@ -5,6 +5,11 @@ */ package org.elasticsearch.xpack.watcher.actions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.watcher.actions.Action; import org.elasticsearch.xpack.core.watcher.actions.ActionStatus; @@ -13,16 +18,31 @@ import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult; import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; +import org.elasticsearch.xpack.core.watcher.execution.Wid; +import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; +import org.elasticsearch.xpack.core.watcher.watch.Payload; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; +import org.elasticsearch.xpack.watcher.actions.logging.ExecutableLoggingAction; +import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction; +import org.elasticsearch.xpack.watcher.common.text.TextTemplate; +import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.condition.NeverCondition; +import org.elasticsearch.xpack.watcher.test.MockTextTemplateEngine; +import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -34,7 +54,7 @@ public class ActionWrapperTests extends ESTestCase { private Watch watch = mock(Watch.class); @SuppressWarnings("unchecked") private ExecutableAction executableAction = mock(ExecutableAction.class); - private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction); + private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null); public void testThatUnmetActionConditionResetsAckStatus() throws Exception { WatchStatus watchStatus = new WatchStatus(now, Collections.singletonMap("_action", createActionStatus(State.ACKED))); @@ -59,10 +79,135 @@ public class ActionWrapperTests extends ESTestCase { assertThat(watch.status().actionStatus("other").ackStatus().state(), is(otherState)); } + public void testThatMultipleResultsCanBeReturned() throws Exception { + final LoggingAction loggingAction = new LoggingAction(new TextTemplate("{{key}}"), null, null); + final ExecutableAction executableAction = + new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()); + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); + + WatchExecutionContext ctx = mockExecutionContent(watch); + + List> payloads = new ArrayList<>(); + payloads.add(Collections.singletonMap("key", "first")); + payloads.add(Collections.singletonMap("key", "second")); + payloads.add(Collections.singletonMap("key", "third")); + Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", payloads)); + when(ctx.payload()).thenReturn(payload); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.SUCCESS)); + // check that action toXContent contains all the results + try (XContentBuilder builder = jsonBuilder()) { + result.toXContent(builder, ToXContent.EMPTY_PARAMS); + final String json = Strings.toString(builder); + final Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true); + assertThat(map, hasKey("foreach")); + assertThat(map.get("foreach"), instanceOf(List.class)); + List> actions = (List) map.get("foreach"); + assertThat(actions, hasSize(3)); + } + } + + public void testThatSpecifiedPathIsNotCollection() { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", "not a map")); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); + assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class)); + Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action(); + assertThat(failureWithException.getException().getMessage(), + is("specified foreach object was not a an array/collection: [ctx.payload.my_path]")); + } + + public void testEmptyCollection() { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", Collections.emptyList())); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); + assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class)); + Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action(); + assertThat(failureWithException.getException().getMessage(), + is("foreach object [ctx.payload.my_path] was an empty list, could not run any action")); + } + + public void testPartialFailure() throws Exception { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + + List> payloads = new ArrayList<>(); + payloads.add(Collections.singletonMap("key", "first")); + payloads.add(Collections.singletonMap("key", "second")); + Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", payloads)); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + final Action.Result firstResult = new LoggingAction.Result.Success("log_message");; + final Payload firstPayload = new Payload.Simple(Collections.singletonMap("key", "first")); + when(executableAction.execute(eq("_action"), eq(ctx), eq(firstPayload))).thenReturn(firstResult); + + final Action.Result secondResult = new Action.Result.Failure("MY_TYPE", "second reason"); + final Payload secondPayload = new Payload.Simple(Collections.singletonMap("key", "second")); + when(executableAction.execute(eq("_action"), eq(ctx), eq(secondPayload))).thenReturn(secondResult); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.PARTIAL_FAILURE)); + } + + public void testLimitOfNumberOfActionsExecuted() throws Exception { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + List> itemsPayload = new ArrayList<>(); + for (int i = 0; i < 101; i++) { + final Action.Result actionResult = new LoggingAction.Result.Success("log_message " + i);; + final Payload singleItemPayload = new Payload.Simple(Collections.singletonMap("key", String.valueOf(i))); + itemsPayload.add(Collections.singletonMap("key", String.valueOf(i))); + when(executableAction.execute(eq("_action"), eq(ctx), eq(singleItemPayload))).thenReturn(actionResult); + } + + Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", itemsPayload)); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.SUCCESS)); + + // check that action toXContent contains all the results + try (XContentBuilder builder = jsonBuilder()) { + result.toXContent(builder, ToXContent.EMPTY_PARAMS); + final String json = Strings.toString(builder); + final Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true); + assertThat(map, hasKey("foreach")); + assertThat(map.get("foreach"), instanceOf(List.class)); + List> actions = (List) map.get("foreach"); + assertThat(actions, hasSize(100)); + assertThat(map, hasKey("number_of_actions_executed")); + assertThat(map.get("number_of_actions_executed"), is(100)); + } + } + private WatchExecutionContext mockExecutionContent(Watch watch) { WatchExecutionContext ctx = mock(WatchExecutionContext.class); when(watch.id()).thenReturn("watchId"); when(ctx.watch()).thenReturn(watch); + final ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + final Wid wid = new Wid("watchId", now); + when(ctx.id()).thenReturn(wid); + when(ctx.executionTime()).thenReturn(now); + final TriggerEvent triggerEvent = new ScheduleTriggerEvent("watchId", now, now); + when(ctx.triggerEvent()).thenReturn(triggerEvent); when(ctx.skipThrottling(eq("_action"))).thenReturn(true); return ctx; } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index cfc55deb131..40478f45dfa 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -227,7 +227,7 @@ public class ExecutionServiceTests extends ESTestCase { when(action.type()).thenReturn("MY_AWESOME_TYPE"); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); @@ -313,7 +313,7 @@ public class ExecutionServiceTests extends ESTestCase { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); @@ -378,7 +378,7 @@ public class ExecutionServiceTests extends ESTestCase { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); @@ -442,7 +442,7 @@ public class ExecutionServiceTests extends ESTestCase { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); @@ -520,7 +520,7 @@ public class ExecutionServiceTests extends ESTestCase { when(action.logger()).thenReturn(logger); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); @@ -600,7 +600,7 @@ public class ExecutionServiceTests extends ESTestCase { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -649,7 +649,7 @@ public class ExecutionServiceTests extends ESTestCase { ExecutableAction action = mock(ExecutableAction.class); when(action.type()).thenReturn("_type"); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -712,7 +712,7 @@ public class ExecutionServiceTests extends ESTestCase { ExecutableAction action = mock(ExecutableAction.class); when(action.type()).thenReturn("_type"); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -769,7 +769,7 @@ public class ExecutionServiceTests extends ESTestCase { ExecutableAction action = mock(ExecutableAction.class); when(action.type()).thenReturn("_type"); when(action.logger()).thenReturn(logger); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -817,7 +817,7 @@ public class ExecutionServiceTests extends ESTestCase { ExecutableCondition actionCondition = mock(ExecutableCondition.class); ExecutableTransform actionTransform = mock(ExecutableTransform.class); ExecutableAction action = mock(ExecutableAction.class); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -946,7 +946,7 @@ public class ExecutionServiceTests extends ESTestCase { when(action.type()).thenReturn("MY_AWESOME_TYPE"); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java index 9636d159b52..aa1231baa17 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java @@ -158,7 +158,7 @@ public final class WatcherTestUtils { httpRequest.path(new TextTemplate("/foobarbaz/{{ctx.watch_id}}")); httpRequest.body(new TextTemplate("{{ctx.watch_id}} executed with {{ctx.payload.response.hits.total_hits}} hits")); actions.add(new ActionWrapper("_webhook", null, null, null, new ExecutableWebhookAction(new WebhookAction(httpRequest.build()), - logger, httpClient, engine))); + logger, httpClient, engine), null)); EmailTemplate email = EmailTemplate.builder().from("from@test.com").to("to@test.com").build(); @@ -166,7 +166,7 @@ public final class WatcherTestUtils { EmailAction action = new EmailAction(email, "testaccount", auth, Profile.STANDARD, null, null); ExecutableEmailAction executale = new ExecutableEmailAction(action, logger, emailService, engine, new HtmlSanitizer(Settings.EMPTY), Collections.emptyMap()); - actions.add(new ActionWrapper("_email", null, null, null, executale)); + actions.add(new ActionWrapper("_email", null, null, null, executale, null)); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); Map statuses = new HashMap<>(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java index ac944755b9c..3f62aa42c34 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java @@ -442,7 +442,7 @@ public class WatchTests extends ESTestCase { private WatchParser createWatchparser() throws Exception { LoggingAction loggingAction = new LoggingAction(new TextTemplate("foo"), null, null); List actions = Collections.singletonList(new ActionWrapper("_logging_", randomThrottler(), null, null, - new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()))); + new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()), null)); ScheduleRegistry scheduleRegistry = registry(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.SECONDS))); @@ -585,7 +585,7 @@ public class WatchTests extends ESTestCase { randomFrom(DataAttachment.JSON, DataAttachment.YAML), EmailAttachments.EMPTY_ATTACHMENTS); list.add(new ActionWrapper("_email_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), - new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, Collections.emptyMap()))); + new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, Collections.emptyMap()), null)); } if (randomBoolean()) { ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null; @@ -596,7 +596,7 @@ public class WatchTests extends ESTestCase { list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), - TimeValue.timeValueSeconds(30)))); + TimeValue.timeValueSeconds(30)), null)); } if (randomBoolean()) { HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000)) @@ -606,7 +606,7 @@ public class WatchTests extends ESTestCase { WebhookAction action = new WebhookAction(httpRequest); list.add(new ActionWrapper("_webhook_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), - new ExecutableWebhookAction(action, logger, httpClient, templateEngine))); + new ExecutableWebhookAction(action, logger, httpClient, templateEngine), null)); } return list; } diff --git a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java index 481a11096aa..5a89c826683 100644 --- a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java +++ b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java @@ -12,7 +12,7 @@ import java.util.List; public final class XPackRestTestConstants { // Watcher constants: - public static final String INDEX_TEMPLATE_VERSION = "9"; + public static final String INDEX_TEMPLATE_VERSION = "10"; public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION; public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches"; public static final String WATCHES_TEMPLATE_NAME = ".watches";