From e29df8dd604dfe6d2e589ddda36b085cea23ad9a Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 16 Jun 2015 15:36:25 +0200 Subject: [PATCH] Added inline watch support to _execute API This change allows the specification of a watch inline to the `_execute` API. This watch id will not be persisted to the index and if record_execution is set to true it will result in an error. The internal id `_anonymous_` will be used for the watch id and will be the watch id in the watch record. Original commit: elastic/x-pack-elasticsearch@00e32c383857030367e280bcd44d25711123e709 --- rest-api-spec/api/watcher.execute_watch.json | 5 +- .../test/execute_watch/30_inline_watch.yaml | 74 +++++++++++++++++++ .../rest/action/RestExecuteWatchAction.java | 22 ++++-- .../actions/execute/ExecuteWatchRequest.java | 59 ++++++++++++--- .../execute/ExecuteWatchRequestBuilder.java | 19 ++++- .../execute/TransportExecuteWatchAction.java | 24 ++++-- .../watcher/trigger/TriggerService.java | 4 +- .../elasticsearch/watcher/watch/Watch.java | 3 +- .../execution/ManualExecutionTests.java | 74 ++++++++++++++++++- 9 files changed, 252 insertions(+), 32 deletions(-) create mode 100644 rest-api-spec/test/execute_watch/30_inline_watch.yaml diff --git a/rest-api-spec/api/watcher.execute_watch.json b/rest-api-spec/api/watcher.execute_watch.json index 33c62b25e68..6c93610f5ae 100644 --- a/rest-api-spec/api/watcher.execute_watch.json +++ b/rest-api-spec/api/watcher.execute_watch.json @@ -4,12 +4,11 @@ "methods": [ "PUT", "POST" ], "url": { "path": "/_watcher/watch/{id}/_execute", - "paths": [ "/_watcher/watch/{id}/_execute" ], + "paths": [ "/_watcher/watch/{id}/_execute", "/_watcher/watch/_execute" ], "parts": { "id": { "type" : "string", - "description" : "Watch ID", - "required" : true + "description" : "Watch ID" } }, "params": { diff --git a/rest-api-spec/test/execute_watch/30_inline_watch.yaml b/rest-api-spec/test/execute_watch/30_inline_watch.yaml new file mode 100644 index 00000000000..a2cd89b6c85 --- /dev/null +++ b/rest-api-spec/test/execute_watch/30_inline_watch.yaml @@ -0,0 +1,74 @@ +--- +"Test execute watch api with an inline watch": + - do: + cluster.health: + wait_for_status: green + + - do: + watcher.execute_watch: + body: > + { + "trigger_data" : { + "scheduled_time" : "2015-05-05T20:58:02.443Z", + "triggered_time" : "2015-05-05T20:58:02.443Z" + }, + "alternative_input" : { + "foo" : "bar" + }, + "ignore_condition" : true, + "action_modes" : { + "_all" : "force_simulate" + }, + "watch" : { + "trigger" : { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input" : { + "search" : { + "request" : { + "indices" : [ "logstash*" ], + "body" : { + "query" : { + "filtered": { + "query": { + "match": { + "response": 404 + } + }, + "filter": { + "range": { + "@timestamp" : { + "from": "{{ctx.trigger.scheduled_time}}||-5m", + "to": "{{ctx.trigger.triggered_time}}" + } + } + } + } + } + } + } + } + }, + "condition" : { + "script" : { + "inline" : "ctx.payload.hits.total > 1" + } + }, + "actions" : { + "email_admin" : { + "email" : { + "to" : "someone@domain.host.com", + "subject" : "404 recently encountered" + } + } + } + } + } + - match: { "watch_record.state": "executed" } + - match: { "watch_record.trigger_event.manual.schedule.scheduled_time": "2015-05-05T20:58:02.443Z" } + - match: { "watch_record.result.input.type": "simple" } + - match: { "watch_record.result.input.payload.foo": "bar" } + - match: { "watch_record.result.condition.met": true } + - match: { "watch_record.result.actions.0.id" : "email_admin" } + - match: { "watch_record.result.actions.0.status" : "simulated" } + - match: { "watch_record.result.actions.0.email.email.subject" : "404 recently encountered" } diff --git a/src/main/java/org/elasticsearch/watcher/rest/action/RestExecuteWatchAction.java b/src/main/java/org/elasticsearch/watcher/rest/action/RestExecuteWatchAction.java index 9f249ca9848..a5fcca85a60 100644 --- a/src/main/java/org/elasticsearch/watcher/rest/action/RestExecuteWatchAction.java +++ b/src/main/java/org/elasticsearch/watcher/rest/action/RestExecuteWatchAction.java @@ -37,6 +37,8 @@ public class RestExecuteWatchAction extends WatcherRestHandler { super(settings, controller, client); controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/_execute", this); controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/_execute", this); + controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/_execute", this); + controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/_execute", this); this.triggerService = triggerService; } @@ -58,9 +60,8 @@ public class RestExecuteWatchAction extends WatcherRestHandler { //This tightly binds the REST API to the java API private ExecuteWatchRequest parseRequest(RestRequest request, WatcherClient client) throws IOException { - String watchId = request.param("id"); - ExecuteWatchRequestBuilder builder = client.prepareExecuteWatch(watchId); - + ExecuteWatchRequestBuilder builder = client.prepareExecuteWatch(); + builder.setId(request.param("id")); if (request.content() == null || request.content().length() == 0) { return builder.request(); } @@ -79,13 +80,17 @@ public class RestExecuteWatchAction extends WatcherRestHandler { } else if (Field.RECORD_EXECUTION.match(currentFieldName)) { builder.setRecordExecution(parser.booleanValue()); } else { - throw new ParseException("could not parse watch execution request for [{}]. unexpected boolean field [{}]", watchId, currentFieldName); + throw new ParseException("could not parse watch execution request. unexpected boolean field [{}]", currentFieldName); } } else if (token == XContentParser.Token.START_OBJECT) { if (Field.ALTERNATIVE_INPUT.match(currentFieldName)) { builder.setAlternativeInput(parser.map()); } else if (Field.TRIGGER_DATA.match(currentFieldName)) { builder.setTriggerData(parser.map()); + } else if (Field.WATCH.match(currentFieldName)) { + XContentBuilder watcherSource = XContentBuilder.builder(parser.contentType().xContent()); + XContentHelper.copyCurrentStructure(watcherSource.generator(), parser); + builder.setWatchSource(watcherSource.bytes()); } else if (Field.ACTION_MODES.match(currentFieldName)) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -95,17 +100,17 @@ public class RestExecuteWatchAction extends WatcherRestHandler { ActionExecutionMode mode = ActionExecutionMode.resolve(parser.textOrNull()); builder.setActionMode(currentFieldName, mode); } catch (WatcherException we) { - throw new ParseException("could not parse watch execution request for [{}].", watchId, we); + throw new ParseException("could not parse watch execution request", we); } } else { - throw new ParseException("could not parse watch execution request for [{}]. unexpected array field [{}]", watchId, currentFieldName); + throw new ParseException("could not parse watch execution request. unexpected array field [{}]", currentFieldName); } } } else { - throw new ParseException("could not parse watch execution request for [{}]. unexpected object field [{}]", watchId, currentFieldName); + throw new ParseException("could not parse watch execution request. unexpected object field [{}]", currentFieldName); } } else { - throw new ParseException("could not parse watch execution request for [{}]. unexpected token [{}]", watchId, token); + throw new ParseException("could not parse watch execution request. unexpected token [{}]", token); } } @@ -132,5 +137,6 @@ public class RestExecuteWatchAction extends WatcherRestHandler { ParseField ALTERNATIVE_INPUT = new ParseField("alternative_input"); ParseField IGNORE_CONDITION = new ParseField("ignore_condition"); ParseField TRIGGER_DATA = new ParseField("trigger_data"); + ParseField WATCH = new ParseField("watch"); } } diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/execute/ExecuteWatchRequest.java b/src/main/java/org/elasticsearch/watcher/transport/actions/execute/ExecuteWatchRequest.java index 3143abe9302..1354920419b 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/execute/ExecuteWatchRequest.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/execute/ExecuteWatchRequest.java @@ -9,8 +9,11 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.execution.ActionExecutionMode; import org.elasticsearch.watcher.support.validation.Validation; import org.elasticsearch.watcher.trigger.TriggerEvent; @@ -24,12 +27,15 @@ import java.util.Map; */ public class ExecuteWatchRequest extends MasterNodeOperationRequest { + public static final String INLINE_WATCH_ID = "_inlined_"; + private String id; private boolean ignoreCondition = false; private boolean recordExecution = false; private @Nullable Map triggerData = null; private @Nullable Map alternativeInput = null; private Map actionModes = new HashMap<>(); + private BytesReference watchSource; ExecuteWatchRequest() { } @@ -120,6 +126,26 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest modes : actionModes.entrySet()) { - error = Validation.actionId(modes.getKey()); + if (id != null) { + Validation.Error error = Validation.watchId(id); if (error != null) { validationException = ValidateActions.addValidationError(error.message(), validationException); } } + for (Map.Entry modes : actionModes.entrySet()) { + Validation.Error error = Validation.actionId(modes.getKey()); + if (error != null) { + validationException = ValidateActions.addValidationError(error.message(), validationException); + } + } + if (watchSource != null && id != null) { + validationException = ValidateActions.addValidationError("a watch execution request must either have a watch id or an inline watch source but not both", validationException); + } + if (watchSource != null && recordExecution) { + validationException = ValidateActions.addValidationError("the execution of an inline watch cannot be recorded", validationException); + } return validationException; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - id = in.readString(); + id = in.readOptionalString(); ignoreCondition = in.readBoolean(); recordExecution = in.readBoolean(); if (in.readBoolean()){ @@ -176,13 +210,16 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest listener) { new WatcherClient(client).executeWatch(request, listener); } - } diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java index 18e35f94f39..d7f6ded09de 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/execute/TransportExecuteWatchAction.java @@ -49,16 +49,19 @@ public class TransportExecuteWatchAction extends WatcherTransportAction listener) throws ElasticsearchException { try { - Watch watch = watchStore.get(request.getId()); - if (watch == null) { - throw new WatcherException("watch [{}] does not exist", request.getId()); + Watch watch; + boolean knownWatch; + if (request.getId() != null) { + watch = watchStore.get(request.getId()); + if (watch == null) { + throw new WatcherException("watch [{}] does not exist", request.getId()); + } + knownWatch = true; + } else if (request.getWatchSource() != null) { + assert !request.isRecordExecution(); + watch = watchParser.parse(ExecuteWatchRequest.INLINE_WATCH_ID, false, request.getWatchSource()); + knownWatch = false; + } else { + throw new WatcherException("no watch provided"); } String triggerType = watch.trigger().type(); TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData()); - ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, true, new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod()); + ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, knownWatch, new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod()); DateTime executionTime = clock.now(UTC); ctxBuilder.executionTime(executionTime); diff --git a/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java b/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java index 8b747faca9d..b33be545c36 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java @@ -105,7 +105,9 @@ public class TriggerService extends AbstractComponent { public Trigger parseTrigger(String jobName, String type, XContentParser parser) throws IOException { TriggerEngine engine = engines.get(type); - assert engine != null; + if (engine == null) { + throw new TriggerException("could not parse trigger [{}] for [{}]. unknown trigger type [{}]", type, jobName, type); + } return engine.parseTrigger(jobName, parser); } diff --git a/src/main/java/org/elasticsearch/watcher/watch/Watch.java b/src/main/java/org/elasticsearch/watcher/watch/Watch.java index 96000aadcbf..192246ab0d0 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/Watch.java +++ b/src/main/java/org/elasticsearch/watcher/watch/Watch.java @@ -252,6 +252,7 @@ public class Watch implements TriggerEngine.Job, ToXContent { if (withSecrets) { parser = new SensitiveXContentParser(parser, secretService); } + parser.nextToken(); return parse(id, includeStatus, parser); } catch (IOException ioe) { throw new WatcherException("could not parse watch [{}]", ioe, id); @@ -273,7 +274,7 @@ public class Watch implements TriggerEngine.Job, ToXContent { WatchStatus status = null; String currentFieldName = null; - XContentParser.Token token = parser.nextToken(); + XContentParser.Token token; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == null ) { throw new ParseException("could not parse watch [{}]. null token", id); diff --git a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java index b92d9d63cb9..09ecf6672dd 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.watcher.execution; import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.apache.lucene.util.LuceneTestCase.Slow; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; @@ -24,6 +25,9 @@ import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.support.xcontent.ObjectPath; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; +import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequest; +import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequestBuilder; +import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.watcher.transport.actions.put.PutWatchRequest; import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; @@ -62,7 +66,6 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { @Test @Repeat(iterations = 10) public void testExecuteWatch() throws Exception { - ensureWatcherStarted(); boolean ignoreCondition = randomBoolean(); boolean recordExecution = randomBoolean(); boolean conditionAlwaysTrue = randomBoolean(); @@ -146,9 +149,74 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { } } + @Test + @Repeat(iterations = 5) + public void testExecutionWithInlineWatch() throws Exception { + WatchSourceBuilder watchBuilder = watchBuilder() + .trigger(schedule(cron("0 0 0 1 * ? 2099"))) + .input(simpleInput("foo", "bar")) + .condition(alwaysCondition()) + .addAction("log", loggingAction("foobar")); + + ExecuteWatchRequestBuilder builder = watcherClient().prepareExecuteWatch() + .setWatchSource(watchBuilder); + if (randomBoolean()) { + builder.setRecordExecution(false); + } + if (randomBoolean()) { + builder.setTriggerEvent(new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); + } + + ExecuteWatchResponse executeWatchResponse = builder.get(); + assertThat(executeWatchResponse.getRecordId(), startsWith(ExecuteWatchRequest.INLINE_WATCH_ID)); + assertThat(executeWatchResponse.getRecordSource().getValue("watch_id").toString(), equalTo(ExecuteWatchRequest.INLINE_WATCH_ID)); + assertThat(executeWatchResponse.getRecordSource().getValue("state").toString(), equalTo("executed")); + assertThat(executeWatchResponse.getRecordSource().getValue("trigger_event.type").toString(), equalTo("manual")); + } + + @Test + public void testExecutionWithInlineWatch_withRecordExecutionEnabled() throws Exception { + WatchSourceBuilder watchBuilder = watchBuilder() + .trigger(schedule(cron("0 0 0 1 * ? 2099"))) + .input(simpleInput("foo", "bar")) + .condition(alwaysCondition()) + .addAction("log", loggingAction("foobar")); + + try { + watcherClient().prepareExecuteWatch() + .setWatchSource(watchBuilder) + .setRecordExecution(true) + .setTriggerEvent(new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))) + .get(); + fail(); + } catch (ActionRequestValidationException e) { + assertThat(e.getMessage(), containsString("the execution of an inline watch cannot be recorded")); + } + } + + @Test + public void testExecutionWithInlineWatch_withWatchId() throws Exception { + WatchSourceBuilder watchBuilder = watchBuilder() + .trigger(schedule(cron("0 0 0 1 * ? 2099"))) + .input(simpleInput("foo", "bar")) + .condition(alwaysCondition()) + .addAction("log", loggingAction("foobar")); + + try { + watcherClient().prepareExecuteWatch() + .setId("_id") + .setWatchSource(watchBuilder) + .setRecordExecution(false) + .setTriggerEvent(new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))) + .get(); + fail(); + } catch (ActionRequestValidationException e) { + assertThat(e.getMessage(), containsString("a watch execution request must either have a watch id or an inline watch source but not both")); + } + } + @Test public void testDifferentAlternativeInputs() throws Exception { - ensureWatcherStarted(); WatchSourceBuilder watchBuilder = watchBuilder() .trigger(schedule(cron("0 0 0 1 * ? 2099"))) .addAction("log", loggingAction("foobar")); @@ -187,8 +255,6 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { @Test public void testExecutionRequestDefaults() throws Exception { - ensureWatcherStarted(); - WatchSourceBuilder watchBuilder = watchBuilder() .trigger(schedule(cron("0 0 0 1 * ? 2099"))) .input(simpleInput("foo", "bar"))