Change the ManualTriggerEvent to wrap a TriggerEvent

This change changes the manual trigger event to be a wrapper for another `TriggerEvent`.
TriggerEvent is required field for the execute API either via setTriggerEvent on the request or requestBuilder in the java API or `trigger_event` in the REST API

Original commit: elastic/x-pack-elasticsearch@790bdb96a5
This commit is contained in:
Brian Murphy 2015-05-06 11:51:10 -04:00
parent bd9bf2810d
commit 7d98358012
20 changed files with 244 additions and 183 deletions

View File

@ -65,15 +65,18 @@
"alternative_input" : {
"foo" : "bar"
},
"trigger_data" : {
"scheduled_time" : "now"
"trigger_event" : {
"schedule" : {
"scheduled_time" : "2015-05-05T20:58:02.443Z",
"triggered_time" : "2015-05-05T20:58:02.443Z"
}
},
"record_execution" : true
}
- match: { "watch_id": "my_exe_watch" }
- match: { "watch_execution.condition_result.always": {} }
- match: { "state": "executed" }
- match: { "trigger_event.manual.trigger_data.scheduled_time": "now" }
- match: { "trigger_event.manual.schedule.scheduled_time": "2015-05-05T20:58:02.443Z" }
- match: { "watch_execution.input_result.simple.payload.foo": "bar" }
- match: { "watch_execution.actions_results.EmailAdmin.email.success" : true }
- match: { "watch_execution.actions_results.EmailAdmin.email.simulated_email.subject" : "404 recently encountered" }

View File

@ -33,7 +33,15 @@
- do:
watcher.execute_watch:
id: "my_logging_watch"
body: null
body: >
{
"trigger_event" : {
"schedule" : {
"scheduled_time" : "2015-05-05T20:58:02.443Z",
"triggered_time" : "2015-05-05T20:58:02.443Z"
}
}
}
- match: { "watch_id": "my_logging_watch" }
- match: { "watch_execution.condition_result.script.met": true }

View File

@ -17,6 +17,10 @@ import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchAction;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequest;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequestBuilder;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchAction;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequest;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequestBuilder;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchAction;
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequestBuilder;
@ -25,10 +29,6 @@ import org.elasticsearch.watcher.transport.actions.put.PutWatchAction;
import org.elasticsearch.watcher.transport.actions.put.PutWatchRequest;
import org.elasticsearch.watcher.transport.actions.put.PutWatchRequestBuilder;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchAction;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequest;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequestBuilder;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.transport.actions.service.WatcherServiceAction;
import org.elasticsearch.watcher.transport.actions.service.WatcherServiceRequest;
import org.elasticsearch.watcher.transport.actions.service.WatcherServiceRequestBuilder;

View File

@ -15,7 +15,6 @@ import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.trigger.manual.ManualTriggerEvent;
import org.elasticsearch.watcher.watch.Watch;
import java.util.HashMap;
import java.util.Set;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
@ -55,24 +54,26 @@ public class ManualExecutionContext extends WatchExecutionContext {
return recordExecution;
}
public static Builder builder(Watch watch) {
return new Builder(watch);
public static Builder builder(Watch watch, ManualTriggerEvent event) {
return new Builder(watch, event);
}
public static class Builder {
private final Watch watch;
private final ManualTriggerEvent triggerEvent;
protected DateTime executionTime;
private boolean recordExecution = false;
private Predicate<String> simulateActionPredicate = Predicates.alwaysFalse();
private Input.Result inputResult;
private Condition.Result conditionResult;
private Throttler.Result throttlerResult;
private ManualTriggerEvent triggerEvent;
private Builder(Watch watch) {
private Builder(Watch watch, ManualTriggerEvent triggerEvent) {
this.watch = watch;
assert triggerEvent != null;
this.triggerEvent = triggerEvent;
}
public Builder executionTime(DateTime executionTime) {
@ -110,19 +111,10 @@ public class ManualExecutionContext extends WatchExecutionContext {
return this;
}
public Builder triggerEvent(ManualTriggerEvent triggerEvent) {
this.triggerEvent = triggerEvent;
return this;
}
public ManualExecutionContext build() {
if (executionTime == null) {
executionTime = DateTime.now(UTC);
}
if (triggerEvent == null) {
triggerEvent = new ManualTriggerEvent(watch.id(), executionTime, new HashMap<String, Object>());
}
return new ManualExecutionContext(watch, executionTime, triggerEvent, inputResult, conditionResult, throttlerResult, simulateActionPredicate, recordExecution);
}
}

View File

@ -262,7 +262,7 @@ public class WatchRecord implements ToXContent {
} else if (WATCH_EXECUTION_FIELD.match(currentFieldName)) {
record.execution = WatchExecutionResult.Parser.parse(record.id, parser, conditionRegistry, actionRegistry, inputRegistry, transformRegistry);
} else if (TRIGGER_EVENT_FIELD.match(currentFieldName)) {
record.triggerEvent = triggerService.parseTriggerEvent(id, parser);
record.triggerEvent = triggerService.parseTriggerEvent(record.watchId, id, parser);
} else {
throw new WatcherException("could not parse watch record [{}]. unexpected field [{}]", id, currentFieldName);
}

View File

@ -20,6 +20,8 @@ import org.elasticsearch.watcher.rest.WatcherRestHandler;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequest;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchRequestBuilder;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.TriggerService;
import java.io.IOException;
@ -32,13 +34,16 @@ public class RestExecuteWatchAction extends WatcherRestHandler {
static ParseField ALTERNATIVE_INPUT_FIELD = new ParseField("alternative_input");
static ParseField IGNORE_CONDITION_FIELD = new ParseField("ignore_condition");
static ParseField IGNORE_THROTTLE_FIELD = new ParseField("ignore_throttle");
static ParseField TRIGGER_DATA_FIELD = new ParseField("trigger_data");
static ParseField TRIGGER_EVENT_FIELD = new ParseField("trigger_event");
final TriggerService triggerService;
@Inject
public RestExecuteWatchAction(Settings settings, RestController controller, Client client) {
public RestExecuteWatchAction(Settings settings, RestController controller, Client client, TriggerService triggerService) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/_execute", this);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/_execute", this);
this.triggerService = triggerService;
}
@Override
@ -55,70 +60,77 @@ public class RestExecuteWatchAction extends WatcherRestHandler {
}
//This tightly binds the REST API to the java API
private static ExecuteWatchRequest parseRequest(RestRequest request, WatcherClient client) throws IOException {
ExecuteWatchRequestBuilder executeWatchRequestBuilder = client.prepareExecuteWatch(request.param("id"));
private ExecuteWatchRequest parseRequest(RestRequest request, WatcherClient client) throws IOException {
String watchId = request.param("id");
if (request.content() == null || request.content().length() == 0) {
//If there isn't any content just return the default request
return executeWatchRequestBuilder.request();
}
ExecuteWatchRequestBuilder executeWatchRequestBuilder = client.prepareExecuteWatch(watchId);
TriggerEvent triggerEvent = null;
XContentParser parser = XContentHelper.createParser(request.content());
parser.nextToken();
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
for (; token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case VALUE_BOOLEAN:
if (IGNORE_CONDITION_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setIgnoreCondition(parser.booleanValue());
} else if (IGNORE_THROTTLE_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setIgnoreThrottle(parser.booleanValue());
} else if (RECORD_EXECUTION_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setRecordExecution(parser.booleanValue());
} else {
throw new ParseException("invalid watch execution request, unexpected boolean value field [" + currentFieldName + "]");
}
break;
case START_OBJECT:
if (ALTERNATIVE_INPUT_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setAlternativeInput(parser.map());
} else if (TRIGGER_DATA_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setTriggerData(parser.map());
} else {
throw new ParseException("invalid watch execution request, unexpected object value field [" + currentFieldName + "]");
}
break;
case START_ARRAY:
if (SIMULATED_ACTIONS_FIELD.match(currentFieldName)) {
for (XContentParser.Token arrayToken = parser.nextToken(); arrayToken != XContentParser.Token.END_ARRAY; arrayToken = parser.nextToken()) {
if (arrayToken == XContentParser.Token.VALUE_STRING) {
executeWatchRequestBuilder.addSimulatedActions(parser.text());
}
}
} else {
throw new ParseException("invalid watch execution request, unexpected array value field [" + currentFieldName + "]");
}
break;
case VALUE_STRING:
if (SIMULATED_ACTIONS_FIELD.match(currentFieldName)) {
if (parser.text().equals("_all")) {
executeWatchRequestBuilder.addSimulatedActions("_all");
if (request.content() != null && request.content().length() != 0) {
XContentParser parser = XContentHelper.createParser(request.content());
parser.nextToken();
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
for (; token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case VALUE_BOOLEAN:
if (IGNORE_CONDITION_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setIgnoreCondition(parser.booleanValue());
} else if (IGNORE_THROTTLE_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setIgnoreThrottle(parser.booleanValue());
} else if (RECORD_EXECUTION_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setRecordExecution(parser.booleanValue());
} else {
throw new ParseException("invalid watch execution request, unexpected string value [" + parser.text() + "] for field [" + SIMULATED_ACTIONS_FIELD.getPreferredName() + "]");
throw new ParseException("invalid watch execution request, unexpected boolean value field [" + currentFieldName + "]");
}
} else {
throw new ParseException("invalid watch execution request, unexpected string value field [" + currentFieldName + "]");
}
break;
default:
throw new ParseException("invalid watch execution request, unexpected token field [" + token + "]");
break;
case START_OBJECT:
if (ALTERNATIVE_INPUT_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setAlternativeInput(parser.map());
} else if (TRIGGER_EVENT_FIELD.match(currentFieldName)) {
triggerEvent = triggerService.parseTriggerEvent(watchId, watchId, parser);
} else {
throw new ParseException("invalid watch execution request, unexpected object value field [" + currentFieldName + "]");
}
break;
case START_ARRAY:
if (SIMULATED_ACTIONS_FIELD.match(currentFieldName)) {
for (XContentParser.Token arrayToken = parser.nextToken(); arrayToken != XContentParser.Token.END_ARRAY; arrayToken = parser.nextToken()) {
if (arrayToken == XContentParser.Token.VALUE_STRING) {
executeWatchRequestBuilder.addSimulatedActions(parser.text());
}
}
} else {
throw new ParseException("invalid watch execution request, unexpected array value field [" + currentFieldName + "]");
}
break;
case VALUE_STRING:
if (SIMULATED_ACTIONS_FIELD.match(currentFieldName)) {
if (parser.text().equals("_all")) {
executeWatchRequestBuilder.addSimulatedActions("_all");
} else {
throw new ParseException("invalid watch execution request, unexpected string value [" + parser.text() + "] for field [" + SIMULATED_ACTIONS_FIELD.getPreferredName() + "]");
}
} else {
throw new ParseException("invalid watch execution request, unexpected string value field [" + currentFieldName + "]");
}
break;
default:
throw new ParseException("invalid watch execution request, unexpected token field [" + token + "]");
}
}
}
if (triggerEvent == null) {
throw new WatcherException("[{}] is a required field.",TRIGGER_EVENT_FIELD.getPreferredName());
}
return executeWatchRequestBuilder.request();
}

View File

@ -8,8 +8,13 @@ package org.elasticsearch.watcher.transport.actions.execute;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import java.io.IOException;
import java.util.HashSet;
@ -26,7 +31,8 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
private boolean ignoreThrottle = false;
private boolean recordExecution = false;
private Map<String, Object> alternativeInput = null;
private Map<String, Object> triggerData = null;
private BytesReference triggerSource = null;
private String triggerType = null;
private Set<String> simulatedActionIds = new HashSet<>();
ExecuteWatchRequest() {
@ -110,19 +116,37 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
}
/**
* @return the alternative input to use
* @return the trigger to use
*/
public Map<String, Object> getTriggerData() {
return triggerData;
public BytesReference getTriggerSource() {
return triggerSource;
}
/**
* @param triggerData the trigger data to use
* @param triggerType the type of trigger to use
* @param triggerSource the trigger source to use
*/
public void setTriggerData(Map<String, Object> triggerData) {
this.triggerData = triggerData;
public void setTriggerEvent(String triggerType, BytesReference triggerSource) {
this.triggerType = triggerType;
this.triggerSource = triggerSource;
}
/**
* @param triggerEvent the trigger event to use
* @throws IOException
*/
public void setTriggerEvent(TriggerEvent triggerEvent) throws IOException {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
triggerEvent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
setTriggerEvent(triggerEvent.type(), jsonBuilder.bytes());
}
/**
* @return the type of trigger to use
*/
public String getTriggerType() { return triggerType; }
/**
* @return the trigger data to use
*/
@ -152,6 +176,9 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
if (id == null){
validationException = ValidateActions.addValidationError("watch id is missing", validationException);
}
if (triggerSource == null || triggerType == null) {
validationException = ValidateActions.addValidationError("trigger event is missing", validationException);
}
return validationException;
}
@ -165,9 +192,8 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
if (in.readBoolean()){
alternativeInput = in.readMap();
}
if (in.readBoolean()) {
triggerData = in.readMap();
}
triggerSource = in.readBytesReference();
triggerType = in.readString();
long simulatedIdCount = in.readLong();
for(long i = 0; i < simulatedIdCount; ++i) {
simulatedActionIds.add(in.readString());
@ -186,10 +212,8 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
if (alternativeInput != null) {
out.writeMap(alternativeInput);
}
out.writeBoolean(triggerData != null);
if (triggerData != null){
out.writeMap(triggerData);
}
out.writeBytesReference(triggerSource);
out.writeString(triggerType);
out.writeLong(simulatedActionIds.size());
for (String simulatedId : simulatedActionIds) {
out.writeString(simulatedId);

View File

@ -8,8 +8,11 @@ package org.elasticsearch.watcher.transport.actions.execute;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import java.io.IOException;
import java.util.Map;
/**
@ -65,10 +68,19 @@ public class ExecuteWatchRequestBuilder extends MasterNodeOperationRequestBuilde
}
/**
* @param triggerData the trigger data to use
* @param triggerType the trigger type to use
* @param triggerSource the trigger source to use
*/
public ExecuteWatchRequestBuilder setTriggerData(Map<String, Object> triggerData) {
request.setTriggerData(triggerData);
public ExecuteWatchRequestBuilder setTriggerEvent(String triggerType, BytesReference triggerSource) {
request.setTriggerEvent(triggerType, triggerSource);
return this;
}
/**
* @param triggerEvent the trigger event to use
*/
public ExecuteWatchRequestBuilder setTriggerEvent(TriggerEvent triggerEvent) throws IOException {
request.setTriggerEvent(triggerEvent);
return this;
}

View File

@ -15,7 +15,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.threadpool.ThreadPool;
@ -31,6 +30,8 @@ import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.TriggerService;
import org.elasticsearch.watcher.trigger.manual.ManualTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
@ -45,15 +46,17 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
private final ExecutionService executionService;
private final WatchStore watchStore;
private final Clock clock;
private final TriggerService triggerService;
@Inject
public TransportExecuteWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, ExecutionService executionService,
Clock clock, LicenseService licenseService, WatchStore watchStore) {
Clock clock, LicenseService licenseService, WatchStore watchStore, TriggerService triggerService) {
super(settings, ExecuteWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService);
this.executionService = executionService;
this.watchStore = watchStore;
this.clock = clock;
this.triggerService = triggerService;
}
@Override
@ -79,7 +82,9 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
throw new WatcherException("watch [" + request.getId() + "] does not exist");
}
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch);
TriggerEvent triggerEvent = triggerService.parseTriggerEvent(watch.id(), watch.id() + "_manual_execution", request.getTriggerType(), request.getTriggerSource());
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent));
DateTime executionTime = clock.now(UTC);
ctxBuilder.executionTime(executionTime);
if (request.isSimulateAllActions()) {
@ -87,9 +92,6 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
} else {
ctxBuilder.simulateActions(request.getSimulatedActionIds().toArray(new String[request.getSimulatedActionIds().size()]));
}
if (request.getTriggerData() != null) {
ctxBuilder.triggerEvent(new ManualTriggerEvent(watch.id(), executionTime, request.getTriggerData()));
}
if (request.getAlternativeInput() != null) {
ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput())));
}

View File

@ -39,7 +39,7 @@ public interface TriggerEngine<T extends Trigger, E extends TriggerEvent> {
T parseTrigger(String context, XContentParser parser) throws IOException;
E parseTriggerEvent(String context, XContentParser parser) throws IOException;
E parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException;
interface Listener {

View File

@ -5,10 +5,12 @@
*/
package org.elasticsearch.watcher.trigger;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
@ -98,32 +100,39 @@ public class TriggerService extends AbstractComponent {
return engine.parseTrigger(jobName, parser);
}
public TriggerEvent parseTriggerEvent(String historyRecordId, XContentParser parser) throws IOException {
public TriggerEvent parseTriggerEvent(String watchId, String context, XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
assert token == XContentParser.Token.START_OBJECT;
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME) {
throw new TriggerException("could not parse trigger event for [" + historyRecordId + "]. expected trigger type string field, but found [" + token + "]");
throw new TriggerException("could not parse trigger event for [{}] for watch [{}]. expected trigger type string field, but found [{}]", context, watchId, token);
}
String type = parser.text();
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TriggerException("could not parse trigger event [" + type + "] for [" + historyRecordId + "]. expected trigger an object as the trigger body, but found [" + token + "]");
throw new TriggerException("could not parse trigger event for [{}] for watch [{}]. expected trigger an object as the trigger body, but found [{}]", context, watchId, token);
}
TriggerEvent trigger = parseTriggerEvent(historyRecordId, type, parser);
TriggerEvent trigger = parseTriggerEvent(watchId, context, type, parser);
token = parser.nextToken();
if (token != XContentParser.Token.END_OBJECT) {
throw new TriggerException("could not parse trigger [{}] for [{}]. expected [END_OBJECT] token, but found [{}]", type, historyRecordId, token);
throw new TriggerException("could not parse trigger [{}] for [{}]. expected [END_OBJECT] token, but found [{}]", type, context, token);
}
return trigger;
}
public TriggerEvent parseTriggerEvent(String context, String type, XContentParser parser) throws IOException {
public TriggerEvent parseTriggerEvent(String watchId, String context, String type, BytesReference bytes) throws IOException {
XContentParser parser = XContentHelper.createParser(bytes);
parser.nextToken();
return parseTriggerEvent(watchId, context, type, parser);
}
public TriggerEvent parseTriggerEvent(String watchId, String context, String type, XContentParser parser) throws IOException {
TriggerEngine engine = engines.get(type);
if (engine == null) {
throw new TriggerException("Unknown trigger type [{}]", type);
}
return engine.parseTriggerEvent(context, parser);
return engine.parseTriggerEvent(this, watchId, context, parser);
}
static class Listeners implements TriggerEngine.Listener {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.trigger.manual;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerService;
import java.io.IOException;
import java.util.Collection;
@ -60,7 +61,7 @@ public class ManualTriggerEngine implements TriggerEngine<ManualTrigger,ManualTr
}
@Override
public ManualTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException {
return ManualTriggerEvent.parse(context, parser);
public ManualTriggerEvent parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException {
return ManualTriggerEvent.parse(service, watchId, context, parser);
}
}

View File

@ -5,33 +5,25 @@
*/
package org.elasticsearch.watcher.trigger.manual;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.WatcherDateUtils;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.TriggerService;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
*/
public class ManualTriggerEvent extends TriggerEvent {
private final Map<String, Object> triggerData;
private final TriggerEvent triggerEvent;
public ManualTriggerEvent(DateTime triggeredTime, Map<String, Object> triggerData) {
this(null, triggeredTime, triggerData);
}
public ManualTriggerEvent(String jobName, DateTime triggeredTime, Map<String, Object> triggerData) {
super(jobName, triggeredTime);
data.putAll(triggerData);
this.triggerData = triggerData;
public ManualTriggerEvent(String jobName, TriggerEvent triggerEvent) {
super(jobName, triggerEvent.triggeredTime());
this.triggerEvent = triggerEvent;
data.putAll(triggerEvent.data());
}
@Override
@ -42,39 +34,13 @@ public class ManualTriggerEvent extends TriggerEvent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
WatcherDateUtils.writeDate(Field.TRIGGERED_TIME.getPreferredName(), builder, triggeredTime);
builder.field(Field.TRIGGER_DATA.getPreferredName(), triggerData);
builder.field(triggerEvent.type(), triggerEvent, params);
return builder.endObject();
}
public static ManualTriggerEvent parse(String context, XContentParser parser) throws IOException {
DateTime triggeredTime = null;
Map<String, Object> triggerData = new HashMap<>();
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (Field.TRIGGERED_TIME.match(currentFieldName)) {
try {
triggeredTime = WatcherDateUtils.parseDate(currentFieldName, parser, UTC);
} catch (WatcherDateUtils.ParseException pe) {
throw new ParseException("could not parse [{}] trigger event for [{}]. failed to parse date field [{}]", pe, ManualTriggerEngine.TYPE, context, currentFieldName);
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (Field.TRIGGER_DATA.match(currentFieldName)) {
triggerData = parser.map();
} else {
throw new ParseException("could not parse trigger event for [{}]. unexpected object value field [{}]", context, currentFieldName);
}
} else {
throw new ParseException("could not parse trigger event for [{}]. unexpected token [{}]", context, token);
}
}
// should never be, it's fully controlled internally (not coming from the user)
assert triggeredTime != null;
return new ManualTriggerEvent(triggeredTime, triggerData);
public static ManualTriggerEvent parse(TriggerService triggerService, String watchId, String context, XContentParser parser) throws IOException {
TriggerEvent parsedTriggerEvent = triggerService.parseTriggerEvent(watchId, context, parser);
return new ManualTriggerEvent(context, parsedTriggerEvent);
}
public static class ParseException extends WatcherException {
@ -88,8 +54,4 @@ public class ManualTriggerEvent extends TriggerEvent {
}
}
interface Field extends TriggerEvent.Field {
ParseField TRIGGER_DATA = new ParseField("trigger_data");
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.trigger.schedule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.trigger.AbstractTriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerService;
import java.io.IOException;
@ -37,7 +38,7 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine<Schedu
}
@Override
public ScheduleTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException {
return ScheduleTriggerEvent.parse(context, parser);
public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException {
return ScheduleTriggerEvent.parse(watchId, context, parser);
}
}

View File

@ -51,7 +51,7 @@ public class ScheduleTriggerEvent extends TriggerEvent {
return builder.endObject();
}
public static ScheduleTriggerEvent parse(String context, XContentParser parser) throws IOException {
public static ScheduleTriggerEvent parse(String watchId, String context, XContentParser parser) throws IOException {
DateTime triggeredTime = null;
DateTime scheduledTime = null;
@ -64,16 +64,16 @@ public class ScheduleTriggerEvent extends TriggerEvent {
try {
triggeredTime = WatcherDateUtils.parseDate(currentFieldName, parser, UTC);
} catch (WatcherDateUtils.ParseException pe) {
throw new ParseException("could not parse [{}] trigger event for [{}]. failed to parse date field [{}]", pe, ScheduleTriggerEngine.TYPE, context, currentFieldName);
throw new ParseException("could not parse [{}] trigger event for [{}] for watch [{}]. failed to parse date field [{}]", pe, ScheduleTriggerEngine.TYPE, context, watchId, currentFieldName);
}
} else if (Field.SCHEDULED_TIME.match(currentFieldName)) {
try {
scheduledTime = WatcherDateUtils.parseDate(currentFieldName, parser, UTC);
} catch (WatcherDateUtils.ParseException pe) {
throw new ParseException("could not parse [{}] trigger event for [{}]. failed to parse date field [{}]", pe, ScheduleTriggerEngine.TYPE, context, currentFieldName);
throw new ParseException("could not parse [{}] trigger event for [{}] for watch [{}]. failed to parse date field [{}]", pe, ScheduleTriggerEngine.TYPE, context, watchId, currentFieldName);
}
}else {
throw new ParseException("could not parse trigger event for [{}]. unexpected token [{}]", context, token);
throw new ParseException("could not parse trigger event for [{}] for watch [{}]. unexpected token [{}]", context, watchId, token);
}
}

View File

@ -20,6 +20,9 @@ import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
import org.elasticsearch.watcher.transport.actions.put.PutWatchRequest;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.manual.ManualTriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.junit.Test;
@ -28,6 +31,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
@ -61,15 +65,16 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
ManualExecutionContext.Builder ctxBuilder;
Watch parsedWatch = null;
ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)));
if (recordExecution) {
PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet();
assertThat(putWatchResponse.getVersion(), greaterThan(0L));
refresh();
assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true));
ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id")); //If we are persisting the state we need to use the exact watch that is in memory
ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent); //If we are persisting the state we need to use the exact watch that is in memory
} else {
parsedWatch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON));
ctxBuilder = ManualExecutionContext.builder(parsedWatch);
ctxBuilder = ManualExecutionContext.builder(parsedWatch, triggerEvent);
}
if (ignoreCondition) {
@ -146,14 +151,16 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
Map<String, Object> map2 = new HashMap<>();
map2.put("foo", map1);
ManualExecutionContext.Builder ctxBuilder1 = ManualExecutionContext.builder(watchService().getWatch("_id"));
ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)));
ManualExecutionContext.Builder ctxBuilder1 = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent);
ctxBuilder1.simulateActions("_all");
ctxBuilder1.withInput(new SimpleInput.Result(new Payload.Simple(map1)));
ctxBuilder1.recordExecution(true);
WatchRecord watchRecord1 = executionService().execute(ctxBuilder1.build());
ManualExecutionContext.Builder ctxBuilder2 = ManualExecutionContext.builder(watchService().getWatch("_id"));
ManualExecutionContext.Builder ctxBuilder2 = ManualExecutionContext.builder(watchService().getWatch("_id"), triggerEvent);
ctxBuilder2.simulateActions("_all");
ctxBuilder2.withInput(new SimpleInput.Result(new Payload.Simple(map2)));
ctxBuilder2.recordExecution(true);
@ -177,8 +184,16 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
.addAction("log", loggingAction("foobar"));
watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet();
TriggerEvent triggerEvent = new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC));
Wid wid = new Wid("_watchId",1,new DateTime());
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch().setId("_id").get();
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch()
.setId("_id")
.setTriggerEvent(triggerEvent)
.get();
WatchRecord watchRecord = watchRecordParser.parse(wid.value(), 1, executeWatchResponse.getSource().getBytes());
assertThat(watchRecord.state(), equalTo(WatchRecord.State.EXECUTION_NOT_NEEDED));
@ -192,16 +207,19 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
.addAction("log", loggingAction("foobar"));
watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet();
executeWatchResponse = watcherClient().prepareExecuteWatch().setId("_id").setRecordExecution(true).get();
executeWatchResponse = watcherClient().prepareExecuteWatch().setId("_id").setTriggerEvent(triggerEvent).setRecordExecution(true).get();
watchRecord = watchRecordParser.parse(wid.value(), 1, executeWatchResponse.getSource().getBytes());
assertThat(watchRecord.state(), equalTo(WatchRecord.State.EXECUTED));
assertThat(watchRecord.execution().inputResult().payload().data().get("foo").toString(), equalTo("bar"));
assertThat(watchRecord.execution().actionsResults().get("log"), not(instanceOf(LoggingAction.Result.Simulated.class)));
executeWatchResponse = watcherClient().prepareExecuteWatch().setId("_id").get();
watchRecord = watchRecordParser.parse(wid.value(), 1, executeWatchResponse.getSource().getBytes());
executeWatchResponse = watcherClient().prepareExecuteWatch().setId("_id").setTriggerEvent(triggerEvent).get();
watchRecord = watchRecordParser.parse(wid.value(), 1, executeWatchResponse.getSource().getBytes());
assertThat(watchRecord.state(), equalTo(WatchRecord.State.THROTTLED));
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@ -18,6 +19,8 @@ import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.WatchStore;
import org.junit.After;
import org.junit.Test;
@ -27,6 +30,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.actions.ActionBuilders.emailAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition;
@ -131,9 +135,11 @@ public class EmailSecretsIntegrationTests extends AbstractWatcherIntegrationTest
}
});
TriggerEvent triggerEvent = new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC));
ExecuteWatchResponse executeResponse = watcherClient.prepareExecuteWatch("_id")
.setRecordExecution(false)
.setIgnoreThrottle(true)
.setTriggerEvent(triggerEvent)
.get();
assertThat(executeResponse, notNullValue());
contentSource = executeResponse.getSource();

View File

@ -9,6 +9,7 @@ import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@ -23,6 +24,8 @@ import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.WatchStore;
import org.junit.After;
import org.junit.Before;
@ -31,6 +34,7 @@ import org.junit.Test;
import java.net.BindException;
import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.watcher.actions.ActionBuilders.webhookAction;
@ -140,9 +144,11 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTests
webServer.enqueue(new MockResponse().setResponseCode(200).setBody(jsonBuilder().startObject().field("key", "value").endObject().bytes().toUtf8()));
TriggerEvent triggerEvent = new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC));
ExecuteWatchResponse executeResponse = watcherClient.prepareExecuteWatch("_id")
.setRecordExecution(false)
.setIgnoreThrottle(true)
.setTriggerEvent(triggerEvent)
.get();
assertThat(executeResponse, notNullValue());
contentSource = executeResponse.getSource();
@ -210,9 +216,11 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTests
webServer.enqueue(new MockResponse().setResponseCode(200).setBody(jsonBuilder().startObject().field("key", "value").endObject().bytes().toUtf8()));
TriggerEvent triggerEvent = new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC));
ExecuteWatchResponse executeResponse = watcherClient.prepareExecuteWatch("_id")
.setRecordExecution(false)
.setIgnoreThrottle(true)
.setTriggerEvent(triggerEvent)
.get();
assertThat(executeResponse, notNullValue());
contentSource = executeResponse.getSource();

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.watcher.actions.logging.LoggingAction;
import org.elasticsearch.watcher.actions.logging.LoggingLevel;
import org.elasticsearch.watcher.condition.always.AlwaysCondition;
@ -15,6 +16,8 @@ import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.junit.Test;
import java.util.ArrayList;
@ -22,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
@ -30,9 +34,7 @@ import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondit
import static org.elasticsearch.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.*;
/**
*
@ -90,7 +92,8 @@ public class WatchMetadataTests extends AbstractWatcherIntegrationTests {
.get();
WatchRecord.Parser parser = getInstanceFromMaster(WatchRecord.Parser.class);
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_name").addSimulatedActions("_all").get();
TriggerEvent triggerEvent = new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC));
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_name").setTriggerEvent(triggerEvent).addSimulatedActions("_all").get();
WatchRecord record = parser.parse("test_run", 1, executeWatchResponse.getSource().getBytes());
assertThat(record.metadata().get("foo").toString(), equalTo("bar"));

View File

@ -49,8 +49,8 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
}
@Override
public ScheduleTriggerEvent parseTriggerEvent(String context, XContentParser parser) throws IOException {
return ScheduleTriggerEvent.parse(context, parser);
public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException {
return ScheduleTriggerEvent.parse(watchId, context, parser);
}
@Override