Added inline watch support to _execute API

This change allows the specification of a watch inline to the `_execute` API.
This watch id will not be persisted to the index and if record_execution is set to true it will result in an error.
The internal id `_anonymous_` will be used for the watch id and will be the watch id in the watch record.

Original commit: elastic/x-pack-elasticsearch@00e32c3838
This commit is contained in:
Martijn van Groningen 2015-06-16 15:36:25 +02:00
parent 03b704f79b
commit e29df8dd60
9 changed files with 252 additions and 32 deletions

View File

@ -4,12 +4,11 @@
"methods": [ "PUT", "POST" ],
"url": {
"path": "/_watcher/watch/{id}/_execute",
"paths": [ "/_watcher/watch/{id}/_execute" ],
"paths": [ "/_watcher/watch/{id}/_execute", "/_watcher/watch/_execute" ],
"parts": {
"id": {
"type" : "string",
"description" : "Watch ID",
"required" : true
"description" : "Watch ID"
}
},
"params": {

View File

@ -0,0 +1,74 @@
---
"Test execute watch api with an inline watch":
- do:
cluster.health:
wait_for_status: green
- do:
watcher.execute_watch:
body: >
{
"trigger_data" : {
"scheduled_time" : "2015-05-05T20:58:02.443Z",
"triggered_time" : "2015-05-05T20:58:02.443Z"
},
"alternative_input" : {
"foo" : "bar"
},
"ignore_condition" : true,
"action_modes" : {
"_all" : "force_simulate"
},
"watch" : {
"trigger" : {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input" : {
"search" : {
"request" : {
"indices" : [ "logstash*" ],
"body" : {
"query" : {
"filtered": {
"query": {
"match": {
"response": 404
}
},
"filter": {
"range": {
"@timestamp" : {
"from": "{{ctx.trigger.scheduled_time}}||-5m",
"to": "{{ctx.trigger.triggered_time}}"
}
}
}
}
}
}
}
}
},
"condition" : {
"script" : {
"inline" : "ctx.payload.hits.total > 1"
}
},
"actions" : {
"email_admin" : {
"email" : {
"to" : "someone@domain.host.com",
"subject" : "404 recently encountered"
}
}
}
}
}
- match: { "watch_record.state": "executed" }
- match: { "watch_record.trigger_event.manual.schedule.scheduled_time": "2015-05-05T20:58:02.443Z" }
- match: { "watch_record.result.input.type": "simple" }
- match: { "watch_record.result.input.payload.foo": "bar" }
- match: { "watch_record.result.condition.met": true }
- match: { "watch_record.result.actions.0.id" : "email_admin" }
- match: { "watch_record.result.actions.0.status" : "simulated" }
- match: { "watch_record.result.actions.0.email.email.subject" : "404 recently encountered" }

View File

@ -37,6 +37,8 @@ public class RestExecuteWatchAction extends WatcherRestHandler {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/_execute", this);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/_execute", this);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/_execute", this);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/_execute", this);
this.triggerService = triggerService;
}
@ -58,9 +60,8 @@ public class RestExecuteWatchAction extends WatcherRestHandler {
//This tightly binds the REST API to the java API
private ExecuteWatchRequest parseRequest(RestRequest request, WatcherClient client) throws IOException {
String watchId = request.param("id");
ExecuteWatchRequestBuilder builder = client.prepareExecuteWatch(watchId);
ExecuteWatchRequestBuilder builder = client.prepareExecuteWatch();
builder.setId(request.param("id"));
if (request.content() == null || request.content().length() == 0) {
return builder.request();
}
@ -79,13 +80,17 @@ public class RestExecuteWatchAction extends WatcherRestHandler {
} else if (Field.RECORD_EXECUTION.match(currentFieldName)) {
builder.setRecordExecution(parser.booleanValue());
} else {
throw new ParseException("could not parse watch execution request for [{}]. unexpected boolean field [{}]", watchId, currentFieldName);
throw new ParseException("could not parse watch execution request. unexpected boolean field [{}]", currentFieldName);
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (Field.ALTERNATIVE_INPUT.match(currentFieldName)) {
builder.setAlternativeInput(parser.map());
} else if (Field.TRIGGER_DATA.match(currentFieldName)) {
builder.setTriggerData(parser.map());
} else if (Field.WATCH.match(currentFieldName)) {
XContentBuilder watcherSource = XContentBuilder.builder(parser.contentType().xContent());
XContentHelper.copyCurrentStructure(watcherSource.generator(), parser);
builder.setWatchSource(watcherSource.bytes());
} else if (Field.ACTION_MODES.match(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@ -95,17 +100,17 @@ public class RestExecuteWatchAction extends WatcherRestHandler {
ActionExecutionMode mode = ActionExecutionMode.resolve(parser.textOrNull());
builder.setActionMode(currentFieldName, mode);
} catch (WatcherException we) {
throw new ParseException("could not parse watch execution request for [{}].", watchId, we);
throw new ParseException("could not parse watch execution request", we);
}
} else {
throw new ParseException("could not parse watch execution request for [{}]. unexpected array field [{}]", watchId, currentFieldName);
throw new ParseException("could not parse watch execution request. unexpected array field [{}]", currentFieldName);
}
}
} else {
throw new ParseException("could not parse watch execution request for [{}]. unexpected object field [{}]", watchId, currentFieldName);
throw new ParseException("could not parse watch execution request. unexpected object field [{}]", currentFieldName);
}
} else {
throw new ParseException("could not parse watch execution request for [{}]. unexpected token [{}]", watchId, token);
throw new ParseException("could not parse watch execution request. unexpected token [{}]", token);
}
}
@ -132,5 +137,6 @@ public class RestExecuteWatchAction extends WatcherRestHandler {
ParseField ALTERNATIVE_INPUT = new ParseField("alternative_input");
ParseField IGNORE_CONDITION = new ParseField("ignore_condition");
ParseField TRIGGER_DATA = new ParseField("trigger_data");
ParseField WATCH = new ParseField("watch");
}
}

View File

@ -9,8 +9,11 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.execution.ActionExecutionMode;
import org.elasticsearch.watcher.support.validation.Validation;
import org.elasticsearch.watcher.trigger.TriggerEvent;
@ -24,12 +27,15 @@ import java.util.Map;
*/
public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatchRequest> {
public static final String INLINE_WATCH_ID = "_inlined_";
private String id;
private boolean ignoreCondition = false;
private boolean recordExecution = false;
private @Nullable Map<String, Object> triggerData = null;
private @Nullable Map<String, Object> alternativeInput = null;
private Map<String, ActionExecutionMode> actionModes = new HashMap<>();
private BytesReference watchSource;
ExecuteWatchRequest() {
}
@ -120,6 +126,26 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
return triggerData;
}
/**
* @return the source of the watch to execute
*/
public BytesReference getWatchSource() {
return watchSource;
}
/**
* @param watchSource instead of using an existing watch use this non persisted watch
*/
public void setWatchSource(BytesReference watchSource) {
this.watchSource = watchSource;
}
/**
* @param watchSource instead of using an existing watch use this non persisted watch
*/
public void setWatchSource(WatchSourceBuilder watchSource) {
this.watchSource = watchSource.buildAsBytes(XContentType.JSON);
}
/**
*
@ -143,26 +169,34 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (id == null){
if (id == null && watchSource == null){
validationException = ValidateActions.addValidationError("watch id is missing", validationException);
}
Validation.Error error = Validation.watchId(id);
if (error != null) {
validationException = ValidateActions.addValidationError(error.message(), validationException);
}
for (Map.Entry<String, ActionExecutionMode> modes : actionModes.entrySet()) {
error = Validation.actionId(modes.getKey());
if (id != null) {
Validation.Error error = Validation.watchId(id);
if (error != null) {
validationException = ValidateActions.addValidationError(error.message(), validationException);
}
}
for (Map.Entry<String, ActionExecutionMode> modes : actionModes.entrySet()) {
Validation.Error error = Validation.actionId(modes.getKey());
if (error != null) {
validationException = ValidateActions.addValidationError(error.message(), validationException);
}
}
if (watchSource != null && id != null) {
validationException = ValidateActions.addValidationError("a watch execution request must either have a watch id or an inline watch source but not both", validationException);
}
if (watchSource != null && recordExecution) {
validationException = ValidateActions.addValidationError("the execution of an inline watch cannot be recorded", validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
id = in.readOptionalString();
ignoreCondition = in.readBoolean();
recordExecution = in.readBoolean();
if (in.readBoolean()){
@ -176,13 +210,16 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
for (int i = 0; i < actionModesCount; i++) {
actionModes.put(in.readString(), ActionExecutionMode.resolve(in.readByte()));
}
if (in.readBoolean()) {
watchSource = in.readBytesReference();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeOptionalString(id);
out.writeBoolean(ignoreCondition);
out.writeBoolean(recordExecution);
out.writeBoolean(alternativeInput != null);
@ -198,6 +235,10 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
out.writeString(entry.getKey());
out.writeByte(entry.getValue().id());
}
out.writeBoolean(watchSource != null);
if (watchSource != null) {
out.writeBytesReference(watchSource);
}
}
@Override

View File

@ -8,6 +8,8 @@ package org.elasticsearch.watcher.transport.actions.execute;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.execution.ActionExecutionMode;
import org.elasticsearch.watcher.trigger.TriggerEvent;
@ -75,6 +77,22 @@ public class ExecuteWatchRequestBuilder extends MasterNodeOperationRequestBuilde
return this;
}
/**
* @param watchSource instead of using an existing watch use this non persisted watch
*/
public ExecuteWatchRequestBuilder setWatchSource(BytesReference watchSource) {
request.setWatchSource(watchSource);
return this;
}
/**
* @param watchSource instead of using an existing watch use this non persisted watch
*/
public ExecuteWatchRequestBuilder setWatchSource(WatchSourceBuilder watchSource) {
request.setWatchSource(watchSource);
return this;
}
/**
* Sets the mode in which the given action (identified by its id) will be handled.
*
@ -90,5 +108,4 @@ public class ExecuteWatchRequestBuilder extends MasterNodeOperationRequestBuilde
protected void doExecute(final ActionListener<ExecuteWatchResponse> listener) {
new WatcherClient(client).executeWatch(request, listener);
}
}

View File

@ -49,16 +49,19 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
private final WatchStore watchStore;
private final Clock clock;
private final TriggerService triggerService;
private final Watch.Parser watchParser;
@Inject
public TransportExecuteWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, ExecutionService executionService,
Clock clock, LicenseService licenseService, WatchStore watchStore, TriggerService triggerService) {
Clock clock, LicenseService licenseService, WatchStore watchStore, TriggerService triggerService,
Watch.Parser watchParser) {
super(settings, ExecuteWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService);
this.executionService = executionService;
this.watchStore = watchStore;
this.clock = clock;
this.triggerService = triggerService;
this.watchParser = watchParser;
}
@Override
@ -79,15 +82,26 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
@Override
protected void masterOperation(ExecuteWatchRequest request, ClusterState state, ActionListener<ExecuteWatchResponse> listener) throws ElasticsearchException {
try {
Watch watch = watchStore.get(request.getId());
if (watch == null) {
throw new WatcherException("watch [{}] does not exist", request.getId());
Watch watch;
boolean knownWatch;
if (request.getId() != null) {
watch = watchStore.get(request.getId());
if (watch == null) {
throw new WatcherException("watch [{}] does not exist", request.getId());
}
knownWatch = true;
} else if (request.getWatchSource() != null) {
assert !request.isRecordExecution();
watch = watchParser.parse(ExecuteWatchRequest.INLINE_WATCH_ID, false, request.getWatchSource());
knownWatch = false;
} else {
throw new WatcherException("no watch provided");
}
String triggerType = watch.trigger().type();
TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData());
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, true, new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, knownWatch, new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
DateTime executionTime = clock.now(UTC);
ctxBuilder.executionTime(executionTime);

View File

@ -105,7 +105,9 @@ public class TriggerService extends AbstractComponent {
public Trigger parseTrigger(String jobName, String type, XContentParser parser) throws IOException {
TriggerEngine engine = engines.get(type);
assert engine != null;
if (engine == null) {
throw new TriggerException("could not parse trigger [{}] for [{}]. unknown trigger type [{}]", type, jobName, type);
}
return engine.parseTrigger(jobName, parser);
}

View File

@ -252,6 +252,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
if (withSecrets) {
parser = new SensitiveXContentParser(parser, secretService);
}
parser.nextToken();
return parse(id, includeStatus, parser);
} catch (IOException ioe) {
throw new WatcherException("could not parse watch [{}]", ioe, id);
@ -273,7 +274,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
WatchStatus status = null;
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == null ) {
throw new ParseException("could not parse watch [{}]. null token", id);

View File

@ -7,6 +7,7 @@ package org.elasticsearch.watcher.execution;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
@ -24,6 +25,9 @@ import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequest;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequestBuilder;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
import org.elasticsearch.watcher.transport.actions.put.PutWatchRequest;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
@ -62,7 +66,6 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
@Test @Repeat(iterations = 10)
public void testExecuteWatch() throws Exception {
ensureWatcherStarted();
boolean ignoreCondition = randomBoolean();
boolean recordExecution = randomBoolean();
boolean conditionAlwaysTrue = randomBoolean();
@ -146,9 +149,74 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
}
}
@Test
@Repeat(iterations = 5)
public void testExecutionWithInlineWatch() throws Exception {
WatchSourceBuilder watchBuilder = watchBuilder()
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
.input(simpleInput("foo", "bar"))
.condition(alwaysCondition())
.addAction("log", loggingAction("foobar"));
ExecuteWatchRequestBuilder builder = watcherClient().prepareExecuteWatch()
.setWatchSource(watchBuilder);
if (randomBoolean()) {
builder.setRecordExecution(false);
}
if (randomBoolean()) {
builder.setTriggerEvent(new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)));
}
ExecuteWatchResponse executeWatchResponse = builder.get();
assertThat(executeWatchResponse.getRecordId(), startsWith(ExecuteWatchRequest.INLINE_WATCH_ID));
assertThat(executeWatchResponse.getRecordSource().getValue("watch_id").toString(), equalTo(ExecuteWatchRequest.INLINE_WATCH_ID));
assertThat(executeWatchResponse.getRecordSource().getValue("state").toString(), equalTo("executed"));
assertThat(executeWatchResponse.getRecordSource().getValue("trigger_event.type").toString(), equalTo("manual"));
}
@Test
public void testExecutionWithInlineWatch_withRecordExecutionEnabled() throws Exception {
WatchSourceBuilder watchBuilder = watchBuilder()
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
.input(simpleInput("foo", "bar"))
.condition(alwaysCondition())
.addAction("log", loggingAction("foobar"));
try {
watcherClient().prepareExecuteWatch()
.setWatchSource(watchBuilder)
.setRecordExecution(true)
.setTriggerEvent(new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)))
.get();
fail();
} catch (ActionRequestValidationException e) {
assertThat(e.getMessage(), containsString("the execution of an inline watch cannot be recorded"));
}
}
@Test
public void testExecutionWithInlineWatch_withWatchId() throws Exception {
WatchSourceBuilder watchBuilder = watchBuilder()
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
.input(simpleInput("foo", "bar"))
.condition(alwaysCondition())
.addAction("log", loggingAction("foobar"));
try {
watcherClient().prepareExecuteWatch()
.setId("_id")
.setWatchSource(watchBuilder)
.setRecordExecution(false)
.setTriggerEvent(new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)))
.get();
fail();
} catch (ActionRequestValidationException e) {
assertThat(e.getMessage(), containsString("a watch execution request must either have a watch id or an inline watch source but not both"));
}
}
@Test
public void testDifferentAlternativeInputs() throws Exception {
ensureWatcherStarted();
WatchSourceBuilder watchBuilder = watchBuilder()
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
.addAction("log", loggingAction("foobar"));
@ -187,8 +255,6 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
@Test
public void testExecutionRequestDefaults() throws Exception {
ensureWatcherStarted();
WatchSourceBuilder watchBuilder = watchBuilder()
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
.input(simpleInput("foo", "bar"))