Cleanup and Refactoring of the inputs

* Split the action into two constructs: `Input` and `ExecutableInput`. The former holds all the input configuration, the latter can execute the input based on that configuration (an executable input holds an input)
 - This the code clearer to understand and maintain.
 - This also enabled to pull some common implementation code into the `ExecutableInput` and by that reduce the implementation details of each executable to the minimum required.

* Also, extracted the `Input.Parser` to its own top level class, and renamed it to - `InputFactory`. The main thing that the factory does is: 1) delegate to the parsing to the `Input` class, 2) construct & wire up the `ExecutableInput`.

* With the introduction of `Input`, we no longer need the `SourceBuilder` for inputs. Instead, we have an `Input.Builder` that help you build an input. This is much more intuitive from the client perspective.

* Changed the `request` xcontent field in the http input result to `sent_request` for clarity
* Changed the `request` xcontent field in the search input result to `executed_request` for clarity

Original commit: elastic/x-pack-elasticsearch@63b93f9c7b
This commit is contained in:
uboness 2015-04-17 12:57:34 -07:00
parent 54fddac93f
commit ebda02438e
46 changed files with 1236 additions and 858 deletions

View File

@ -13,15 +13,15 @@
"schedule" : { "cron" : "0 0/1 * * * ?" }
},
"input" : {
"search" : {
"request" : {
"indices" : [ "logstash*" ],
"body" : {
"query" : {
"filtered": {
"query": {
"match": {
"response": 404
"search" : {
"request" : {
"indices" : [ "logstash*" ],
"body" : {
"query" : {
"filtered": {
"query": {
"match": {
"response": 404
}
},
"filter": {

View File

@ -13,15 +13,15 @@
"schedule" : { "cron" : "0 0/1 * * * ?" }
},
"input" : {
"search" : {
"request" : {
"indices" : [ "logstash*" ],
"body" : {
"query" : {
"filtered": {
"query": {
"match": {
"response": 404
"search" : {
"request" : {
"indices" : [ "logstash*" ],
"body" : {
"query" : {
"filtered": {
"query": {
"match": {
"response": 404
}
},
"filter": {

View File

@ -27,15 +27,16 @@
"always_true": {}
},
"actions": [
{
"test_index": {
"index": {
"index": "test",
"type": "test2"
{
"test_index": {
"index": {
"index": "test",
"type": "test2"
}
}
}
} ]
}
]
}
- match: { _id: "my_watch" }
- match: { created: true }

View File

@ -18,7 +18,7 @@ import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.condition.always.AlwaysCondition;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.NoneInput;
import org.elasticsearch.watcher.input.none.NoneInput;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.watch.Watch;
@ -33,7 +33,7 @@ import java.util.Map;
public class WatchSourceBuilder implements ToXContent {
private Trigger.SourceBuilder trigger;
private Input.SourceBuilder input = NoneInput.SourceBuilder.INSTANCE;
private Input input = NoneInput.INSTANCE;
private Condition condition = AlwaysCondition.INSTANCE;
private Transform.SourceBuilder transform = null;
private Map<String, TransformedAction> actions = new HashMap<>();
@ -45,7 +45,11 @@ public class WatchSourceBuilder implements ToXContent {
return this;
}
public WatchSourceBuilder input(Input.SourceBuilder input) {
public WatchSourceBuilder input(Input.Builder input) {
return input(input.build());
}
public WatchSourceBuilder input(Input input) {
this.input = input;
return this;
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.trigger.manual.ManualTriggerEvent;

View File

@ -22,7 +22,7 @@ import org.elasticsearch.watcher.actions.ActionRegistry;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.condition.ConditionRegistry;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.input.InputRegistry;
import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.watcher.trigger.TriggerEvent;
@ -40,7 +40,7 @@ public class WatchRecord implements ToXContent {
private Wid id;
private String name;
private TriggerEvent triggerEvent;
private Input input;
private ExecutableInput input;
private Condition condition;
private State state;
private WatchExecution execution;
@ -79,7 +79,7 @@ public class WatchRecord implements ToXContent {
return name;
}
public Input input() { return input; }
public ExecutableInput input() { return input; }
public Condition condition() {
return condition;
@ -248,7 +248,7 @@ public class WatchRecord implements ToXContent {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (Watch.Parser.INPUT_FIELD.match(currentFieldName)) {
record.input = inputRegistry.parse(parser);
record.input = inputRegistry.parse(id, parser);
} else if (Watch.Parser.CONDITION_FIELD.match(currentFieldName)) {
record.condition = conditionRegistry.parseCondition(id, parser);
} else if (METADATA_FIELD.match(currentFieldName)) {

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import java.io.IOException;
/**
*
*/
public abstract class ExecutableInput<I extends Input, R extends Input.Result> implements ToXContent {
protected final I input;
protected final ESLogger logger;
protected ExecutableInput(I input, ESLogger logger) {
this.input = input;
this.logger = logger;
}
/**
* @return the type of this input
*/
public final String type() {
return input.type();
}
I input() {
return input;
}
/**
* Executes this input
*/
public abstract R execute(WatchExecutionContext ctx) throws IOException;
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return input.toXContent(builder, params);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExecutableInput<?, ?> that = (ExecutableInput<?, ?>) o;
return input.equals(that.input);
}
@Override
public int hashCode() {
return input.hashCode();
}
}

View File

@ -5,62 +5,21 @@
*/
package org.elasticsearch.watcher.input;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
/**
*
*/
public abstract class Input<R extends Input.Result> implements ToXContent {
public interface Input extends ToXContent {
protected final ESLogger logger;
String type();
protected Input(ESLogger logger) {
this.logger = logger;
}
/**
* @return the type of this input
*/
public abstract String type();
/**
* Executes this input
*/
public abstract R execute(WatchExecutionContext ctx) throws IOException;
/**
* Parses xcontent to a concrete input of the same type.
*/
public static interface Parser<R extends Input.Result, I extends Input<R>> {
/**
* @return The type of the input
*/
String type();
/**
* Parses the given xcontent and creates a concrete input
*/
I parse(XContentParser parser) throws IOException;
/**
* Parses the given xContent and creates a concrete result
*/
R parseResult(XContentParser parser) throws IOException;
}
public abstract static class Result implements ToXContent {
public static final ParseField PAYLOAD_FIELD = new ParseField("payload");
abstract class Result implements ToXContent {
private final String type;
private final Payload payload;
@ -82,16 +41,21 @@ public abstract class Input<R extends Input.Result> implements ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(PAYLOAD_FIELD.getPreferredName(), payload);
return toXContentBody(builder, params).endObject();
.field(Field.PAYLOAD.getPreferredName(), payload);
toXContentBody(builder, params);
return builder.endObject();
}
protected abstract XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException;
}
public static interface SourceBuilder extends ToXContent {
interface Builder<I extends Input> {
String type();
I build();
}
interface Field {
ParseField PAYLOAD = new ParseField("payload");
}
}

View File

@ -10,9 +10,11 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.input.none.NoneInput;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
import org.elasticsearch.watcher.watch.Payload;
import java.util.HashMap;
import java.util.Map;
@ -25,39 +27,43 @@ public final class InputBuilders {
private InputBuilders() {
}
public static SearchInput.SourceBuilder searchInput(SearchRequest request) {
return new SearchInput.SourceBuilder(request);
public static NoneInput.Builder noneInput() {
return NoneInput.builder();
}
public static SearchInput.SourceBuilder searchInput(SearchRequestBuilder builder) {
public static SearchInput.Builder searchInput(SearchRequest request) {
return SearchInput.builder(request);
}
public static SearchInput.Builder searchInput(SearchRequestBuilder builder) {
return searchInput(builder.request());
}
public static SimpleInput.SourceBuilder simpleInput() {
public static SimpleInput.Builder simpleInput() {
return simpleInput(new HashMap<String, Object>());
}
public static SimpleInput.SourceBuilder simpleInput(String key, Object value) {
public static SimpleInput.Builder simpleInput(String key, Object value) {
return simpleInput(MapBuilder.<String, Object>newMapBuilder().put(key, value));
}
public static SimpleInput.SourceBuilder simpleInput(ImmutableMap.Builder<String, Object> data) {
public static SimpleInput.Builder simpleInput(ImmutableMap.Builder<String, Object> data) {
return simpleInput(data.build());
}
public static SimpleInput.SourceBuilder simpleInput(MapBuilder<String, Object> data) {
public static SimpleInput.Builder simpleInput(MapBuilder<String, Object> data) {
return simpleInput(data.map());
}
public static SimpleInput.SourceBuilder simpleInput(Map<String, Object> data) {
return new SimpleInput.SourceBuilder(data);
public static SimpleInput.Builder simpleInput(Map<String, Object> data) {
return SimpleInput.builder(new Payload.Simple(data));
}
public static HttpInput.SourceBuilder httpInput(HttpRequestTemplate.Builder request) {
public static HttpInput.Builder httpInput(HttpRequestTemplate.Builder request) {
return httpInput(request.build());
}
public static HttpInput.SourceBuilder httpInput(HttpRequestTemplate request) {
return new HttpInput.SourceBuilder(request);
public static HttpInput.Builder httpInput(HttpRequestTemplate request) {
return HttpInput.builder(request);
}
}

View File

@ -12,11 +12,11 @@ import org.elasticsearch.watcher.WatcherException;
*/
public class InputException extends WatcherException {
public InputException(String msg) {
super(msg);
public InputException(String msg, Object... args) {
super(msg, args);
}
public InputException(String msg, Throwable cause) {
super(msg, cause);
public InputException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
/**
* Parses xcontent to a concrete input of the same type.
*/
public abstract class InputFactory<I extends Input, R extends Input.Result, E extends ExecutableInput<I, R>> {
protected final ESLogger inputLogger;
public InputFactory(ESLogger inputLogger) {
this.inputLogger = inputLogger;
}
/**
* @return The type of the input
*/
public abstract String type();
/**
* Parses the given xcontent and creates a concrete input
*/
public abstract I parseInput(String watchId, XContentParser parser) throws IOException;
/**
* Parses the given xContent and creates a concrete result
*/
public abstract R parseResult(String watchId, XContentParser parser) throws IOException;
/**
* Creates an executable input out of the given input.
*/
public abstract E createExecutable(I input);
public ExecutableInput parseExecutable(String watchId, XContentParser parser) throws IOException {
I input = parseInput(watchId, parser);
return createExecutable(input);
}
}

View File

@ -8,8 +8,13 @@ package org.elasticsearch.watcher.input;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.input.http.HttpInputFactory;
import org.elasticsearch.watcher.input.none.NoneInput;
import org.elasticsearch.watcher.input.none.NoneInputFactory;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.search.SearchInputFactory;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInputFactory;
import java.util.HashMap;
import java.util.Map;
@ -19,25 +24,29 @@ import java.util.Map;
*/
public class InputModule extends AbstractModule {
private final Map<String, Class<? extends Input.Parser>> parsers = new HashMap<>();
private final Map<String, Class<? extends InputFactory>> parsers = new HashMap<>();
public void registerInput(String type, Class<? extends Input.Parser> parserType) {
public void registerInput(String type, Class<? extends InputFactory> parserType) {
parsers.put(type, parserType);
}
@Override
protected void configure() {
MapBinder<String, Input.Parser> parsersBinder = MapBinder.newMapBinder(binder(), String.class, Input.Parser.class);
bind(SearchInput.Parser.class).asEagerSingleton();
parsersBinder.addBinding(SearchInput.TYPE).to(SearchInput.Parser.class);
bind(SimpleInput.Parser.class).asEagerSingleton();
parsersBinder.addBinding(SimpleInput.TYPE).to(SimpleInput.Parser.class);
bind(HttpInput.Parser.class).asEagerSingleton();
parsersBinder.addBinding(HttpInput.TYPE).to(HttpInput.Parser.class);
bind(NoneInput.Parser.class).asEagerSingleton();
parsersBinder.addBinding(NoneInput.TYPE).to(NoneInput.Parser.class);
MapBinder<String, InputFactory> parsersBinder = MapBinder.newMapBinder(binder(), String.class, InputFactory.class);
for (Map.Entry<String, Class<? extends Input.Parser>> entry : parsers.entrySet()) {
bind(SearchInputFactory.class).asEagerSingleton();
parsersBinder.addBinding(SearchInput.TYPE).to(SearchInputFactory.class);
bind(SimpleInputFactory.class).asEagerSingleton();
parsersBinder.addBinding(SimpleInput.TYPE).to(SimpleInputFactory.class);
bind(HttpInputFactory.class).asEagerSingleton();
parsersBinder.addBinding(HttpInput.TYPE).to(HttpInputFactory.class);
bind(NoneInputFactory.class).asEagerSingleton();
parsersBinder.addBinding(NoneInput.TYPE).to(NoneInputFactory.class);
for (Map.Entry<String, Class<? extends InputFactory>> entry : parsers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
parsersBinder.addBinding(entry.getKey()).to(entry.getValue());
}

View File

@ -17,11 +17,11 @@ import java.util.Map;
*/
public class InputRegistry {
private final ImmutableMap<String, Input.Parser> parsers;
private final ImmutableMap<String, InputFactory> factories;
@Inject
public InputRegistry(Map<String, Input.Parser> parsers) {
this.parsers = ImmutableMap.copyOf(parsers);
public InputRegistry(Map<String, InputFactory> factories) {
this.factories = ImmutableMap.copyOf(factories);
}
/**
@ -31,37 +31,39 @@ public class InputRegistry {
* @return A new input instance from the parser
* @throws java.io.IOException
*/
public Input parse(XContentParser parser) throws IOException {
public ExecutableInput parse(String watchId, XContentParser parser) throws IOException {
String type = null;
XContentParser.Token token;
Input input = null;
ExecutableInput input = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && type != null) {
Input.Parser inputParser = parsers.get(type);
if (inputParser == null) {
throw new InputException("unknown input type [" + type + "]");
InputFactory factory = factories.get(type);
if (factory == null) {
throw new InputException("could not parse input for watch [{}]. unknown input type [{}]", watchId, type);
}
input = inputParser.parse(parser);
input = factory.parseExecutable(watchId, parser);
}
}
return input;
}
public Input.Result parseResult(XContentParser parser) throws IOException {
public Input.Result parseResult(String watchId, XContentParser parser) throws IOException {
String type = null;
XContentParser.Token token;
Input.Result inputResult = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && type != null) {
Input.Parser inputParser = parsers.get(type);
if (inputParser == null) {
throw new InputException("unknown input type [" + type + "]");
InputFactory factory = factories.get(type);
if (factory == null) {
throw new InputException("could not parse input result for watch [{}]. unknown input type [{}]", watchId, type);
}
inputResult = inputParser.parseResult(parser);
inputResult = factory.parseResult(watchId, parser);
}
}
return inputResult;

View File

@ -1,131 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.util.Map;
/**
*
*/
public class NoneInput extends Input<NoneInput.Result> {
public static final String TYPE = "none";
private static final Payload EMPTY_PAYLOAD = new Payload() {
@Override
public Map<String, Object> data() {
return ImmutableMap.of();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
};
public NoneInput(ESLogger logger) {
super(logger);
}
@Override
public String type() {
return TYPE;
}
@Override
public Result execute(WatchExecutionContext ctx) throws IOException {
return Result.INSTANCE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
public static class Result extends Input.Result {
static final Result INSTANCE = new Result();
private Result() {
super(TYPE, EMPTY_PAYLOAD);
}
@Override
protected XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
}
public static class Parser extends AbstractComponent implements Input.Parser<Result, NoneInput> {
private final NoneInput input;
@Inject
public Parser(Settings settings) {
super(settings);
this.input = new NoneInput(logger);
}
@Override
public String type() {
return TYPE;
}
@Override
public NoneInput parse(XContentParser parser) throws IOException {
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
}
parser.nextToken();
if (parser.currentToken() != XContentParser.Token.END_OBJECT) {
}
return input;
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
}
parser.nextToken();
if (parser.currentToken() != XContentParser.Token.END_OBJECT) {
}
return Result.INSTANCE;
}
}
public static class SourceBuilder implements Input.SourceBuilder {
public static final SourceBuilder INSTANCE = new SourceBuilder();
private SourceBuilder() {
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.http;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpRequest;
import org.elasticsearch.watcher.support.http.HttpResponse;
import org.elasticsearch.watcher.support.template.TemplateEngine;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.util.Map;
/**
*/
public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Result> {
private final HttpClient client;
private final TemplateEngine templateEngine;
public ExecutableHttpInput(HttpInput input, ESLogger logger, HttpClient client, TemplateEngine templateEngine) {
super(input, logger);
this.client = client;
this.templateEngine = templateEngine;
}
@Override
public HttpInput.Result execute(WatchExecutionContext ctx) throws IOException {
Map<String, Object> model = Variables.createCtxModel(ctx, null);
HttpRequest request = input.getRequest().render(templateEngine, model);
HttpResponse response = client.execute(request);
Payload payload;
if (input.getExtractKeys() != null) {
XContentParser parser = XContentHelper.createParser(response.body());
Map<String, Object> filteredKeys = XContentFilterKeysUtils.filterMapOrdered(input.getExtractKeys(), parser);
payload = new Payload.Simple(filteredKeys);
} else {
Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(response.body(), true);
payload = new Payload.Simple(result.v2());
}
return new HttpInput.Result(payload, request, response.status());
}
}

View File

@ -5,50 +5,34 @@
*/
package org.elasticsearch.watcher.input.http;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputException;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpRequest;
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
import org.elasticsearch.watcher.support.http.HttpResponse;
import org.elasticsearch.watcher.support.template.TemplateEngine;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
*
*/
public class HttpInput extends Input<HttpInput.Result> {
public class HttpInput implements Input {
public static final String TYPE = "http";
private final HttpClient client;
private final Set<String> extractKeys;
private final HttpRequestTemplate requestTemplate;
private final TemplateEngine templateEngine;
private final HttpRequestTemplate request;
private final @Nullable Set<String> extractKeys;
public HttpInput(ESLogger logger, HttpClient client, HttpRequestTemplate requestTemplate, Set<String> extractKeys, TemplateEngine templateEngine) {
super(logger);
this.requestTemplate = requestTemplate;
this.client = client;
public HttpInput(HttpRequestTemplate request, @Nullable Set<String> extractKeys) {
this.request = request;
this.extractKeys = extractKeys;
this.templateEngine = templateEngine;
}
@Override
@ -56,217 +40,169 @@ public class HttpInput extends Input<HttpInput.Result> {
return TYPE;
}
@Override
public Result execute(WatchExecutionContext ctx) throws IOException {
Map<String, Object> model = Variables.createCtxModel(ctx, null);
HttpRequest request = requestTemplate.render(templateEngine, model);
public HttpRequestTemplate getRequest() {
return request;
}
HttpResponse response = client.execute(request);
Payload payload;
if (extractKeys != null) {
XContentParser parser = XContentHelper.createParser(response.body());
Map<String, Object> filteredKeys = XContentFilterKeysUtils.filterMapOrdered(extractKeys, parser);
payload = new Payload.Simple(filteredKeys);
} else {
Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(response.body(), true);
payload = new Payload.Simple(result.v2());
}
return new Result(payload, request, response.status());
public Set<String> getExtractKeys() {
return extractKeys;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Parser.REQUEST_FIELD.getPreferredName());
builder = requestTemplate.toXContent(builder, params);
builder.field(Field.REQUEST.getPreferredName(), request);
if (extractKeys != null) {
builder.startArray(Parser.EXTRACT_FIELD.getPreferredName());
for (String extractKey : extractKeys) {
builder.value(extractKey);
builder.field(Field.EXTRACT.getPreferredName(), extractKeys);
}
builder.endObject();
return builder;
}
public static HttpInput parse(String watchId, XContentParser parser, HttpRequestTemplate.Parser requestParser) throws IOException {
Set<String> extract = null;
HttpRequestTemplate request = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (Field.REQUEST.match(currentFieldName)) {
try {
request = requestParser.parse(parser);
} catch (HttpRequestTemplate.ParseException pe) {
throw new HttpInputException("could not parse [{}] input for watch [{}]. failed to parse http request template", pe, TYPE, watchId);
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (Field.EXTRACT.getPreferredName().equals(currentFieldName)) {
extract = new HashSet<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_STRING) {
extract.add(parser.text());
} else {
throw new HttpInputException("could not parse [{}] input for watch [{}]. expected a string value as an [{}] item but found [{}] instead", TYPE, watchId, currentFieldName, token);
}
}
} else {
throw new HttpInputException("could not parse [{}] input for watch [{}]. unexpected array field [{}]", TYPE, watchId, currentFieldName);
}
} else {
throw new HttpInputException("could not parse [{}] input for watch [{}]. unexpected token [{}]", TYPE, watchId, token);
}
builder.endArray();
}
return builder.endObject();
if (request == null) {
throw new HttpInputException("could not parse [{}] input for watch [{}]. missing require [{}] field", TYPE, watchId, Field.REQUEST.getPreferredName());
}
return new HttpInput(request, extract);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HttpInput httpInput = (HttpInput) o;
if (!requestTemplate.equals(httpInput.requestTemplate)) return false;
return true;
public static Builder builder(HttpRequestTemplate httpRequest) {
return new Builder(httpRequest);
}
@Override
public int hashCode() {
return requestTemplate.hashCode();
}
public static class Result extends Input.Result {
HttpRequestTemplate getRequestTemplate() {
return requestTemplate;
}
private final HttpRequest sentRequest;
private final int httpStatus;
public final static class Result extends Input.Result {
private final HttpRequest request;
private final int statusCode;
public Result(Payload payload, HttpRequest request, int statusCode) {
public Result(Payload payload, HttpRequest sentRequest, int httpStatus) {
super(TYPE, payload);
this.request = request;
this.statusCode = statusCode;
this.sentRequest =sentRequest;
this.httpStatus = httpStatus;
}
public HttpRequest sentRequest() {
return sentRequest;
}
public int statusCode() {
return httpStatus;
}
@Override
protected XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Parser.HTTP_STATUS_FIELD.getPreferredName(), statusCode);
builder.field(Parser.REQUEST_FIELD.getPreferredName(), request);
return builder;
return builder.field(Field.SENT_REQUEST.getPreferredName(), sentRequest)
.field(Field.HTTP_STATUS.getPreferredName(), httpStatus);
}
HttpRequest request() {
return request;
}
int statusCode() {
return statusCode;
}
}
public final static class Parser extends AbstractComponent implements Input.Parser<Result, HttpInput> {
public static final ParseField REQUEST_FIELD = new ParseField("request");
public static final ParseField EXTRACT_FIELD = new ParseField("extract");
public static final ParseField HTTP_STATUS_FIELD = new ParseField("http_status");
private final HttpClient client;
private final HttpRequest.Parser requestParser;
private final HttpRequestTemplate.Parser requestTemplateParser;
private final TemplateEngine templateEngine;
@Inject
public Parser(Settings settings, HttpClient client, HttpRequest.Parser requestParser, HttpRequestTemplate.Parser requestTemplateParser, TemplateEngine templateEngine) {
super(settings);
this.client = client;
this.requestParser = requestParser;
this.requestTemplateParser = requestTemplateParser;
this.templateEngine = templateEngine;
}
@Override
public String type() {
return TYPE;
}
@Override
public HttpInput parse(XContentParser parser) throws IOException {
Set<String> extract = null;
HttpRequestTemplate request = null;
String currentFieldName = null;
for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case START_OBJECT:
if (REQUEST_FIELD.getPreferredName().equals(currentFieldName)) {
request = requestTemplateParser.parse(parser);
} else {
throw new InputException("could not parse [http] input. unexpected field [" + currentFieldName + "]");
}
break;
case START_ARRAY:
if (EXTRACT_FIELD.getPreferredName().equals(currentFieldName)) {
extract = new HashSet<>();
for (XContentParser.Token arrayToken = parser.nextToken(); arrayToken != XContentParser.Token.END_ARRAY; arrayToken = parser.nextToken()) {
if (arrayToken == XContentParser.Token.VALUE_STRING) {
extract.add(parser.text());
}
}
} else {
throw new InputException("could not parse [http] input. unexpected field [" + currentFieldName + "]");
}
break;
default:
throw new InputException("could not parse [http] input. unexpected token [" + token + "]");
}
}
if (request == null) {
throw new InputException("could not parse [http] input. http request is missing or null.");
}
return new HttpInput(logger, client, request, extract, templateEngine);
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
public static Result parse(String watchId, XContentParser parser, HttpRequest.Parser requestParser) throws IOException {
HttpRequest sentRequest = null;
Payload payload = null;
HttpRequest request = null;
int statusCode = -1;
int httpStatus = -1;
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (Field.SENT_REQUEST.match(currentFieldName)) {
try {
sentRequest = requestParser.parse(parser);
} catch (HttpRequest.Parser.ParseException pe) {
throw new HttpInputException("could not parse [{}] input result for watch [{}]. failed parsing [{}] field", pe, TYPE, watchId, Field.SENT_REQUEST.getPreferredName());
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (Input.Result.PAYLOAD_FIELD.match(currentFieldName)) {
if (Field.PAYLOAD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else if (REQUEST_FIELD.match(currentFieldName)) {
request = requestParser.parse(parser);
} else {
throw new HttpInputException("could not parse [{}] input result for watch [{}]. unexpected object field [{}]", TYPE, watchId, currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (HTTP_STATUS_FIELD.match(currentFieldName)) {
statusCode = parser.intValue();
if (Field.HTTP_STATUS.match(currentFieldName)) {
httpStatus = parser.intValue();
} else {
throw new HttpInputException("could not parse [{}] input result for watch [{}]. unexpected numeric field [{}]", TYPE, watchId, currentFieldName);
}
} else {
throw new HttpInputException("could not parse [{}] input result for watch [{}]. unexpected token [{}]", TYPE, watchId, token);
}
}
return new Result(payload, request, statusCode);
}
if (sentRequest == null) {
throw new HttpInputException("could not parse [{}] input result for watch [{}]. missing required [{}] field", TYPE, watchId, Field.SENT_REQUEST.getPreferredName());
}
if (httpStatus < 0) {
throw new HttpInputException("could not parse [{}] input result for watch [{}]. missing required [{}] field", TYPE, watchId, Field.HTTP_STATUS.getPreferredName());
}
return new HttpInput.Result(payload, sentRequest, httpStatus);
}
}
public final static class SourceBuilder implements Input.SourceBuilder {
public static class Builder implements Input.Builder<HttpInput> {
private HttpRequestTemplate request;
private Set<String> extractKeys;
private final HttpRequestTemplate request;
private final ImmutableSet.Builder<String> extractKeys = ImmutableSet.builder();
public SourceBuilder(HttpRequestTemplate request) {
private Builder(HttpRequestTemplate request) {
this.request = request;
}
public SourceBuilder addExtractKey(String key) {
if (extractKeys == null) {
extractKeys = new HashSet<>();
}
extractKeys.add(key);
public Builder extractKeys(Collection<String> keys) {
extractKeys.addAll(keys);
return this;
}
public Builder extractKeys(String... keys) {
extractKeys.add(keys);
return this;
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (extractKeys != null) {
builder.startArray(Parser.EXTRACT_FIELD.getPreferredName());
for (String extractKey : extractKeys) {
builder.value(extractKey);
}
builder.endArray();
}
builder.field(Parser.REQUEST_FIELD.getPreferredName(), request);
return builder.endObject();
public HttpInput build() {
ImmutableSet<String> keys = extractKeys.build();
return new HttpInput(request, keys.isEmpty() ? null : keys);
}
}
interface Field extends Input.Field {
ParseField REQUEST = new ParseField("request");
ParseField SENT_REQUEST = new ParseField("sent_request");
ParseField EXTRACT = new ParseField("extract");
ParseField HTTP_STATUS = new ParseField("http_status");
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.http;
import org.elasticsearch.watcher.input.InputException;
/**
*
*/
public class HttpInputException extends InputException {
public HttpInputException(String msg, Object... args) {
super(msg, args);
}
public HttpInputException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.http;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.InputFactory;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpRequest;
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
import org.elasticsearch.watcher.support.template.TemplateEngine;
import java.io.IOException;
/**
*
*/
public final class HttpInputFactory extends InputFactory<HttpInput, HttpInput.Result, ExecutableHttpInput> {
private final HttpClient httpClient;
private final TemplateEngine templateEngine;
private final HttpRequest.Parser requestParser;
private final HttpRequestTemplate.Parser requestTemplateParser;
@Inject
public HttpInputFactory(Settings settings, HttpClient httpClient, TemplateEngine templateEngine, HttpRequest.Parser requestParser, HttpRequestTemplate.Parser requestTemplateParser) {
super(Loggers.getLogger(ExecutableHttpInput.class, settings));
this.templateEngine = templateEngine;
this.httpClient = httpClient;
this.requestParser = requestParser;
this.requestTemplateParser = requestTemplateParser;
}
@Override
public String type() {
return HttpInput.TYPE;
}
@Override
public HttpInput parseInput(String watchId, XContentParser parser) throws IOException {
return HttpInput.parse(watchId, parser, requestTemplateParser);
}
@Override
public HttpInput.Result parseResult(String watchId, XContentParser parser) throws IOException {
return HttpInput.Result.parse(watchId, parser, requestParser);
}
@Override
public ExecutableHttpInput createExecutable(HttpInput input) {
return new ExecutableHttpInput(input, inputLogger, httpClient, templateEngine);
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.none;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput;
import java.io.IOException;
/**
*
*/
public class ExecutableNoneInput extends ExecutableInput<NoneInput, NoneInput.Result> {
public ExecutableNoneInput(ESLogger logger) {
super(NoneInput.INSTANCE, logger);
}
@Override
public NoneInput.Result execute(WatchExecutionContext ctx) throws IOException {
return NoneInput.Result.INSTANCE;
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.none;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
/**
*
*/
public class NoneInput implements Input {
public static final String TYPE = "none";
public static final NoneInput INSTANCE = new NoneInput();
private NoneInput() {
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
public static NoneInput parse(String watchId, XContentParser parser) throws IOException {
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new NoneInputException("could not parse [{}] input for watch [{}]. expected an empty object but found [{}] instead", TYPE, watchId, parser.currentToken());
}
if (parser.nextToken() != XContentParser.Token.END_OBJECT) {
throw new NoneInputException("could not parse [{}] input for watch [{}]. expected an empty object but found [{}] instead", TYPE, watchId, parser.currentToken());
}
return INSTANCE;
}
public static Builder builder() {
return Builder.INSTANCE;
}
public static class Result extends Input.Result {
static final Result INSTANCE = new Result();
private Result() {
super(TYPE, Payload.EMPTY);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
@Override
protected XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException {
return builder;
}
public static Result parse(String watchId, XContentParser parser) throws IOException {
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new NoneInputException("could not parse [{}] input result for watch [{}]. expected an empty object but found [{}] instead", TYPE, watchId, parser.currentToken());
}
if (parser.nextToken() != XContentParser.Token.END_OBJECT) {
throw new NoneInputException("could not parse [{}] input result for watch [{}]. expected an empty object but found [{}] instead", TYPE, watchId, parser.currentToken());
}
return INSTANCE;
}
}
public static class Builder implements Input.Builder<NoneInput> {
private static final Builder INSTANCE = new Builder();
private Builder() {
}
@Override
public NoneInput build() {
return NoneInput.INSTANCE;
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.none;
import org.elasticsearch.watcher.input.InputException;
/**
*
*/
public class NoneInputException extends InputException {
public NoneInputException(String msg, Object... args) {
super(msg, args);
}
public NoneInputException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.none;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.InputFactory;
import java.io.IOException;
/**
*
*/
public class NoneInputFactory extends InputFactory<NoneInput, NoneInput.Result, ExecutableNoneInput> {
@Inject
public NoneInputFactory(Settings settings) {
super(Loggers.getLogger(ExecutableNoneInput.class, settings));
}
@Override
public String type() {
return NoneInput.TYPE;
}
@Override
public NoneInput parseInput(String watchId, XContentParser parser) throws IOException {
return NoneInput.parse(watchId, parser);
}
@Override
public NoneInput.Result parseResult(String watchId, XContentParser parser) throws IOException {
return NoneInput.Result.parse(watchId, parser);
}
@Override
public ExecutableNoneInput createExecutable(NoneInput input) {
return new ExecutableNoneInput(inputLogger);
}
}

View File

@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* An input that executes search and returns the search response as the initial payload
*/
public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchInput.Result> {
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
private final ScriptServiceProxy scriptService;
private final ClientProxy client;
public ExecutableSearchInput(SearchInput input, ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client) {
super(input, logger);
this.scriptService = scriptService;
this.client = client;
}
@Override
public SearchInput.Result execute(WatchExecutionContext ctx) throws IOException {
SearchRequest request = createSearchRequestWithTimes(input.getSearchRequest(), ctx, scriptService);
if (logger.isTraceEnabled()) {
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().name(), XContentHelper.convertToJson(request.source(), false, true));
}
// actionGet deals properly with InterruptedException
SearchResponse response = client.search(request);
if (logger.isDebugEnabled()) {
logger.debug("[{}] found [{}] hits", ctx.id(), ctx.watch().name(), response.getHits().getTotalHits());
for (SearchHit hit : response.getHits()) {
logger.debug("[{}] hit [{}]", ctx.id(), XContentHelper.toString(hit));
}
}
final Payload payload;
if (input.getExtractKeys() != null) {
XContentBuilder builder = jsonBuilder().startObject().value(response).endObject();
XContentParser parser = XContentHelper.createParser(builder.bytes());
Map<String, Object> filteredKeys = XContentFilterKeysUtils.filterMapOrdered(input.getExtractKeys(), parser);
payload = new Payload.Simple(filteredKeys);
} else {
payload = new Payload.XContent(response);
}
return new SearchInput.Result(request, payload);
}
/**
* Creates a new search request applying the scheduledFireTime and fireTime to the original request
*/
public static SearchRequest createSearchRequestWithTimes(SearchRequest requestPrototype, WatchExecutionContext ctx, ScriptServiceProxy scriptService) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.searchType(requestPrototype.searchType())
.indices(requestPrototype.indices());
if (Strings.hasLength(requestPrototype.source())) {
Map<String, Object> templateParams = Variables.createCtxModel(ctx, null);
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
Map<String, Object> templateParams = Variables.createCtxModel(ctx, null);
templateParams.putAll(requestPrototype.templateParams());
request.templateParams(templateParams);
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
}
// falling back to an empty body
return request;
}
}

View File

@ -6,61 +6,35 @@
package org.elasticsearch.watcher.input.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputException;
import org.elasticsearch.watcher.support.SearchRequestEquivalence;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.SearchRequestParseException;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentParser.*;
/**
* An input that executes search and returns the search response as the initial payload
*
*/
public class SearchInput extends Input<SearchInput.Result> {
public class SearchInput implements Input {
public static final String TYPE = "search";
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
private final Set<String> extractKeys;
private final SearchRequest searchRequest;
private final @Nullable Set<String> extractKeys;
private final ScriptServiceProxy scriptService;
private final ClientProxy client;
public SearchInput(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client, SearchRequest searchRequest, Set<String> extractKeys) {
super(logger);
this.extractKeys = extractKeys;
public SearchInput(SearchRequest searchRequest, @Nullable Set<String> extractKeys) {
this.searchRequest = searchRequest;
this.scriptService = scriptService;
this.client = client;
this.extractKeys = extractKeys;
}
@Override
@ -68,52 +42,6 @@ public class SearchInput extends Input<SearchInput.Result> {
return TYPE;
}
@Override
public Result execute(WatchExecutionContext ctx) throws IOException {
SearchRequest request = createSearchRequestWithTimes(this.searchRequest, ctx, scriptService);
if (logger.isTraceEnabled()) {
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().name(), XContentHelper.convertToJson(request.source(), false, true));
}
// actionGet deals properly with InterruptedException
SearchResponse response = client.search(request);
if (logger.isDebugEnabled()) {
logger.debug("[{}] found [{}] hits", ctx.id(), ctx.watch().name(), response.getHits().getTotalHits());
for (SearchHit hit : response.getHits()) {
logger.debug("[{}] hit [{}]", ctx.id(), XContentHelper.toString(hit));
}
}
final Payload payload;
if (extractKeys != null) {
XContentBuilder builder = jsonBuilder().startObject().value(response).endObject();
XContentParser parser = XContentHelper.createParser(builder.bytes());
Map<String, Object> filteredKeys = XContentFilterKeysUtils.filterMapOrdered(extractKeys, parser);
payload = new Payload.Simple(filteredKeys);
} else {
payload = new Payload.XContent(response);
}
return new Result(TYPE, payload, request);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Parser.REQUEST_FIELD.getPreferredName());
builder = WatcherUtils.writeSearchRequest(searchRequest, builder, params);
if (extractKeys != null) {
builder.startArray(Parser.EXTRACT_FIELD.getPreferredName());
for (String extractKey : extractKeys) {
builder.value(extractKey);
}
builder.endArray();
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -121,190 +49,166 @@ public class SearchInput extends Input<SearchInput.Result> {
SearchInput that = (SearchInput) o;
if (!SearchRequestEquivalence.INSTANCE.equivalent(searchRequest, that.searchRequest)) return false;
return true;
if (!SearchRequestEquivalence.INSTANCE.equivalent(searchRequest, this.searchRequest)) return false;
return !(extractKeys != null ? !extractKeys.equals(that.extractKeys) : that.extractKeys != null);
}
@Override
public int hashCode() {
return SearchRequestEquivalence.INSTANCE.hash(searchRequest);
int result = searchRequest.hashCode();
result = 31 * result + (extractKeys != null ? extractKeys.hashCode() : 0);
return result;
}
/**
* Creates a new search request applying the scheduledFireTime and fireTime to the original request
*/
public static SearchRequest createSearchRequestWithTimes(SearchRequest requestPrototype, WatchExecutionContext ctx, ScriptServiceProxy scriptService) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.searchType(requestPrototype.searchType())
.indices(requestPrototype.indices());
if (Strings.hasLength(requestPrototype.source())) {
Map<String, Object> templateParams = Variables.createCtxModel(ctx, null);
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
Map<String, Object> templateParams = Variables.createCtxModel(ctx, null);
templateParams.putAll(requestPrototype.templateParams());
request.templateParams(templateParams);
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
public SearchRequest getSearchRequest() {
return searchRequest;
}
public Set<String> getExtractKeys() {
return extractKeys;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Field.REQUEST.getPreferredName());
builder = WatcherUtils.writeSearchRequest(searchRequest, builder, params);
if (extractKeys != null) {
builder.field(Field.EXTRACT.getPreferredName(), extractKeys);
}
// falling back to an empty body
return request;
builder.endObject();
return builder;
}
public static SearchInput parse(String watchId, XContentParser parser) throws IOException {
SearchRequest request = null;
Set<String> extract = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (Field.REQUEST.match(currentFieldName)) {
try {
request = WatcherUtils.readSearchRequest(parser, ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
} catch (SearchRequestParseException srpe) {
throw new SearchInputException("could not parse [{}] input for watch [{}]. failed to parse [{}]", srpe, TYPE, watchId, currentFieldName);
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (Field.EXTRACT.match(currentFieldName)) {
extract = new HashSet<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_STRING) {
extract.add(parser.text());
} else {
throw new SearchInputException("could not parse [{}] input for watch [{}]. expected a string value in [{}] array, but found [{}] instead", TYPE, watchId, currentFieldName, token);
}
}
} else {
throw new SearchInputException("could not parse [{}] input for watch [{}]. unexpected array field [{}]", TYPE, watchId, currentFieldName);
}
} else {
throw new SearchInputException("could not parse [{}] input for watch [{}]. unexpected token [{}]", TYPE, watchId, token);
}
}
if (request == null) {
throw new SearchInputException("could not parse [{}] input for watch [{}]. missing required [{}] field", TYPE, watchId, Field.REQUEST.getPreferredName());
}
return new SearchInput(request, extract);
}
public static Builder builder(SearchRequest request) {
return new Builder(request);
}
public static class Result extends Input.Result {
private final SearchRequest request;
private final SearchRequest executedRequest;
public Result(String type, Payload payload, SearchRequest request) {
super(type, payload);
this.request = request;
public Result(SearchRequest executedRequest, Payload payload) {
super(TYPE, payload);
this.executedRequest = executedRequest;
}
public SearchRequest request() {
return request;
public SearchRequest executedRequest() {
return executedRequest;
}
@Override
protected XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Parser.REQUEST_FIELD.getPreferredName());
return WatcherUtils.writeSearchRequest(request, builder, params);
}
}
public static class Parser extends AbstractComponent implements Input.Parser<Result,SearchInput> {
public static ParseField REQUEST_FIELD = new ParseField("request");
public static ParseField EXTRACT_FIELD = new ParseField("extract");
private final ScriptServiceProxy scriptService;
private final ClientProxy client;
@Inject
public Parser(Settings settings, ScriptServiceProxy scriptService, ClientProxy client) {
super(settings);
this.scriptService = scriptService;
this.client = client;
}
@Override
public String type() {
return TYPE;
builder.field(Field.EXECUTED_REQUEST.getPreferredName());
WatcherUtils.writeSearchRequest(executedRequest, builder, params);
return builder;
}
@Override
public SearchInput parse(XContentParser parser) throws IOException {
Set<String> extract = null;
SearchRequest request = null;
String currentFieldName = null;
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case START_OBJECT:
if (REQUEST_FIELD.getPreferredName().equals(currentFieldName)) {
request = WatcherUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE);
} else {
throw new InputException("could not parse [search] input. unexpected field [" + currentFieldName + "]");
}
break;
case START_ARRAY:
if (EXTRACT_FIELD.getPreferredName().equals(currentFieldName)) {
extract = new HashSet<>();
for (Token arrayToken = parser.nextToken(); arrayToken != Token.END_ARRAY; arrayToken = parser.nextToken()) {
if (arrayToken == Token.VALUE_STRING) {
extract.add(parser.text());
}
}
} else {
throw new InputException("could not parse [search] input. unexpected field [" + currentFieldName + "]");
}
break;
default:
throw new InputException("could not parse [search] input. unexpected token [" + token + "]");
}
}
if (request == null) {
throw new InputException("could not parse [search] input. search request is missing or null.");
}
return new SearchInput(logger, scriptService, client, request, extract);
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
public static Result parse(String watchId, XContentParser parser) throws IOException {
SearchRequest executedRequest = null;
Payload payload = null;
SearchRequest request = null;
String currentFieldName = null;
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
if (token == Token.FIELD_NAME) {
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == Token.START_OBJECT && currentFieldName != null) {
if (Input.Result.PAYLOAD_FIELD.match(currentFieldName)) {
} else if (Field.EXECUTED_REQUEST.match(currentFieldName)) {
try {
executedRequest = WatcherUtils.readSearchRequest(parser, ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
} catch (SearchRequestParseException srpe) {
throw new SearchInputException("could not parse [{}] input result for watch [{}]. failed to parse [{}]", srpe, TYPE, watchId, currentFieldName);
}
} else if (token == XContentParser.Token.START_OBJECT && currentFieldName != null) {
if (Field.PAYLOAD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else if (REQUEST_FIELD.match(currentFieldName)) {
request = WatcherUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE);
} else {
throw new InputException("unable to parse [" + TYPE + "] input result. unexpected field [" + currentFieldName + "]");
throw new SearchInputException("could not parse [{}] input result for watch [{}]. unexpected field [{}]", TYPE, watchId, currentFieldName);
}
}
}
if (executedRequest == null) {
throw new SearchInputException("could not parse [{}] input result for watch [{}]. missing required [{}] field", TYPE, watchId, Field.EXECUTED_REQUEST.getPreferredName());
}
if (payload == null) {
throw new InputException("unable to parse [" + TYPE + "] input result ["
+ Input.Result.PAYLOAD_FIELD.getPreferredName() + "] is required");
throw new SearchInputException("could not parse [{}] input result for watch [{}]. missing required [{}] field", TYPE, watchId, Field.PAYLOAD.getPreferredName());
}
if (request == null) {
throw new InputException("unable to parse [" + TYPE + "] input result, ["
+ REQUEST_FIELD.getPreferredName() + "] is required");
}
return new Result(TYPE, payload, request);
return new Result(executedRequest, payload);
}
}
public static class SourceBuilder implements Input.SourceBuilder {
public static class Builder implements Input.Builder<SearchInput> {
private final SearchRequest request;
private Set<String> extractKeys;
private final ImmutableSet.Builder<String> extractKeys = ImmutableSet.builder();
public SourceBuilder(SearchRequest request) {
private Builder(SearchRequest request) {
this.request = request;
}
public SourceBuilder addExtractKey(String key) {
if (extractKeys == null) {
extractKeys = new HashSet<>();
}
extractKeys.add(key);
public Builder extractKeys(Collection<String> keys) {
extractKeys.addAll(keys);
return this;
}
public Builder extractKeys(String... keys) {
extractKeys.add(keys);
return this;
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (extractKeys != null) {
builder.startArray(Parser.EXTRACT_FIELD.getPreferredName());
for (String extractKey : extractKeys) {
builder.value(extractKey);
}
builder.endArray();
}
builder.field(Parser.REQUEST_FIELD.getPreferredName());
builder = WatcherUtils.writeSearchRequest(request, builder, params);
return builder.endObject();
public SearchInput build() {
Set<String> keys = extractKeys.build();
return new SearchInput(request, keys.isEmpty() ? null : keys);
}
}
public interface Field extends Input.Field {
ParseField REQUEST = new ParseField("request");
ParseField EXECUTED_REQUEST = new ParseField("executed_request");
ParseField EXTRACT = new ParseField("extract");
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.search;
import org.elasticsearch.watcher.input.InputException;
/**
*
*/
public class SearchInputException extends InputException {
public SearchInputException(String msg, Object... args) {
super(msg, args);
}
public SearchInputException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.search;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.InputFactory;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import java.io.IOException;
/**
*
*/
public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Result, ExecutableSearchInput> {
private final ScriptServiceProxy scriptService;
private final ClientProxy client;
@Inject
public SearchInputFactory(Settings settings, ScriptServiceProxy scriptService, ClientProxy client) {
super(Loggers.getLogger(ExecutableSimpleInput.class, settings));
this.scriptService = scriptService;
this.client = client;
}
@Override
public String type() {
return SearchInput.TYPE;
}
@Override
public SearchInput parseInput(String watchId, XContentParser parser) throws IOException {
return SearchInput.parse(watchId, parser);
}
@Override
public SearchInput.Result parseResult(String watchId, XContentParser parser) throws IOException {
return SearchInput.Result.parse(watchId, parser);
}
@Override
public ExecutableSearchInput createExecutable(SearchInput input) {
return new ExecutableSearchInput(input, inputLogger, scriptService, client);
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.simple;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput;
import java.io.IOException;
/**
* This class just defines a simple xcontent map as an input
*/
public class ExecutableSimpleInput extends ExecutableInput<SimpleInput, SimpleInput.Result> {
public ExecutableSimpleInput(SimpleInput input, ESLogger logger) {
super(input, logger);
}
@Override
public SimpleInput.Result execute(WatchExecutionContext ctx) throws IOException {
return new SimpleInput.Result(input.getPayload());
}
}

View File

@ -5,32 +5,23 @@
*/
package org.elasticsearch.watcher.input.simple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputException;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
/**
* This class just defines a simple xcontent map as an input
*
*/
public class SimpleInput extends Input<SimpleInput.Result> {
public class SimpleInput implements Input {
public static final String TYPE = "simple";
private final Payload payload;
public SimpleInput(ESLogger logger, Payload payload) {
super(logger);
public SimpleInput(Payload payload) {
this.payload = payload;
}
@ -39,36 +30,40 @@ public class SimpleInput extends Input<SimpleInput.Result> {
return TYPE;
}
@Override
public Result execute(WatchExecutionContext ctx) throws IOException {
return new Result(payload);
public Payload getPayload() {
return payload;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return payload.toXContent(builder, params);
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SimpleInput that = (SimpleInput) o;
return payload.equals(that.payload);
}
@Override
public int hashCode() {
return Objects.hash(payload);
return payload.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final SimpleInput other = (SimpleInput) obj;
return Objects.equals(this.payload.data(), other.payload.data());
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.value(payload);
}
@Override
public String toString() {
return payload.toString();
public static SimpleInput parse(String watchId, XContentParser parser) throws IOException {
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new SimpleInputException("could not parse [{}] input for watch [{}]. expected an object but found [{}] instead", TYPE, watchId, parser.currentToken());
}
Payload payload = new Payload.Simple(parser.map());
return new SimpleInput(payload);
}
public static Builder builder(Payload payload) {
return new Builder(payload);
}
public static class Result extends Input.Result {
@ -81,31 +76,8 @@ public class SimpleInput extends Input<SimpleInput.Result> {
protected XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
public static class Parser extends AbstractComponent implements Input.Parser<Result,SimpleInput> {
@Inject
public Parser(Settings settings) {
super(settings);
}
@Override
public String type() {
return TYPE;
}
@Override
public SimpleInput parse(XContentParser parser) throws IOException {
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new InputException("could not parse simple input. expected an object but found [" + parser.currentToken() + "]");
}
Payload payload = new Payload.Simple(parser.map());
return new SimpleInput(logger, payload);
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
public static Result parse(String watchId, XContentParser parser) throws IOException {
Payload payload = null;
String currentFieldName = null;
@ -114,44 +86,33 @@ public class SimpleInput extends Input<SimpleInput.Result> {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && currentFieldName != null) {
if (Input.Result.PAYLOAD_FIELD.match(currentFieldName)) {
if (Field.PAYLOAD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else {
throw new InputException("unable to parse [" + TYPE + "] input result. unexpected field [" + currentFieldName + "]");
throw new SimpleInputException("could not parse [{}] input result for watch [{}]. unexpected field [{}]", TYPE, watchId, currentFieldName);
}
}
}
if (payload == null) {
throw new InputException("unable to parse [" + TYPE + "] input result [payload] is a required field");
throw new SimpleInputException("could not parse [{}] input result for watch [{}]. missing required [{}] field", TYPE, watchId, Field.PAYLOAD.getPreferredName());
}
return new Result(payload);
}
}
public static class SourceBuilder implements Input.SourceBuilder {
public static class Builder implements Input.Builder<SimpleInput> {
private Map<String, Object> data;
private final Payload payload;
public SourceBuilder(Map<String, Object> data) {
this.data = data;
}
public Input.SourceBuilder put(String key, Object value) {
data.put(key, value);
return this;
private Builder(Payload payload) {
this.payload = payload;
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.map(data);
public SimpleInput build() {
return new SimpleInput(payload);
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.simple;
import org.elasticsearch.watcher.input.InputException;
/**
*
*/
public class SimpleInputException extends InputException {
public SimpleInputException(String msg, Object... args) {
super(msg, args);
}
public SimpleInputException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.simple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.InputFactory;
import java.io.IOException;
/**
*
*/
public class SimpleInputFactory extends InputFactory<SimpleInput, SimpleInput.Result, ExecutableSimpleInput> {
@Inject
public SimpleInputFactory(Settings settings) {
super(Loggers.getLogger(ExecutableSimpleInput.class, settings));
}
@Override
public String type() {
return SimpleInput.TYPE;
}
@Override
public SimpleInput parseInput(String watchId, XContentParser parser) throws IOException {
return SimpleInput.parse(watchId, parser);
}
@Override
public SimpleInput.Result parseResult(String watchId, XContentParser parser) throws IOException {
return SimpleInput.Result.parse(watchId, parser);
}
@Override
public ExecutableSimpleInput createExecutable(SimpleInput input) {
return new ExecutableSimpleInput(input, inputLogger);
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.watcher.condition.always.AlwaysCondition;
import org.elasticsearch.watcher.execution.ExecutionService;
import org.elasticsearch.watcher.execution.ManualExecutionContext;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.clock.Clock;

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.watcher.watch;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.base.MoreObjects;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -14,6 +12,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.WatcherException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@ -25,6 +24,8 @@ import static org.elasticsearch.watcher.support.WatcherUtils.responseToData;
*/
public interface Payload extends ToXContent {
Simple EMPTY = new Simple(Collections.<String, Object>emptyMap());
Map<String, Object> data();
class Simple implements Payload {

View File

@ -25,12 +25,12 @@ import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.actions.ActionRegistry;
import org.elasticsearch.watcher.actions.ExecutableActions;
import org.elasticsearch.watcher.condition.ExecutableCondition;
import org.elasticsearch.watcher.condition.ConditionRegistry;
import org.elasticsearch.watcher.condition.ExecutableCondition;
import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.input.InputRegistry;
import org.elasticsearch.watcher.input.NoneInput;
import org.elasticsearch.watcher.input.none.ExecutableNoneInput;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.throttle.Throttler;
@ -56,7 +56,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
private final String name;
private final Trigger trigger;
private final Input input;
private final ExecutableInput input;
private final ExecutableCondition condition;
private final ExecutableActions actions;
private final Throttler throttler;
@ -71,7 +71,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
private final transient AtomicLong nonceCounter = new AtomicLong();
public Watch(String name, Clock clock, LicenseService licenseService, Trigger trigger, Input input, ExecutableCondition condition, @Nullable Transform transform,
public Watch(String name, Clock clock, LicenseService licenseService, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable Transform transform,
ExecutableActions actions, @Nullable Map<String, Object> metadata, @Nullable TimeValue throttlePeriod, @Nullable Status status) {
this.name = name;
this.trigger = trigger;
@ -93,7 +93,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
return trigger;
}
public Input input() { return input;}
public ExecutableInput input() { return input;}
public ExecutableCondition condition() {
return condition;
@ -195,7 +195,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
private final InputRegistry inputRegistry;
private final Clock clock;
private final Input defaultInput;
private final ExecutableInput defaultInput;
private final ExecutableCondition defaultCondition;
private final TimeValue defaultThrottleTimePeriod;
@ -213,7 +213,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
this.inputRegistry = inputRegistry;
this.clock = clock;
this.defaultInput = new NoneInput(logger);
this.defaultInput = new ExecutableNoneInput(logger);
this.defaultCondition = new ExecutableAlwaysCondition(logger);
this.defaultThrottleTimePeriod = settings.getAsTime(DEFAULT_THROTTLE_PERIOD_SETTING, DEFAULT_THROTTLE_PERIOD);
}
@ -231,7 +231,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
public Watch parse(String id, boolean includeStatus, XContentParser parser) throws IOException {
Trigger trigger = null;
Input input = defaultInput;
ExecutableInput input = defaultInput;
ExecutableCondition condition = defaultCondition;
ExecutableActions actions = null;
Transform transform = null;
@ -250,7 +250,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
if (TRIGGER_FIELD.match(currentFieldName)) {
trigger = triggerService.parseTrigger(id, parser);
} else if (INPUT_FIELD.match(currentFieldName)) {
input = inputRegistry.parse(parser);
input = inputRegistry.parse(id, parser);
} else if (CONDITION_FIELD.match(currentFieldName)) {
condition = conditionRegistry.parseExecutable(id, parser);
} else if (ACTIONS_FIELD.match(currentFieldName)) {

View File

@ -128,7 +128,7 @@ public class WatchExecution implements ToXContent {
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (INPUT_RESULT_FIELD.match(currentFieldName)) {
inputResult = inputRegistry.parseResult(parser);
inputResult = inputRegistry.parseResult(wid.watchId(), parser);
} else if (CONDITION_RESULT_FIELD.match(currentFieldName)) {
conditionResult = conditionRegistry.parseResult(wid.watchId(), parser);
} else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) {

View File

@ -18,6 +18,7 @@ import org.elasticsearch.watcher.condition.ExecutableCondition;
import org.elasticsearch.watcher.condition.always.AlwaysCondition;
import org.elasticsearch.watcher.condition.never.NeverCondition;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.clock.ClockMock;
@ -41,7 +42,7 @@ import static org.mockito.Mockito.*;
public class ExecutionServiceTests extends ElasticsearchTestCase {
private Payload payload;
private Input input;
private ExecutableInput input;
private Input.Result inputResult;
private ExecutionService executionService;
@ -49,7 +50,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
@Before
public void init() throws Exception {
payload = mock(Payload.class);
input = mock(Input.class);
input = mock(ExecutableInput.class);
inputResult = mock(Input.Result.class);
when(inputResult.payload()).thenReturn(payload);
when(input.execute(any(WatchExecutionContext.class))).thenReturn(inputResult);

View File

@ -17,7 +17,6 @@ import org.elasticsearch.watcher.condition.always.AlwaysCondition;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.http.HttpRequest;
import org.elasticsearch.watcher.support.http.HttpResponse;
@ -66,7 +65,7 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests {
.body("{'awesome' : 'us'}")
.build();
ctx.onActionResult(new ActionWrapper.Result("_webhook", new WebhookAction.Result.Executed(request, new HttpResponse(300))));
Input.Result inputResult = new SimpleInput.Result(new Payload.Simple());
SimpleInput.Result inputResult = new SimpleInput.Result(new Payload.Simple());
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx));
ctx.onInputResult(inputResult);
@ -95,7 +94,7 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests {
.body("{'awesome' : 'us'}")
.build();
ctx.onActionResult(new ActionWrapper.Result("_webhook", new WebhookAction.Result.Executed(request, new HttpResponse(300))));
Input.Result inputResult = new SimpleInput.Result(new Payload.Simple());
SimpleInput.Result inputResult = new SimpleInput.Result(new Payload.Simple());
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx));
ctx.onInputResult(inputResult);

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.watcher.input.http;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
@ -21,8 +20,8 @@ import org.elasticsearch.watcher.actions.ExecutableActions;
import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputBuilders;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.clock.ClockMock;
@ -55,7 +54,7 @@ import static org.mockito.Mockito.when;
public class HttpInputTests extends ElasticsearchTestCase {
private HttpClient httpClient;
private HttpInput.Parser httpParser;
private HttpInputFactory httpParser;
private TemplateEngine templateEngine;
@Before
@ -63,7 +62,7 @@ public class HttpInputTests extends ElasticsearchTestCase {
httpClient = mock(HttpClient.class);
templateEngine = mock(TemplateEngine.class);
HttpAuthRegistry registry = new HttpAuthRegistry(ImmutableMap.<String, HttpAuth.Parser>of("basic", new BasicAuth.Parser()));
httpParser = new HttpInput.Parser(ImmutableSettings.EMPTY, httpClient, new HttpRequest.Parser(registry), new HttpRequestTemplate.Parser(registry), templateEngine);
httpParser = new HttpInputFactory(ImmutableSettings.EMPTY, httpClient, templateEngine, new HttpRequest.Parser(registry), new HttpRequestTemplate.Parser(registry));
}
@Test
@ -73,7 +72,8 @@ public class HttpInputTests extends ElasticsearchTestCase {
HttpRequestTemplate.Builder request = HttpRequestTemplate.builder(host, port)
.method(HttpMethod.POST)
.body("_body");
HttpInput input = new HttpInput(logger, httpClient, request.build(), null, templateEngine);
HttpInput httpInput = InputBuilders.httpInput(request.build()).build();
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
HttpResponse response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8));
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
@ -84,7 +84,7 @@ public class HttpInputTests extends ElasticsearchTestCase {
new ClockMock(),
mock(LicenseService.class),
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new SimpleInput(logger, new Payload.Simple()),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
new ExecutableAlwaysCondition(logger),
null,
new ExecutableActions(new ArrayList<ActionWrapper>()),
@ -99,7 +99,7 @@ public class HttpInputTests extends ElasticsearchTestCase {
assertThat(result.payload().data(), equalTo(MapBuilder.<String, Object>newMapBuilder().put("key", "value").map()));
}
@Test @Repeat(iterations = 20)
@Test //@Repeat(iterations = 20)
public void testParser() throws Exception {
final HttpMethod httpMethod = rarely() ? null : randomFrom(HttpMethod.values());
Scheme scheme = randomFrom(Scheme.HTTP, Scheme.HTTPS, null);
@ -125,27 +125,27 @@ public class HttpInputTests extends ElasticsearchTestCase {
requestBuilder.putHeaders(headers);
}
XContentParser parser = XContentHelper.createParser(jsonBuilder().value(InputBuilders.httpInput(requestBuilder)).bytes());
XContentParser parser = XContentHelper.createParser(jsonBuilder().value(InputBuilders.httpInput(requestBuilder).build()).bytes());
parser.nextToken();
HttpInput result = httpParser.parse(parser);
HttpInput result = httpParser.parseInput("_id", parser);
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.getRequestTemplate().scheme(), equalTo(scheme != null ? scheme : Scheme.HTTP)); // http is the default
assertThat(result.getRequestTemplate().method(), equalTo(httpMethod != null ? httpMethod : HttpMethod.GET)); // get is the default
assertThat(result.getRequestTemplate().host(), equalTo(host));
assertThat(result.getRequestTemplate().port(), equalTo(port));
assertThat(result.getRequestTemplate().path(), is(new Template(path)));
assertThat(result.getRequest().scheme(), equalTo(scheme != null ? scheme : Scheme.HTTP)); // http is the default
assertThat(result.getRequest().method(), equalTo(httpMethod != null ? httpMethod : HttpMethod.GET)); // get is the default
assertThat(result.getRequest().host(), equalTo(host));
assertThat(result.getRequest().port(), equalTo(port));
assertThat(result.getRequest().path(), is(new Template(path)));
if (params != null) {
assertThat(result.getRequestTemplate().params(), hasEntry(is("a"), is(new Template("b"))));
assertThat(result.getRequest().params(), hasEntry(is("a"), is(new Template("b"))));
}
if (headers != null) {
assertThat(result.getRequestTemplate().headers(), hasEntry(is("c"), is(new Template("d"))));
assertThat(result.getRequest().headers(), hasEntry(is("c"), is(new Template("d"))));
}
assertThat(result.getRequestTemplate().auth(), equalTo(auth));
assertThat(result.getRequest().auth(), equalTo(auth));
if (body != null) {
assertThat(result.getRequestTemplate().body(), is(new Template(body)));
assertThat(result.getRequest().body(), is(new Template(body)));
} else {
assertThat(result.getRequestTemplate().body(), nullValue());
assertThat(result.getRequest().body(), nullValue());
}
}
@ -159,7 +159,7 @@ public class HttpInputTests extends ElasticsearchTestCase {
.endObject();
XContentParser parser = XContentHelper.createParser(builder.bytes());
parser.nextToken();
httpParser.parse(parser);
httpParser.parseInput("_id", parser);
}
@Test
@ -176,23 +176,23 @@ public class HttpInputTests extends ElasticsearchTestCase {
Map<String, Object> payload = MapBuilder.<String, Object>newMapBuilder().put("x", "y").map();
XContentBuilder builder = jsonBuilder().startObject();
builder.field(HttpInput.Parser.HTTP_STATUS_FIELD.getPreferredName(), 123);
builder.field(HttpInput.Parser.REQUEST_FIELD.getPreferredName(), request);
builder.field(Input.Result.PAYLOAD_FIELD.getPreferredName(), payload);
builder.field(HttpInput.Field.HTTP_STATUS.getPreferredName(), 123);
builder.field(HttpInput.Field.SENT_REQUEST.getPreferredName(), request);
builder.field(HttpInput.Field.PAYLOAD.getPreferredName(), payload);
builder.endObject();
XContentParser parser = XContentHelper.createParser(builder.bytes());
parser.nextToken();
HttpInput.Result result = httpParser.parseResult(parser);
HttpInput.Result result = httpParser.parseResult("_id", parser);
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.payload().data(), equalTo(payload));
assertThat(result.statusCode(), equalTo(123));
assertThat(result.request().method().method(), equalTo("GET"));
assertThat(result.request().headers().size(), equalTo(headers.size()));
assertThat(result.request().headers(), hasEntry("a", (Object) "b"));
assertThat(result.request().host(), equalTo("_host"));
assertThat(result.request().port(), equalTo(123));
assertThat(result.request().body(), equalTo("_body"));
assertThat(result.sentRequest().method().method(), equalTo("GET"));
assertThat(result.sentRequest().headers().size(), equalTo(headers.size()));
assertThat(result.sentRequest().headers(), hasEntry("a", (Object) "b"));
assertThat(result.sentRequest().host(), equalTo("_host"));
assertThat(result.sentRequest().port(), equalTo(123));
assertThat(result.sentRequest().body(), equalTo("_body"));
}
}

View File

@ -12,7 +12,6 @@ import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@ -22,8 +21,9 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.actions.ExecutableActions;
import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputException;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.WatcherUtils;
@ -33,10 +33,8 @@ import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.junit.Test;
import java.util.ArrayList;
@ -63,19 +61,19 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}")));
SearchRequest request = client()
.prepareSearch()
.setSearchType(SearchInput.DEFAULT_SEARCH_TYPE)
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
.request()
.source(searchSourceBuilder);
SearchInput searchInput = new SearchInput(logger,
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()), request, null);
ClientProxy.of(client()));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ClockMock(),
mock(LicenseService.class),
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new SimpleInput(logger, new Payload.Simple()),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
new ExecutableAlwaysCondition(logger),
null,
new ExecutableActions(new ArrayList<ActionWrapper>()),
@ -87,10 +85,10 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
SearchInput.Result result = searchInput.execute(ctx);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertNotNull(result.request());
assertEquals(result.request().searchType(),request.searchType());
assertArrayEquals(result.request().indices(), request.indices());
assertEquals(result.request().indicesOptions(), request.indicesOptions());
assertNotNull(result.executedRequest());
assertEquals(result.executedRequest().searchType(),request.searchType());
assertArrayEquals(result.executedRequest().indices(), request.indices());
assertEquals(result.executedRequest().indicesOptions(), request.indicesOptions());
}
@Test
@ -105,15 +103,15 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.request()
.source(searchSourceBuilder);
SearchInput searchInput = new SearchInput(logger,
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()), request, null);
ClientProxy.of(client()));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ClockMock(),
mock(LicenseService.class),
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new SimpleInput(logger, new Payload.Simple()),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
new ExecutableAlwaysCondition(logger),
null,
new ExecutableActions(new ArrayList<ActionWrapper>()),
@ -125,49 +123,50 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
SearchInput.Result result = searchInput.execute(ctx);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertNotNull(result.request());
assertEquals(result.request().searchType(), searchType);
assertArrayEquals(result.request().indices(), request.indices());
assertEquals(result.request().indicesOptions(), request.indicesOptions());
assertNotNull(result.executedRequest());
assertEquals(result.executedRequest().searchType(), searchType);
assertArrayEquals(result.executedRequest().indices(), request.indices());
assertEquals(result.executedRequest().indicesOptions(), request.indicesOptions());
}
@Test
public void testParser_Valid() throws Exception {
SearchRequest request = client().prepareSearch()
.setSearchType(SearchInput.DEFAULT_SEARCH_TYPE)
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
.request()
.source(searchSource()
.query(filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
SearchInput.SourceBuilder sourceBuilder = new SearchInput.SourceBuilder(request);
XContentBuilder builder = jsonBuilder().value(sourceBuilder);
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null));
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
Input.Parser searchInputParser = new SearchInput.Parser(ImmutableSettings.EMPTY,
SearchInputFactory factory = new SearchInputFactory(ImmutableSettings.EMPTY,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
Input searchInput = searchInputParser.parse(parser);
SearchInput searchInput = factory.parseInput("_id", parser);
assertEquals(SearchInput.TYPE, searchInput.type());
}
@Test(expected = InputException.class)
@Test(expected = SearchInputException.class)
public void testParser_Invalid() throws Exception {
Input.Parser searchInputParser = new SearchInput.Parser(ImmutableSettings.settingsBuilder().build(),
SearchInputFactory factory = new SearchInputFactory(ImmutableSettings.settingsBuilder().build(),
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
Map<String, Object> data = new HashMap<>();
data.put("foo", "bar");
data.put("baz", new ArrayList<String>() );
data.put("baz", new ArrayList<String>());
XContentBuilder jsonBuilder = jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field(Input.Result.PAYLOAD_FIELD.getPreferredName(), data);
jsonBuilder.field(SearchInput.Field.PAYLOAD.getPreferredName(), data);
jsonBuilder.endObject();
searchInputParser.parseResult(XContentFactory.xContent(jsonBuilder.bytes()).createParser(jsonBuilder.bytes()));
XContentParser parser = JsonXContent.jsonXContent.createParser(jsonBuilder.bytes());
parser.nextToken();
factory.parseResult("_id", parser);
fail("result parsing should fail if payload is provided but request is missing");
}
@ -181,30 +180,29 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{ctx.triggered.scheduled_time}}||-30s").to("{{ctx.triggered.triggered_time}}")));
SearchRequest request = client()
.prepareSearch()
.setSearchType(SearchInput.DEFAULT_SEARCH_TYPE)
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
.request()
.source(searchSourceBuilder);
XContentBuilder jsonBuilder = jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field(Input.Result.PAYLOAD_FIELD.getPreferredName(), data);
jsonBuilder.field(SearchInput.Parser.REQUEST_FIELD.getPreferredName());
jsonBuilder.field(SearchInput.Field.PAYLOAD.getPreferredName(), data);
jsonBuilder.field(SearchInput.Field.EXECUTED_REQUEST.getPreferredName());
WatcherUtils.writeSearchRequest(request, jsonBuilder, ToXContent.EMPTY_PARAMS);
jsonBuilder.endObject();
Input.Parser searchInputParser = new SearchInput.Parser(ImmutableSettings.settingsBuilder().build(),
SearchInputFactory factory = new SearchInputFactory(ImmutableSettings.settingsBuilder().build(),
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
Input.Result result = searchInputParser.parseResult(XContentFactory.xContent(jsonBuilder.bytes()).createParser(jsonBuilder.bytes()));
XContentParser parser = JsonXContent.jsonXContent.createParser(jsonBuilder.bytes());
parser.nextToken();
SearchInput.Result result = factory.parseResult("_id", parser);
assertEquals(SearchInput.TYPE, result.type());
assertEquals(result.payload().data().get("foo"), "bar");
List baz = (List)result.payload().data().get("baz");
assertTrue(baz.isEmpty());
assertTrue(result instanceof SearchInput.Result);
SearchInput.Result searchInputResult = (SearchInput.Result) result;
assertNotNull(searchInputResult.request());
assertNotNull(result.executedRequest());
}
}

View File

@ -7,8 +7,10 @@ package org.elasticsearch.watcher.input.simple;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputFactory;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.input.InputException;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -32,7 +34,7 @@ public class SimpleInputTests extends ElasticsearchTestCase {
Map<String, Object> data = new HashMap<>();
data.put("foo", "bar");
data.put("baz", new ArrayList<String>() );
Input staticInput = new SimpleInput(logger, new Payload.Simple(data));
ExecutableInput staticInput = new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(data)), logger);
Input.Result staticResult = staticInput.execute(null);
assertEquals(staticResult.payload().data().get("foo"), "bar");
@ -48,10 +50,10 @@ public class SimpleInputTests extends ElasticsearchTestCase {
data.put("baz", new ArrayList<String>());
XContentBuilder jsonBuilder = jsonBuilder().value(data);
Input.Parser parser = new SimpleInput.Parser(ImmutableSettings.builder().build());
InputFactory parser = new SimpleInputFactory(ImmutableSettings.builder().build());
XContentParser xContentParser = JsonXContent.jsonXContent.createParser(jsonBuilder.bytes());
xContentParser.nextToken();
Input input = parser.parse(xContentParser);
ExecutableInput input = parser.parseExecutable("_id", xContentParser);
assertEquals(input.type(), SimpleInput.TYPE);
@ -67,10 +69,10 @@ public class SimpleInputTests extends ElasticsearchTestCase {
XContentBuilder jsonBuilder = jsonBuilder().value("just a string");
Input.Parser parser = new SimpleInput.Parser(ImmutableSettings.builder().build());
InputFactory parser = new SimpleInputFactory(ImmutableSettings.builder().build());
XContentParser xContentParser = JsonXContent.jsonXContent.createParser(jsonBuilder.bytes());
xContentParser.nextToken();
parser.parse(xContentParser);
parser.parseInput("_id", xContentParser);
fail("[simple] input parse should fail with an InputException for an empty json object");
}
@ -82,11 +84,11 @@ public class SimpleInputTests extends ElasticsearchTestCase {
XContentBuilder jsonBuilder = jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field(Input.Result.PAYLOAD_FIELD.getPreferredName(), data);
jsonBuilder.field(SimpleInput.Field.PAYLOAD.getPreferredName(), data);
jsonBuilder.endObject();
Input.Parser parser = new SimpleInput.Parser(ImmutableSettings.builder().build());
Input.Result staticResult = parser.parseResult(XContentFactory.xContent(jsonBuilder.bytes()).createParser(jsonBuilder.bytes()));
SimpleInputFactory parser = new SimpleInputFactory(ImmutableSettings.builder().build());
SimpleInput.Result staticResult = parser.parseResult("_id", XContentFactory.xContent(jsonBuilder.bytes()).createParser(jsonBuilder.bytes()));
assertEquals(staticResult.type(), SimpleInput.TYPE);
assertEquals(staticResult.payload().data().get("foo"), "bar");
@ -100,8 +102,8 @@ public class SimpleInputTests extends ElasticsearchTestCase {
jsonBuilder.startObject();
jsonBuilder.endObject();
Input.Parser parser = new SimpleInput.Parser(ImmutableSettings.builder().build());
parser.parseResult(XContentFactory.xContent(jsonBuilder.bytes()).createParser(jsonBuilder.bytes()));
InputFactory parser = new SimpleInputFactory(ImmutableSettings.builder().build());
parser.parseResult("_id", XContentFactory.xContent(jsonBuilder.bytes()).createParser(jsonBuilder.bytes()));
fail("[simple] input result parse should fail with an InputException for an empty json object");
}

View File

@ -20,7 +20,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.search.ExecutableSearchInput;
import org.junit.Test;
import java.io.IOException;
@ -121,7 +121,7 @@ public class WatcherUtilsTests extends ElasticsearchTestCase {
builder = WatcherUtils.writeSearchRequest(expectedRequest, builder, ToXContent.EMPTY_PARAMS);
XContentParser parser = XContentHelper.createParser(builder.bytes());
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
SearchRequest result = WatcherUtils.readSearchRequest(parser, SearchInput.DEFAULT_SEARCH_TYPE);
SearchRequest result = WatcherUtils.readSearchRequest(parser, ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
assertThat(result.indices(), arrayContainingInAnyOrder(expectedRequest.indices()));
assertThat(result.types(), arrayContainingInAnyOrder(expectedRequest.types()));
@ -212,7 +212,7 @@ public class WatcherUtilsTests extends ElasticsearchTestCase {
XContentParser parser = XContentHelper.createParser(builder.bytes());
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
SearchRequest result = WatcherUtils.readSearchRequest(parser, SearchInput.DEFAULT_SEARCH_TYPE);
SearchRequest result = WatcherUtils.readSearchRequest(parser, ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
assertThat(result.indices(), arrayContainingInAnyOrder(indices));
assertThat(result.types(), arrayContainingInAnyOrder(types));

View File

@ -29,7 +29,8 @@ import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.watcher.condition.script.ExecutableScriptCondition;
import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.Script;
@ -74,7 +75,7 @@ public final class WatcherTestUtils {
public static SearchRequest newInputSearchRequest(String... indices) {
SearchRequest request = new SearchRequest(indices);
request.indicesOptions(WatcherUtils.DEFAULT_INDICES_OPTIONS);
request.searchType(SearchInput.DEFAULT_SEARCH_TYPE);
request.searchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
return request;
}
@ -131,7 +132,7 @@ public final class WatcherTestUtils {
SearchRequest conditionRequest = newInputSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = newInputSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
transformRequest.searchType(SearchTransform.DEFAULT_SEARCH_TYPE);
conditionRequest.searchType(SearchInput.DEFAULT_SEARCH_TYPE);
conditionRequest.searchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
List<ActionWrapper> actions = new ArrayList<>();
@ -178,7 +179,7 @@ public final class WatcherTestUtils {
SystemClock.INSTANCE,
licenseService,
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")),
new SimpleInput(logger, new Payload.Simple(inputData)),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger),
new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService),
new SearchTransform(logger, scriptService, client, transformRequest),
new ExecutableActions(actions),

View File

@ -124,7 +124,7 @@ public class WatcherBenchmark {
.trigger(schedule(interval("5s")))
.input(searchInput(new SearchRequest()
.source(new SearchSourceBuilder()))
.addExtractKey("hits.total")
.extractKeys("hits.total")
)
.condition(scriptCondition("1 == 1"))
.addAction("_id", indexAction("index", "type")));

View File

@ -25,7 +25,6 @@ import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.watcher.trigger.schedule.Schedules;
import org.elasticsearch.watcher.watch.WatchStore;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Map;
@ -284,14 +283,14 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
watcherClient.preparePutWatch("_name1")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(searchRequest).addExtractKey("hits.total"))
.input(searchInput(searchRequest).extractKeys("hits.total"))
.condition(scriptCondition("ctx.payload.hits.total == 1")))
.get();
// in this watcher the condition will fail, because max_score isn't extracted, only total:
watcherClient.preparePutWatch("_name2")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(searchRequest).addExtractKey("hits.total"))
.input(searchInput(searchRequest).extractKeys("hits.total"))
.condition(scriptCondition("ctx.payload.hits.max_score >= 0")))
.get();

View File

@ -21,6 +21,7 @@ import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.clock.SystemClock;
@ -88,7 +89,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
SystemClock.INSTANCE,
licenseService(),
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set this into the future so we don't get any extra runs
new SearchInput(logger, scriptService(), ClientProxy.of(client()), searchRequest, null),
new ExecutableSearchInput(new SearchInput(searchRequest, null), logger, scriptService(), ClientProxy.of(client())),
new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService()),
new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest),
new ExecutableActions(new ArrayList<ActionWrapper>()),
@ -150,8 +151,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
SystemClock.INSTANCE,
licenseService(),
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set a cron schedule far into the future so this watch is never scheduled
new SearchInput(logger, scriptService(), ClientProxy.of(client()),
searchRequest, null),
new ExecutableSearchInput(new SearchInput(searchRequest, null), logger, scriptService(), ClientProxy.of(client())),
new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService()),
new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest),
new ExecutableActions(new ArrayList<ActionWrapper>()),

View File

@ -100,7 +100,7 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests {
watcherClient.preparePutWatch("_name1")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(httpInput(requestBuilder).addExtractKey("hits.total"))
.input(httpInput(requestBuilder).extractKeys("hits.total"))
.condition(scriptCondition("ctx.payload.hits.total == 1")))
.get();
@ -108,7 +108,7 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests {
watcherClient.preparePutWatch("_name2")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(httpInput(requestBuilder).addExtractKey("hits.total"))
.input(httpInput(requestBuilder).extractKeys("hits.total"))
.condition(scriptCondition("ctx.payload.hits.max_score >= 0")))
.get();

View File

@ -43,10 +43,16 @@ import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.watcher.condition.script.ExecutableScriptCondition;
import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.condition.script.ScriptConditionFactory;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.input.InputBuilders;
import org.elasticsearch.watcher.input.InputFactory;
import org.elasticsearch.watcher.input.InputRegistry;
import org.elasticsearch.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.search.SearchInputFactory;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInputFactory;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.WatcherUtils;
@ -76,6 +82,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import static org.elasticsearch.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.watcher.test.WatcherTestUtils.matchAllRequest;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
@ -113,7 +120,7 @@ public class WatchTests extends ElasticsearchTestCase {
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(ImmutableSettings.EMPTY, scheduleRegistry);
TriggerService triggerService = new TriggerService(ImmutableSettings.EMPTY, ImmutableSet.of(triggerEngine));
Input input = randomInput();
ExecutableInput input = randomInput();
InputRegistry inputRegistry = registry(input);
ExecutableCondition condition = randomCondition();
@ -201,24 +208,26 @@ public class WatchTests extends ElasticsearchTestCase {
}
}
private Input randomInput() {
private ExecutableInput randomInput() {
String type = randomFrom(SearchInput.TYPE, SimpleInput.TYPE);
switch (type) {
case SearchInput.TYPE:
return new SearchInput(logger, scriptService, client, WatcherTestUtils.newInputSearchRequest("idx"), null);
SearchInput searchInput = searchInput(WatcherTestUtils.newInputSearchRequest("idx")).build();
return new ExecutableSearchInput(searchInput, logger, scriptService, client);
default:
return new SimpleInput(logger, new Payload.Simple(ImmutableMap.<String, Object>builder().put("_key", "_val").build()));
SimpleInput simpleInput = InputBuilders.simpleInput(ImmutableMap.<String, Object>builder().put("_key", "_val")).build();
return new ExecutableSimpleInput(simpleInput, logger);
}
}
private InputRegistry registry(Input input) {
ImmutableMap.Builder<String, Input.Parser> parsers = ImmutableMap.builder();
private InputRegistry registry(ExecutableInput input) {
ImmutableMap.Builder<String, InputFactory> parsers = ImmutableMap.builder();
switch (input.type()) {
case SearchInput.TYPE:
parsers.put(SearchInput.TYPE, new SearchInput.Parser(settings, scriptService, client));
parsers.put(SearchInput.TYPE, new SearchInputFactory(settings, scriptService, client));
return new InputRegistry(parsers.build());
default:
parsers.put(SimpleInput.TYPE, new SimpleInput.Parser(settings));
parsers.put(SimpleInput.TYPE, new SimpleInputFactory(settings));
return new InputRegistry(parsers.build());
}
}