watcher: Watcher search templates shouldn't serialize SearchSourceBuilder to a string, template that and turn it back into a SearchSourceBuilder

Instead watcher search template should be agnostic of SearchSourceBuilder and just work with BytesReference, so that serializing to a string before templating isn't needed.

Original commit: elastic/x-pack-elasticsearch@36d21ec819
This commit is contained in:
Martijn van Groningen 2016-08-16 20:26:28 +02:00
parent 175867088d
commit f291f292bf
29 changed files with 402 additions and 465 deletions

View File

@ -95,8 +95,8 @@ setup:
- match: { "watch_record.result.input.status": "success" }
- match: { "watch_record.result.input.payload.hits.total": 4 }
# makes sure that the mustache template snippets have been resolved correctly:
- match: { "watch_record.result.input.search.request.body.query.bool.filter.0.range.date.from": "2015-01-04T00:00:00.000Z||-3d" }
- match: { "watch_record.result.input.search.request.body.query.bool.filter.0.range.date.to": "2015-01-04T00:00:00.000Z" }
- match: { "watch_record.result.input.search.request.body.query.bool.filter.0.range.date.gte": "2015-01-04T00:00:00.000Z||-3d" }
- match: { "watch_record.result.input.search.request.body.query.bool.filter.0.range.date.lte": "2015-01-04T00:00:00.000Z" }
---
"Test transform mustache integration":
@ -152,6 +152,6 @@ setup:
- match: { "watch_record.result.transform.payload.hits.total": 1 }
- match: { "watch_record.result.transform.payload.hits.hits.0._id": "3" }
# makes sure that the mustache template snippets have been resolved correctly:
- match: { "watch_record.result.transform.search.request.body.query.bool.filter.0.range.date.from": "2015-01-04T00:00:00.000Z||-1d" }
- match: { "watch_record.result.transform.search.request.body.query.bool.filter.0.range.date.to": "2015-01-04T00:00:00.000Z" }
- match: { "watch_record.result.transform.search.request.body.query.bool.filter.1.term.value.value": "val_3" }
- match: { "watch_record.result.transform.search.request.body.query.bool.filter.0.range.date.gte": "2015-01-04T00:00:00.000Z||-1d" }
- match: { "watch_record.result.transform.search.request.body.query.bool.filter.0.range.date.lte": "2015-01-04T00:00:00.000Z" }
- match: { "watch_record.result.transform.search.request.body.query.bool.filter.1.term.value": "val_3" }

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.input;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.input.chain.ChainInput;
@ -19,9 +18,6 @@ import org.elasticsearch.xpack.watcher.watch.Payload;
import java.util.HashMap;
import java.util.Map;
/**
*
*/
public final class InputBuilders {
private InputBuilders() {
@ -35,12 +31,8 @@ public final class InputBuilders {
return SearchInput.builder(request);
}
public static SearchInput.Builder searchInput(SearchRequest request) {
return searchInput(new WatcherSearchTemplateRequest(request));
}
public static SimpleInput.Builder simpleInput() {
return simpleInput(new HashMap<String, Object>());
return simpleInput(new HashMap<>());
}
public static SimpleInput.Builder simpleInput(String key, Object value) {

View File

@ -5,10 +5,10 @@
*/
package org.elasticsearch.xpack.watcher.input.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -17,8 +17,10 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.ExecutableInput;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.watch.Payload;
@ -47,9 +49,12 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
@Override
public SearchInput.Result execute(WatchExecutionContext ctx, Payload payload) {
SearchRequest request = null;
WatcherSearchTemplateRequest request = null;
try {
request = searchTemplateService.createSearchRequestFromPrototype(input.getRequest(), ctx, payload);
WatcherScript template = input.getRequest().getOrCreateTemplate();
BytesReference renderedTemplate = searchTemplateService.renderTemplate(template, ctx, payload);
// We need to make a copy, so that we don't modify the original instance that we keep around in a watch:
request = new WatcherSearchTemplateRequest(input.getRequest(), renderedTemplate);
return doExecute(ctx, request);
} catch (Exception e) {
logger.error("failed to execute [{}] input for [{}]", e, SearchInput.TYPE, ctx.watch());
@ -57,12 +62,12 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
}
}
SearchInput.Result doExecute(WatchExecutionContext ctx, SearchRequest request) throws Exception {
SearchInput.Result doExecute(WatchExecutionContext ctx, WatcherSearchTemplateRequest request) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), XContentHelper.toString(request.source()));
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), request.getSearchSource().utf8ToString());
}
SearchResponse response = client.search(request, timeout);
SearchResponse response = client.search(searchTemplateService.toSearchRequest(request), timeout);
if (logger.isDebugEnabled()) {
logger.debug("[{}] found [{}] hits", ctx.id(), response.getHits().getTotalHits());

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.watcher.input.search;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
@ -30,9 +29,6 @@ import java.util.Set;
import static java.util.Collections.unmodifiableSet;
/**
*
*/
public class SearchInput implements Input {
public static final String TYPE = "search";
@ -127,8 +123,7 @@ public class SearchInput implements Input {
currentFieldName = parser.currentName();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.REQUEST)) {
try {
request = WatcherSearchTemplateRequest.fromXContent(parser, ExecutableSearchInput.DEFAULT_SEARCH_TYPE, context,
aggParsers, suggesters);
request = WatcherSearchTemplateRequest.fromXContent(parser, ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
} catch (ElasticsearchParseException srpe) {
throw new ElasticsearchParseException("could not parse [{}] input for watch [{}]. failed to parse [{}]", srpe, TYPE,
watchId, currentFieldName);
@ -176,19 +171,19 @@ public class SearchInput implements Input {
public static class Result extends Input.Result {
@Nullable private final SearchRequest request;
@Nullable private final WatcherSearchTemplateRequest request;
public Result(SearchRequest request, Payload payload) {
public Result(WatcherSearchTemplateRequest request, Payload payload) {
super(TYPE, payload);
this.request = request;
}
public Result(@Nullable SearchRequest request, Exception e) {
public Result(@Nullable WatcherSearchTemplateRequest request, Exception e) {
super(TYPE, e);
this.request = request;
}
public SearchRequest executedRequest() {
public WatcherSearchTemplateRequest executedRequest() {
return request;
}
@ -198,7 +193,7 @@ public class SearchInput implements Input {
return builder;
}
builder.startObject(type);
builder.field(Field.REQUEST.getPreferredName(), new WatcherSearchTemplateRequest(request));
builder.field(Field.REQUEST.getPreferredName(), request);
return builder.endObject();
}
}

View File

@ -21,9 +21,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
*
*/
// TODO: remove this class as it is exactly the same as org.elasticsearch.script.Script
// and we should be able to remove it without breaking bwc in the .watch index
public class WatcherScript implements ToXContent {
public static final String DEFAULT_LANG = ScriptSettings.DEFAULT_LANG;

View File

@ -6,25 +6,21 @@
package org.elasticsearch.xpack.watcher.support.search;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggesters;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.support.SearchRequestEquivalence;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
@ -35,60 +31,122 @@ import java.util.Objects;
*/
public class WatcherSearchTemplateRequest implements ToXContent {
private final SearchRequest request;
@Nullable private final WatcherScript template;
private static final String DEFAULT_LANG = "mustache";
public WatcherSearchTemplateRequest(SearchRequest searchRequest, @Nullable WatcherScript template) {
this.request = Objects.requireNonNull(searchRequest);
private final String[] indices;
private final String[] types;
private final SearchType searchType;
private final IndicesOptions indicesOptions;
private final WatcherScript template;
private final BytesReference searchSource;
public WatcherSearchTemplateRequest(String[] indices, String[] types, SearchType searchType, IndicesOptions indicesOptions,
BytesReference searchSource) {
this.indices = indices;
this.types = types;
this.searchType = searchType;
this.indicesOptions = indicesOptions;
// Here we convert a watch search request body into an inline search template,
// this way if any Watcher related context variables are used, they will get resolved.
this.template = WatcherScript.inline(searchSource.utf8ToString()).lang(DEFAULT_LANG).build();
this.searchSource = null;
}
public WatcherSearchTemplateRequest(String[] indices, String[] types, SearchType searchType, IndicesOptions indicesOptions,
WatcherScript template) {
this.indices = indices;
this.types = types;
this.searchType = searchType;
this.indicesOptions = indicesOptions;
this.template = template;
this.searchSource = null;
}
public WatcherSearchTemplateRequest(SearchRequest request) {
this(request, null);
public WatcherSearchTemplateRequest(WatcherSearchTemplateRequest original, BytesReference source) {
this.indices = original.indices;
this.types = original.types;
this.searchType = original.searchType;
this.indicesOptions = original.indicesOptions;
this.searchSource = source;
this.template = original.template;
}
public SearchRequest getRequest() {
return request;
private WatcherSearchTemplateRequest(String[] indices, String[] types, SearchType searchType, IndicesOptions indicesOptions,
BytesReference searchSource, WatcherScript template) {
this.indices = indices;
this.types = types;
this.searchType = searchType;
this.indicesOptions = indicesOptions;
this.template = template;
this.searchSource = searchSource;
}
@Nullable
public WatcherScript getTemplate() {
return template;
}
public String[] getIndices() {
return indices;
}
public String[] getTypes() {
return types;
}
public SearchType getSearchType() {
return searchType;
}
public IndicesOptions getIndicesOptions() {
return indicesOptions;
}
@Nullable
public BytesReference getSearchSource() {
return searchSource;
}
public WatcherScript getOrCreateTemplate() {
if (template != null) {
return template;
} else {
return WatcherScript.inline(searchSource.utf8ToString()).lang(DEFAULT_LANG).build();
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (request != null) {
if (request.searchType() != null) {
builder.field(SEARCH_TYPE_FIELD.getPreferredName(), request.searchType().toString().toLowerCase(Locale.ENGLISH));
}
if (request.indices() != null) {
builder.array(INDICES_FIELD.getPreferredName(), request.indices());
}
if (request.types() != null) {
builder.array(TYPES_FIELD.getPreferredName(), request.types());
}
if (request.source() != null) {
builder.field(BODY_FIELD.getPreferredName(), request.source());
}
if (request.indicesOptions() != DEFAULT_INDICES_OPTIONS) {
IndicesOptions options = request.indicesOptions();
builder.startObject(INDICES_OPTIONS_FIELD.getPreferredName());
String value;
if (options.expandWildcardsClosed() && options.expandWildcardsOpen()) {
value = "all";
} else if (options.expandWildcardsOpen()) {
value = "open";
} else if (options.expandWildcardsClosed()) {
value = "closed";
} else {
value = "none";
}
builder.field(EXPAND_WILDCARDS_FIELD.getPreferredName(), value);
builder.field(IGNORE_UNAVAILABLE_FIELD.getPreferredName(), options.ignoreUnavailable());
builder.field(ALLOW_NO_INDICES_FIELD.getPreferredName(), options.allowNoIndices());
builder.endObject();
if (searchType != null) {
builder.field(SEARCH_TYPE_FIELD.getPreferredName(), searchType.toString().toLowerCase(Locale.ENGLISH));
}
if (indices != null) {
builder.array(INDICES_FIELD.getPreferredName(), indices);
}
if (types != null) {
builder.array(TYPES_FIELD.getPreferredName(), types);
}
if (searchSource != null) {
builder.rawField(BODY_FIELD.getPreferredName(), searchSource);
}
if (indicesOptions != DEFAULT_INDICES_OPTIONS) {
builder.startObject(INDICES_OPTIONS_FIELD.getPreferredName());
String value;
if (indicesOptions.expandWildcardsClosed() && indicesOptions.expandWildcardsOpen()) {
value = "all";
} else if (indicesOptions.expandWildcardsOpen()) {
value = "open";
} else if (indicesOptions.expandWildcardsClosed()) {
value = "closed";
} else {
value = "none";
}
builder.field(EXPAND_WILDCARDS_FIELD.getPreferredName(), value);
builder.field(IGNORE_UNAVAILABLE_FIELD.getPreferredName(), indicesOptions.ignoreUnavailable());
builder.field(ALLOW_NO_INDICES_FIELD.getPreferredName(), indicesOptions.allowNoIndices());
builder.endObject();
}
if (template != null) {
builder.field(TEMPLATE_FIELD.getPreferredName(), template);
@ -100,11 +158,12 @@ public class WatcherSearchTemplateRequest implements ToXContent {
/**
* Reads a new watcher search request instance for the specified parser.
*/
public static WatcherSearchTemplateRequest fromXContent(XContentParser parser, SearchType searchType, QueryParseContext context,
AggregatorParsers aggParsers, Suggesters suggesters)
public static WatcherSearchTemplateRequest fromXContent(XContentParser parser, SearchType searchType)
throws IOException {
List<String> indices = new ArrayList<>();
List<String> types = new ArrayList<>();
IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
SearchRequest searchRequest = new SearchRequest();
BytesReference searchSource = null;
WatcherScript template = null;
XContentParser.Token token;
@ -112,12 +171,8 @@ public class WatcherSearchTemplateRequest implements ToXContent {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
if (ParseFieldMatcher.STRICT.match(currentFieldName, BODY_FIELD)) {
searchRequest.source(SearchSourceBuilder.fromXContent(context, aggParsers, suggesters));
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (ParseFieldMatcher.STRICT.match(currentFieldName, INDICES_FIELD)) {
List<String> indices = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_STRING) {
indices.add(parser.textOrNull());
@ -126,9 +181,7 @@ public class WatcherSearchTemplateRequest implements ToXContent {
currentFieldName + "] field, but instead found [" + token + "]");
}
}
searchRequest.indices(indices.toArray(new String[indices.size()]));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, TYPES_FIELD)) {
List<String> types = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_STRING) {
types.add(parser.textOrNull());
@ -137,13 +190,17 @@ public class WatcherSearchTemplateRequest implements ToXContent {
currentFieldName + "] field, but instead found [" + token + "]");
}
}
searchRequest.types(types.toArray(new String[types.size()]));
} else {
throw new ElasticsearchParseException("could not read search request. unexpected array field [" +
currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (ParseFieldMatcher.STRICT.match(currentFieldName, INDICES_OPTIONS_FIELD)) {
if (ParseFieldMatcher.STRICT.match(currentFieldName, BODY_FIELD)) {
try (XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent())) {
builder.copyCurrentStructure(parser);
searchSource = builder.bytes();
}
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, INDICES_OPTIONS_FIELD)) {
boolean expandOpen = DEFAULT_INDICES_OPTIONS.expandWildcardsOpen();
boolean expandClosed = DEFAULT_INDICES_OPTIONS.expandWildcardsClosed();
boolean allowNoIndices = DEFAULT_INDICES_OPTIONS.allowNoIndices();
@ -198,10 +255,10 @@ public class WatcherSearchTemplateRequest implements ToXContent {
} else if (token == XContentParser.Token.VALUE_STRING) {
if (ParseFieldMatcher.STRICT.match(currentFieldName, INDICES_FIELD)) {
String indicesStr = parser.text();
searchRequest.indices(Strings.delimitedListToStringArray(indicesStr, ",", " \t"));
indices.addAll(Arrays.asList(Strings.delimitedListToStringArray(indicesStr, ",", " \t")));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, TYPES_FIELD)) {
String typesStr = parser.text();
searchRequest.types(Strings.delimitedListToStringArray(typesStr, ",", " \t"));
types.addAll(Arrays.asList(Strings.delimitedListToStringArray(typesStr, ",", " \t")));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, SEARCH_TYPE_FIELD)) {
searchType = SearchType.fromString(parser.text().toLowerCase(Locale.ROOT), ParseFieldMatcher.EMPTY);
} else {
@ -213,12 +270,8 @@ public class WatcherSearchTemplateRequest implements ToXContent {
}
}
if (searchRequest.indices() == null) {
searchRequest.indices(Strings.EMPTY_ARRAY);
}
searchRequest.searchType(searchType);
searchRequest.indicesOptions(indicesOptions);
return new WatcherSearchTemplateRequest(searchRequest, template);
return new WatcherSearchTemplateRequest(indices.toArray(new String[0]), types.toArray(new String[0]), searchType,
indicesOptions, searchSource, template);
}
@Override
@ -226,29 +279,30 @@ public class WatcherSearchTemplateRequest implements ToXContent {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WatcherSearchTemplateRequest that = (WatcherSearchTemplateRequest) o;
if (!SearchRequestEquivalence.INSTANCE.equivalent(request, that.request)) return false;
return template != null ? template.equals(that.template) : that.template == null;
WatcherSearchTemplateRequest other = (WatcherSearchTemplateRequest) o;
return Arrays.equals(indices, other.indices) &&
Arrays.equals(types, other.types) &&
Objects.equals(searchType, other.searchType) &&
Objects.equals(indicesOptions, other.indicesOptions) &&
Objects.equals(searchSource, other.searchSource) &&
Objects.equals(template, other.template);
}
@Override
public int hashCode() {
int result = request != null ? request.hashCode() : 0;
result = 31 * result + (template != null ? template.hashCode() : 0);
return result;
return Objects.hash(indices, types, searchType, indicesOptions, searchSource, template);
}
static final ParseField INDICES_FIELD = new ParseField("indices");
static final ParseField TYPES_FIELD = new ParseField("types");
static final ParseField BODY_FIELD = new ParseField("body");
static final ParseField SEARCH_TYPE_FIELD = new ParseField("search_type");
static final ParseField INDICES_OPTIONS_FIELD = new ParseField("indices_options");
static final ParseField EXPAND_WILDCARDS_FIELD = new ParseField("expand_wildcards");
static final ParseField IGNORE_UNAVAILABLE_FIELD = new ParseField("ignore_unavailable");
static final ParseField ALLOW_NO_INDICES_FIELD = new ParseField("allow_no_indices");
static final ParseField TEMPLATE_FIELD = new ParseField("template");
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField TYPES_FIELD = new ParseField("types");
private static final ParseField BODY_FIELD = new ParseField("body");
private static final ParseField SEARCH_TYPE_FIELD = new ParseField("search_type");
private static final ParseField INDICES_OPTIONS_FIELD = new ParseField("indices_options");
private static final ParseField EXPAND_WILDCARDS_FIELD = new ParseField("expand_wildcards");
private static final ParseField IGNORE_UNAVAILABLE_FIELD = new ParseField("ignore_unavailable");
private static final ParseField ALLOW_NO_INDICES_FIELD = new ParseField("allow_no_indices");
private static final ParseField TEMPLATE_FIELD = new ParseField("template");
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.lenientExpandOpen();
}

View File

@ -11,36 +11,27 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggesters;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.support.Variables;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.watch.Payload;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* {@link WatcherSearchTemplateService} renders {@link WatcherSearchTemplateRequest} before their execution.
*/
public class WatcherSearchTemplateService extends AbstractComponent {
private static final String DEFAULT_LANG = "mustache";
private final ScriptService scriptService;
private final ParseFieldMatcher parseFieldMatcher;
private final SearchRequestParsers searchRequestParsers;
@ -53,71 +44,46 @@ public class WatcherSearchTemplateService extends AbstractComponent {
this.parseFieldMatcher = new ParseFieldMatcher(settings);
}
public SearchRequest createSearchRequestFromPrototype(WatcherSearchTemplateRequest prototype, WatchExecutionContext ctx,
Payload payload) throws IOException {
SearchRequest request = new SearchRequest()
.indicesOptions(prototype.getRequest().indicesOptions())
.searchType(prototype.getRequest().searchType())
.indices(prototype.getRequest().indices())
.types(prototype.getRequest().types());
WatcherScript template = null;
public BytesReference renderTemplate(WatcherScript templatePrototype,
WatchExecutionContext ctx,
Payload payload) throws IOException {
// Due the inconsistency with templates in ES 1.x, we maintain our own template format.
// This template format we use now, will become the template structure in ES 2.0
Map<String, Object> watcherContextParams = Variables.createCtxModel(ctx, payload);
// Here we convert a watch search request body into an inline search template,
// this way if any Watcher related context variables are used, they will get resolved.
if (prototype.getRequest().source() != null) {
try (XContentBuilder builder = jsonBuilder()) {
prototype.getRequest().source().toXContent(builder, ToXContent.EMPTY_PARAMS);
template = WatcherScript.inline(builder.string()).lang(DEFAULT_LANG).params(watcherContextParams).build();
}
} else if (prototype.getTemplate() != null) {
// Here we convert watcher template into a ES core templates. Due to the different format we use, we
// convert to the template format used in ES core
WatcherScript templatePrototype = prototype.getTemplate();
if (templatePrototype.params() != null) {
watcherContextParams.putAll(templatePrototype.params());
}
WatcherScript.Builder builder;
if (templatePrototype.type() == ScriptService.ScriptType.INLINE) {
builder = WatcherScript.inline(templatePrototype.script());
} else if (templatePrototype.type() == ScriptService.ScriptType.FILE) {
builder = WatcherScript.file(templatePrototype.script());
} else if (templatePrototype.type() == ScriptService.ScriptType.STORED) {
builder = WatcherScript.indexed(templatePrototype.script());
} else {
builder = WatcherScript.defaultType(templatePrototype.script());
}
template = builder.lang(templatePrototype.lang()).params(watcherContextParams).build();
// Here we convert watcher template into a ES core templates. Due to the different format we use, we
// convert to the template format used in ES core
if (templatePrototype.params() != null) {
watcherContextParams.putAll(templatePrototype.params());
}
request.source(convert(template));
return request;
WatcherScript.Builder builder;
if (templatePrototype.type() == ScriptService.ScriptType.INLINE) {
builder = WatcherScript.inline(templatePrototype.script());
} else if (templatePrototype.type() == ScriptService.ScriptType.FILE) {
builder = WatcherScript.file(templatePrototype.script());
} else if (templatePrototype.type() == ScriptService.ScriptType.STORED) {
builder = WatcherScript.indexed(templatePrototype.script());
} else {
builder = WatcherScript.defaultType(templatePrototype.script());
}
WatcherScript template = builder.lang(templatePrototype.lang()).params(watcherContextParams).build();
CompiledScript compiledScript = scriptService.compile(template.toScript(), WatcherScript.CTX, Collections.emptyMap());
return (BytesReference) scriptService.executable(compiledScript, template.params()).run();
}
/**
* Converts a {@link WatcherScript} to a {@link org.elasticsearch.search.builder.SearchSourceBuilder}
*/
private SearchSourceBuilder convert(WatcherScript template) throws IOException {
public SearchRequest toSearchRequest(WatcherSearchTemplateRequest request) throws IOException {
SearchRequest searchRequest = new SearchRequest(request.getIndices());
searchRequest.types(request.getTypes());
searchRequest.searchType(request.getSearchType());
searchRequest.indicesOptions(request.getIndicesOptions());
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
if (template == null) {
// falling back to an empty body
return sourceBuilder;
}
CompiledScript compiledScript = scriptService.compile(template.toScript(), WatcherScript.CTX, Collections.emptyMap());
BytesReference source = (BytesReference) scriptService.executable(compiledScript, template.params()).run();
BytesReference source = request.getSearchSource();
if (source != null && source.length() > 0) {
try (XContentParser parser = XContentFactory.xContent(source).createParser(source)) {
sourceBuilder.parseXContent(new QueryParseContext(searchRequestParsers.queryParsers, parser, parseFieldMatcher),
searchRequestParsers.aggParsers, searchRequestParsers.suggesters);
searchRequestParsers.aggParsers, searchRequestParsers.suggesters);
searchRequest.source(sourceBuilder);
}
}
return sourceBuilder;
return searchRequest;
}
}

View File

@ -5,16 +5,12 @@
*/
package org.elasticsearch.xpack.watcher.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.transform.chain.ChainTransform;
import org.elasticsearch.xpack.watcher.transform.script.ScriptTransform;
import org.elasticsearch.xpack.watcher.transform.search.SearchTransform;
/**
*
*/
public final class TransformBuilders {
private TransformBuilders() {
@ -24,10 +20,6 @@ public final class TransformBuilders {
return SearchTransform.builder(request);
}
public static SearchTransform.Builder searchTransform(SearchRequest request) {
return searchTransform(new WatcherSearchTemplateRequest(request));
}
public static ScriptTransform.Builder scriptTransform(String script) {
return scriptTransform(WatcherScript.inline(script));
}

View File

@ -5,21 +5,20 @@
*/
package org.elasticsearch.xpack.watcher.transform.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.watch.Payload;
/**
*
*/
public class ExecutableSearchTransform extends ExecutableTransform<SearchTransform, SearchTransform.Result> {
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
@ -38,10 +37,13 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
@Override
public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload) {
SearchRequest request = null;
WatcherSearchTemplateRequest request = null;
try {
request = searchTemplateService.createSearchRequestFromPrototype(transform.getRequest(), ctx, payload);
SearchResponse resp = client.search(request, timeout);
WatcherScript template = transform.getRequest().getOrCreateTemplate();
BytesReference renderedTemplate = searchTemplateService.renderTemplate(template, ctx, payload);
// We need to make a copy, so that we don't modify the original instance that we keep around in a watch:
request = new WatcherSearchTemplateRequest(transform.getRequest(), renderedTemplate);
SearchResponse resp = client.search(searchTemplateService.toSearchRequest(request), timeout);
return new SearchTransform.Result(request, new Payload.XContent(resp));
} catch (Exception e) {
logger.error("failed to execute [{}] transform for [{}]", e, SearchTransform.TYPE, ctx.id());

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.watcher.transform.search;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
@ -24,9 +23,6 @@ import org.joda.time.DateTimeZone;
import java.io.IOException;
/**
*
*/
public class SearchTransform implements Transform {
public static final String TYPE = "search";
@ -108,8 +104,7 @@ public class SearchTransform implements Transform {
currentFieldName = parser.currentName();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.REQUEST)) {
try {
request = WatcherSearchTemplateRequest.fromXContent(parser, ExecutableSearchTransform.DEFAULT_SEARCH_TYPE, context,
aggParsers, suggesters);
request = WatcherSearchTemplateRequest.fromXContent(parser, ExecutableSearchTransform.DEFAULT_SEARCH_TYPE);
} catch (ElasticsearchParseException srpe) {
throw new ElasticsearchParseException("could not parse [{}] transform for watch [{}]. failed to parse [{}]", srpe,
TYPE, watchId, currentFieldName);
@ -142,19 +137,19 @@ public class SearchTransform implements Transform {
public static class Result extends Transform.Result {
@Nullable private final SearchRequest request;
@Nullable private final WatcherSearchTemplateRequest request;
public Result(SearchRequest request, Payload payload) {
public Result(WatcherSearchTemplateRequest request, Payload payload) {
super(TYPE, payload);
this.request = request;
}
public Result(SearchRequest request, Exception e) {
public Result(WatcherSearchTemplateRequest request, Exception e) {
super(TYPE, e);
this.request = request;
}
public SearchRequest executedRequest() {
public WatcherSearchTemplateRequest executedRequest() {
return request;
}
@ -162,7 +157,7 @@ public class SearchTransform implements Transform {
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
if (request != null) {
builder.startObject(type);
builder.field(Field.REQUEST.getPreferredName(), new WatcherSearchTemplateRequest(request));
builder.field(Field.REQUEST.getPreferredName(), request);
builder.endObject();
}
return builder;

View File

@ -6,8 +6,6 @@
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -16,6 +14,7 @@ import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.input.Input;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
@ -30,7 +29,7 @@ import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBu
import static org.elasticsearch.xpack.watcher.input.InputBuilders.chainInput;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.newInputSearchRequest;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
@ -48,9 +47,9 @@ public class HistoryIntegrationTests extends AbstractWatcherIntegrationTestCase
.trigger(schedule(interval("10s")))
.addAction("logging", loggingAction("foo"));
SearchRequestBuilder searchRequestBuilder = client().prepareSearch("foo").addSort(SortBuilders.fieldSort("inner.date").order
(SortOrder.DESC));
builder.input(chainInput().add("first", searchInput(searchRequestBuilder.request())));
builder.input(chainInput().add("first", searchInput(
templateRequest(searchSource().sort(SortBuilders.fieldSort("inner.date").order(SortOrder.DESC)), "foo")))
);
PutWatchResponse response = watcherClient().preparePutWatch("test_watch").setSource(builder).get();
assertThat(response.isCreated(), is(true));
@ -64,15 +63,14 @@ public class HistoryIntegrationTests extends AbstractWatcherIntegrationTestCase
// See https://github.com/elastic/x-plugins/issues/2913
public void testFailedInputResultWithDotsInFieldNameGetsStored() throws Exception {
SearchRequest sortSearchRequest = newInputSearchRequest("non-existing-index")
.source(searchSource()
.query(matchAllQuery())
.sort("trigger_event.triggered_time", SortOrder.DESC)
.size(1));
WatcherSearchTemplateRequest request = templateRequest(searchSource()
.query(matchAllQuery())
.sort("trigger_event.triggered_time", SortOrder.DESC)
.size(1), "non-existing-index");
// The result of the search input will be a failure, because a missing index does not exist when
// the query is executed
Input.Builder input = searchInput(sortSearchRequest);
Input.Builder input = searchInput(request);
// wrapping this randomly into a chained input to test this as well
boolean useChained = randomBoolean();
if (useChained) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.actions;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.compare.CompareCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
@ -26,15 +27,13 @@ import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.compareCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.matchAllRequest;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
/**
*/
public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTestCase {
private IndexResponse indexTestDoc() {
@ -62,9 +61,9 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest
.setId("_name")
.setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(matchAllRequest().indices("events")))
.input(searchInput(templateRequest(new SearchSourceBuilder(), "events")))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(matchAllRequest().indices("events")))
.transform(searchTransform(templateRequest(new SearchSourceBuilder(), "events")))
.addAction("_id", indexAction("actions", "action"))
.defaultThrottlePeriod(TimeValue.timeValueSeconds(30)))
.get();
@ -136,9 +135,9 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest
.setId("_name")
.setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(searchInput(matchAllRequest().indices("events")))
.input(searchInput(templateRequest(new SearchSourceBuilder(), "events")))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(matchAllRequest().indices("events")))
.transform(searchTransform(templateRequest(new SearchSourceBuilder(), "events")))
.addAction("_id", indexAction("actions", "action")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.watcher.actions.email;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.QueueDispatcher;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
@ -25,6 +24,7 @@ import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.compare.CompareCondition;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.http.Scheme;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.junit.After;
@ -51,7 +51,7 @@ import static org.elasticsearch.xpack.notification.email.DataAttachment.YAML;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.compareCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.newInputSearchRequest;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.allOf;
@ -164,7 +164,7 @@ public class EmailAttachmentTests extends AbstractWatcherIntegrationTestCase {
// Have a sample document in the index, the watch is going to evaluate
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(matchAllQuery()));
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(matchAllQuery()), "idx");
List<EmailAttachmentParser.EmailAttachment> attachments = new ArrayList<>();
@ -188,7 +188,7 @@ public class EmailAttachmentTests extends AbstractWatcherIntegrationTestCase {
EmailTemplate.Builder emailBuilder = EmailTemplate.builder().from("_from").to("_to").subject("Subject");
WatchSourceBuilder watchSourceBuilder = watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(searchRequest))
.input(searchInput(request))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.addAction("_email", emailAction(emailBuilder).setAuthentication(USERNAME, PASSWORD.toCharArray())
.setAttachments(emailAttachments));

View File

@ -5,16 +5,16 @@
*/
package org.elasticsearch.xpack.watcher.history;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
@ -50,10 +50,13 @@ public class HistoryTemplateSearchInputMappingsTests extends AbstractWatcherInte
flush();
refresh();
WatcherSearchTemplateRequest request = new WatcherSearchTemplateRequest(
new String[]{index}, new String[]{type}, SearchType.QUERY_AND_FETCH,
WatcherSearchTemplateRequest.DEFAULT_INDICES_OPTIONS, new BytesArray("{}")
);
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(new SearchRequest().indices(index).types(type).searchType(SearchType.QUERY_AND_FETCH)
.source(searchSource().query(matchAllQuery()))))
.input(searchInput(request))
.condition(alwaysCondition())
.addAction("logger", loggingAction("indexed")))
.get();

View File

@ -5,10 +5,8 @@
*/
package org.elasticsearch.xpack.watcher.support;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
@ -24,13 +22,12 @@ import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBu
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
/**
*/
public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegrationTestCase {
@Override
protected boolean timeWarped() {
@ -61,14 +58,11 @@ public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegration
final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().nowUTC());
logger.info("checking index [{}]", indexName);
assertBusy(new Runnable() {
@Override
public void run() {
flush();
refresh();
long docCount = docCount(indexName, "type", matchAllQuery());
assertThat(docCount, is(1L));
}
assertBusy(() -> {
flush();
refresh();
long docCount = docCount(indexName, "type", matchAllQuery());
assertThat(docCount, is(1L));
});
}
@ -84,7 +78,7 @@ public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegration
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(new SearchRequest(indexNameDateMathExpressions).types("type"))))
.input(searchInput(templateRequest(new SearchSourceBuilder(), indexNameDateMathExpressions))))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
@ -110,7 +104,7 @@ public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegration
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.transform(searchTransform(new SearchRequest(indexNameDateMathExpressions).types("type")))
.transform(searchTransform(templateRequest(new SearchSourceBuilder(), indexNameDateMathExpressions)))
.addAction("log", loggingAction("heya")))
.get();
@ -120,12 +114,8 @@ public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegration
flush();
refresh();
SearchResponse response = searchWatchRecords(new Callback<SearchRequestBuilder>() {
@Override
public void handle(SearchRequestBuilder builder) {
builder.setQuery(matchQuery("result.transform.search.request.indices", indexNameDateMathExpressions));
}
});
SearchResponse response = searchWatchRecords(builder ->
builder.setQuery(matchQuery("result.transform.search.request.indices", indexNameDateMathExpressions)));
assertThat(response.getHits().getTotalHits(), is(1L));
}
}

View File

@ -5,11 +5,8 @@
*/
package org.elasticsearch.xpack.watcher.support;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
@ -18,11 +15,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryParser;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.support.clock.SystemClock;
@ -45,11 +38,8 @@ import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
/**
*
*/
public class WatcherUtilsTests extends ESTestCase {
public void testFlattenModel() throws Exception {
DateTime now = SystemClock.INSTANCE.nowUTC();
@ -96,22 +86,15 @@ public class WatcherUtilsTests extends ESTestCase {
}
public void testSerializeSearchRequest() throws Exception {
String[] randomIndices = generateRandomStringArray(5, 5, false);
SearchRequest expectedRequest = new SearchRequest(randomIndices);
String[] expectedIndices = generateRandomStringArray(5, 5, true);
String[] expectedTypes = generateRandomStringArray(2, 5, true);
IndicesOptions expectedIndicesOptions = IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(),
randomBoolean(), WatcherSearchTemplateRequest.DEFAULT_INDICES_OPTIONS);
SearchType expectedSearchType = getRandomSupportedSearchType();
BytesReference expectedSource = null;
WatcherScript expectedTemplate = null;
if (randomBoolean()) {
String[] randomTypes = generateRandomStringArray(2, 5, false);
expectedRequest.types(randomTypes);
}
expectedRequest.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(),
randomBoolean(), WatcherSearchTemplateRequest.DEFAULT_INDICES_OPTIONS));
expectedRequest.searchType(getRandomSupportedSearchType());
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()).size(11);
expectedRequest.source(searchSourceBuilder);
WatcherSearchTemplateRequest request;
if (randomBoolean()) {
Map<String, Object> params = new HashMap<>();
if (randomBoolean()) {
@ -123,28 +106,33 @@ public class WatcherUtilsTests extends ESTestCase {
String text = randomAsciiOfLengthBetween(1, 5);
expectedTemplate = randomFrom(WatcherScript.inline(text), WatcherScript.file(text),
WatcherScript.indexed(text)).params(params).build();
request = new WatcherSearchTemplateRequest(expectedIndices, expectedTypes, expectedSearchType,
expectedIndicesOptions, expectedTemplate);
} else {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()).size(11);
XContentBuilder builder = jsonBuilder();
builder.value(sourceBuilder);
expectedSource = builder.bytes();
request = new WatcherSearchTemplateRequest(expectedIndices, expectedTypes, expectedSearchType,
expectedIndicesOptions, expectedSource);
}
WatcherSearchTemplateRequest request = new WatcherSearchTemplateRequest(expectedRequest, expectedTemplate);
XContentBuilder builder = jsonBuilder();
request.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentParser parser = XContentHelper.createParser(builder.bytes());
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
IndicesQueriesRegistry registry = new IndicesQueriesRegistry();
QueryParser<MatchAllQueryBuilder> queryParser = MatchAllQueryBuilder::fromXContent;
registry.register(queryParser, MatchAllQueryBuilder.NAME);
QueryParseContext context = new QueryParseContext(registry, parser, ParseFieldMatcher.STRICT);
WatcherSearchTemplateRequest result = WatcherSearchTemplateRequest.fromXContent(parser, DEFAULT_SEARCH_TYPE, context, null, null);
WatcherSearchTemplateRequest result = WatcherSearchTemplateRequest.fromXContent(parser, DEFAULT_SEARCH_TYPE);
assertThat(result.getRequest(), is(notNullValue()));
assertThat(result.getRequest().indices(), arrayContainingInAnyOrder(expectedRequest.indices()));
assertThat(result.getRequest().types(), arrayContainingInAnyOrder(expectedRequest.types()));
assertThat(result.getRequest().indicesOptions(), equalTo(expectedRequest.indicesOptions()));
assertThat(result.getRequest().searchType(), equalTo(expectedRequest.searchType()));
assertThat(result.getRequest().source(), equalTo(searchSourceBuilder));
assertThat(result.getIndices(), arrayContainingInAnyOrder(expectedIndices != null ? expectedIndices : new String[0]));
assertThat(result.getTypes(), arrayContainingInAnyOrder(expectedTypes != null ? expectedTypes : new String[0]));
assertThat(result.getIndicesOptions(), equalTo(expectedIndicesOptions));
assertThat(result.getSearchType(), equalTo(expectedSearchType));
if (expectedSource == null) {
assertThat(result.getTemplate(), equalTo(expectedTemplate));
} else {
assertThat(result.getTemplate().script(), equalTo(expectedSource.utf8ToString()));
}
assertThat(result.getTemplate(), equalTo(expectedTemplate));
}
public void testDeserializeSearchRequest() throws Exception {
@ -192,9 +180,8 @@ public class WatcherUtilsTests extends ESTestCase {
}
BytesReference source = null;
SearchSourceBuilder searchSourceBuilder = null;
if (randomBoolean()) {
searchSourceBuilder = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()).size(11);
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()).size(11);
XContentBuilder searchSourceJsonBuilder = jsonBuilder();
searchSourceBuilder.toXContent(searchSourceJsonBuilder, ToXContent.EMPTY_PARAMS);
source = searchSourceBuilder.buildAsBytes(XContentType.JSON);
@ -218,18 +205,17 @@ public class WatcherUtilsTests extends ESTestCase {
XContentParser parser = XContentHelper.createParser(builder.bytes());
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
IndicesQueriesRegistry registry = new IndicesQueriesRegistry();
QueryParser<MatchAllQueryBuilder> queryParser = MatchAllQueryBuilder::fromXContent;
registry.register(queryParser, MatchAllQueryBuilder.NAME);
QueryParseContext context = new QueryParseContext(registry, parser, ParseFieldMatcher.STRICT);
WatcherSearchTemplateRequest result = WatcherSearchTemplateRequest.fromXContent(parser, DEFAULT_SEARCH_TYPE, context, null, null);
WatcherSearchTemplateRequest result = WatcherSearchTemplateRequest.fromXContent(parser, DEFAULT_SEARCH_TYPE);
assertThat(result.getRequest(), is(notNullValue()));
assertThat(result.getRequest().indices(), arrayContainingInAnyOrder(indices));
assertThat(result.getRequest().types(), arrayContainingInAnyOrder(types));
assertThat(result.getRequest().indicesOptions(), equalTo(indicesOptions));
assertThat(result.getRequest().searchType(), equalTo(searchType));
assertThat(result.getRequest().source(), equalTo(searchSourceBuilder));
assertThat(result.getIndices(), arrayContainingInAnyOrder(indices));
assertThat(result.getTypes(), arrayContainingInAnyOrder(types));
assertThat(result.getIndicesOptions(), equalTo(indicesOptions));
assertThat(result.getSearchType(), equalTo(searchType));
if (source == null) {
assertThat(result.getSearchSource(), nullValue());
} else {
assertThat(result.getSearchSource().utf8ToString(), equalTo(source.utf8ToString()));
}
assertThat(result.getTemplate(), equalTo(template));
}

View File

@ -47,7 +47,6 @@ import org.elasticsearch.xpack.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.xpack.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.execution.Wid;
import org.elasticsearch.xpack.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
@ -79,15 +78,13 @@ import java.util.Map;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomInt;
import static java.util.Collections.emptyMap;
import static org.apache.lucene.util.LuceneTestCase.createTempDir;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.joda.time.DateTimeZone.UTC;
import static org.junit.Assert.assertThat;
/**
*
*/
public final class WatcherTestUtils {
public static final Payload EMPTY_PAYLOAD = new Payload.Simple(emptyMap());
@ -112,12 +109,20 @@ public final class WatcherTestUtils {
return builder.contentType().xContent().createParser(builder.bytes());
}
public static SearchRequest newInputSearchRequest(String... indices) {
SearchRequest request = new SearchRequest();
request.indices(indices);
request.indicesOptions(WatcherSearchTemplateRequest.DEFAULT_INDICES_OPTIONS);
request.searchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
return request;
public static WatcherSearchTemplateRequest templateRequest(SearchSourceBuilder sourceBuilder, String... indices) {
return templateRequest(sourceBuilder, SearchType.DEFAULT, indices);
}
public static WatcherSearchTemplateRequest templateRequest(SearchSourceBuilder sourceBuilder, SearchType searchType,
String... indices) {
try {
XContentBuilder xContentBuilder = jsonBuilder();
xContentBuilder.value(sourceBuilder);
return new WatcherSearchTemplateRequest(indices, new String[0], searchType,
WatcherSearchTemplateRequest.DEFAULT_INDICES_OPTIONS, xContentBuilder.bytes());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static SearchRequest matchAllRequest() {
@ -177,10 +182,8 @@ public final class WatcherTestUtils {
public static Watch createTestWatch(String watchName, WatcherClientProxy client, HttpClient httpClient, EmailService emailService,
WatcherSearchTemplateService searchTemplateService, ESLogger logger) throws AddressException {
SearchRequest conditionRequest = newInputSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = newInputSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
transformRequest.searchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE);
conditionRequest.searchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
WatcherSearchTemplateRequest transformRequest =
templateRequest(searchSource().query(matchAllQuery()), "my-payload-index");
List<ActionWrapper> actions = new ArrayList<>();
@ -224,7 +227,7 @@ public final class WatcherTestUtils {
statuses.put("_webhook", new ActionStatus(now));
statuses.put("_email", new ActionStatus(now));
SearchTransform searchTransform = new SearchTransform(new WatcherSearchTemplateRequest(transformRequest), null, null);
SearchTransform searchTransform = new SearchTransform(transformRequest, null, null);
return new Watch(
watchName,

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.test.bench;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.Loggers;
@ -32,6 +31,7 @@ import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.httpInput;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
@ -80,9 +80,7 @@ public class WatcherExecutorServiceBenchmark {
final String name = "_name" + i;
PutWatchRequest putAlertRequest = new PutWatchRequest(name, new WatchSourceBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(new SearchRequest("test")
.source(new SearchSourceBuilder()))
)
.input(searchInput(templateRequest(new SearchSourceBuilder(), "test")))
.condition(scriptCondition("ctx.payload.hits.total > 0")));
putAlertRequest.setId(name);
watcherClient.putWatch(putAlertRequest).actionGet();
@ -123,10 +121,8 @@ public class WatcherExecutorServiceBenchmark {
final String name = "_name" + i;
PutWatchRequest putAlertRequest = new PutWatchRequest(name, new WatchSourceBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(new SearchRequest()
.source(new SearchSourceBuilder()))
.extractKeys("hits.total")
)
.input(searchInput(templateRequest(new SearchSourceBuilder(), "test"))
.extractKeys("hits.total"))
.condition(scriptCondition("1 == 1"))
.addAction("_id", indexAction("index", "type")));
putAlertRequest.setId(name);

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.watcher.test.bench;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
@ -49,6 +48,7 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.percenti
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
@ -111,9 +111,7 @@ public class WatcherScheduleEngineBenchmark {
client.prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, id)
.setSource(new WatchSourceBuilder()
.trigger(schedule(interval(interval + "s")))
.input(searchInput(new SearchRequest("test")
.source(new SearchSourceBuilder()))
)
.input(searchInput(templateRequest(new SearchSourceBuilder(), "test")))
.condition(scriptCondition("ctx.payload.hits.total > 0"))
.addAction("logging", ActionBuilders.loggingAction("test").setLevel(LoggingLevel.TRACE))
.buildAsBytes(XContentType.JSON)

View File

@ -6,8 +6,8 @@
package org.elasticsearch.xpack.watcher.test.integration;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -42,7 +42,7 @@ import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.always
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.compareCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.newInputSearchRequest;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.xContentSource;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
@ -55,8 +55,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
*/
public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
@Override
@ -70,11 +68,11 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
// Have a sample document in the index, the watch is going to evaluate
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value")));
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "idx");
watcherClient.preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(searchRequest))
.input(searchInput(request))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L))
.addAction("_logger", loggingAction("_logging")
.setCategory("_category")))
@ -89,7 +87,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
public void testIndexWatchRegisterWatchBeforeTargetIndex() throws Exception {
WatcherClient watcherClient = watcherClient();
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value")));
WatcherSearchTemplateRequest searchRequest = templateRequest(searchSource().query(termQuery("field", "value")), "idx");
watcherClient.preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
@ -115,7 +113,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
public void testDeleteWatch() throws Exception {
WatcherClient watcherClient = watcherClient();
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(matchAllQuery()));
WatcherSearchTemplateRequest searchRequest = templateRequest(searchSource().query(matchAllQuery()), "idx");
PutWatchResponse indexResponse = watcherClient.preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(cron("0/1 * * * * ? 2020")))
@ -148,7 +146,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
watchSource.startObject("schedule").field("cron", "0/5 * * * * ? *").endObject();
watchSource.startObject("condition").startObject("script").field("script", "return true");
watchSource.field("request", new WatcherSearchTemplateRequest(newInputSearchRequest()));
watchSource.field("request", templateRequest(searchSource().query(matchAllQuery())));
watchSource.endObject().endObject();
watchSource.endObject();
@ -171,8 +169,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
}
public void testModifyWatches() throws Exception {
SearchRequest searchRequest = newInputSearchRequest("idx")
.source(searchSource().query(matchAllQuery()));
WatcherSearchTemplateRequest searchRequest = templateRequest(searchSource().query(matchAllQuery()), "idx");
WatchSourceBuilder source = watchBuilder()
.trigger(schedule(interval("5s")))
@ -248,7 +245,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
public void testConditionSearchWithSource() throws Exception {
SearchSourceBuilder searchSourceBuilder = searchSource().query(matchQuery("level", "a"));
testConditionSearch(newInputSearchRequest("events").source(searchSourceBuilder), null);
testConditionSearch(templateRequest(searchSourceBuilder, "events"));
}
public void testConditionSearchWithIndexedTemplate() throws Exception {
@ -260,8 +257,9 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
.get());
WatcherScript template = WatcherScript.indexed("my-template").lang("mustache").build();
SearchRequest searchRequest = newInputSearchRequest("events");
testConditionSearch(searchRequest, template);
WatcherSearchTemplateRequest searchRequest = new WatcherSearchTemplateRequest(new String[]{"events"}, new String[0],
SearchType.DEFAULT, WatcherSearchTemplateRequest.DEFAULT_INDICES_OPTIONS, template);
testConditionSearch(searchRequest);
}
public void testInputFiltering() throws Exception {
@ -270,18 +268,18 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
// Have a sample document in the index, the watch is going to evaluate
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value")));
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "idx");
watcherClient.preparePutWatch("_name1")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(searchRequest).extractKeys("hits.total"))
.input(searchInput(request).extractKeys("hits.total"))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L)))
.get();
// in this watcher the condition will fail, because max_score isn't extracted, only total:
watcherClient.preparePutWatch("_name2")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(searchRequest).extractKeys("hits.total"))
.input(searchInput(request).extractKeys("hits.total"))
.condition(compareCondition("ctx.payload.hits.max_score", CompareCondition.Op.GTE, 0L)))
.get();
@ -368,7 +366,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
}
}
private void testConditionSearch(SearchRequest request, WatcherScript template) throws Exception {
private void testConditionSearch(WatcherSearchTemplateRequest request) throws Exception {
// reset, so we don't miss event docs when we filter over the _timestamp field.
timeWarp().clock().setTime(SystemClock.INSTANCE.nowUTC());
@ -378,7 +376,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
watcherClient().preparePutWatch(watchName)
.setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(new WatcherSearchTemplateRequest(request, template)))
.input(searchInput(request))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.GTE, 3L)))
.get();

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.test.integration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.common.unit.TimeValue;
@ -21,6 +20,7 @@ import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.execution.Wid;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.history.WatchRecord;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
@ -42,15 +42,13 @@ import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBu
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.compareCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.newInputSearchRequest;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.joda.time.DateTimeZone.UTC;
/**
*/
@TestLogging("watcher:TRACE")
public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
@Override
@ -217,12 +215,13 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
public void testLoadExistingWatchesUponStartup() throws Exception {
int numWatches = scaledRandomIntBetween(16, 128);
SearchRequest searchRequest = newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
WatcherSearchTemplateRequest request =
templateRequest(searchSource().query(termQuery("field", "value")), "my-index");
for (int i = 0; i < numWatches; i++) {
client().prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, "_id" + i)
.setSource(watchBuilder()
.trigger(schedule(cron("0 0/5 * * * ? 2050")))
.input(searchInput(searchRequest))
.input(searchInput(request))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L))
.buildAsBytes(XContentType.JSON)
)
@ -246,13 +245,14 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getWatchesCount(), equalTo(0L));
SearchRequest searchRequest = newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
WatcherSearchTemplateRequest request =
templateRequest(searchSource().query(termQuery("field", "value")), "my-index");
int numWatches = 8;
for (int i = 0; i < numWatches; i++) {
String watchId = "_id" + i;
watcherClient().preparePutWatch(watchId).setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? 2050")))
.input(searchInput(searchRequest))
.input(searchInput(request))
.condition(alwaysCondition())
.addAction("_id", indexAction("output", "test"))
.defaultThrottlePeriod(TimeValue.timeValueMillis(0))
@ -302,10 +302,10 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
assertThat(response.getWatchesCount(), equalTo(0L));
String watchId = "_id";
SearchRequest searchRequest = newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "my-index");
watcherClient().preparePutWatch(watchId).setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? 2050")))
.input(searchInput(searchRequest))
.input(searchInput(request))
.condition(alwaysCondition())
.addAction("_id", indexAction("output", "test"))
.defaultThrottlePeriod(TimeValue.timeValueMillis(0))

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.watcher.test.integration;
import org.apache.lucene.util.LuceneTestCase.BadApple;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
@ -28,8 +27,8 @@ import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilders;
import org.elasticsearch.xpack.watcher.condition.compare.CompareCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsResponse;
@ -44,6 +43,7 @@ import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.always
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.compareCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
@ -90,11 +90,10 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTestCase {
// Have a sample document in the index, the watch is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
SearchRequest searchRequest = WatcherTestUtils.newInputSearchRequest("my-index").source(
searchSource().query(termQuery("field", "value")));
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "my-index");
WatchSourceBuilder watchSource = watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? *")))
.input(searchInput(searchRequest))
.input(searchInput(request))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L));
// we first need to make sure the license is enabled, otherwise all APIs will be blocked
@ -200,11 +199,10 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTestCase {
ensureLicenseEnabled();
for (int i = 1; i <= numberOfWatches; i++) {
String watchName = "watch" + i;
SearchRequest searchRequest = WatcherTestUtils.newInputSearchRequest("my-index").source(
searchSource().query(termQuery("field", "value")));
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "my-index");
WatchSourceBuilder watchSource = watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? *")))
.input(searchInput(searchRequest))
.input(searchInput(request))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L));
watcherClient().preparePutWatch(watchName).setSource(watchSource).get();
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.test.integration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -39,6 +38,7 @@ import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
@ -79,13 +79,8 @@ public class SearchInputTests extends ESIntegTestCase {
public void testExecute() throws Exception {
SearchSourceBuilder searchSourceBuilder = searchSource().query(
boolQuery().must(matchQuery("event_type", "a")));
SearchRequest searchRequest = client()
.prepareSearch()
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
.request()
.source(searchSourceBuilder);
WatcherSearchTemplateRequest request = new WatcherSearchTemplateRequest(searchRequest);
WatcherSearchTemplateRequest request = WatcherTestUtils.templateRequest(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger,
WatcherClientProxy.of(client()), watcherSearchTemplateService(), null);
WatchExecutionContext ctx = new TriggeredExecutionContext(
@ -106,9 +101,9 @@ public class SearchInputTests extends ESIntegTestCase {
assertThat(XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertNotNull(result.executedRequest());
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
assertEquals(result.executedRequest().searchType(), request.getRequest().searchType());
assertArrayEquals(result.executedRequest().indices(), request.getRequest().indices());
assertEquals(result.executedRequest().indicesOptions(), request.getRequest().indicesOptions());
assertEquals(result.executedRequest().getSearchType(), request.getSearchType());
assertArrayEquals(result.executedRequest().getIndices(), request.getIndices());
assertEquals(result.executedRequest().getIndicesOptions(), request.getIndicesOptions());
}
public void testDifferentSearchType() throws Exception {
@ -116,14 +111,7 @@ public class SearchInputTests extends ESIntegTestCase {
boolQuery().must(matchQuery("event_type", "a"))
);
SearchType searchType = getRandomSupportedSearchType();
SearchRequest searchRequest = client()
.prepareSearch()
.setSearchType(searchType)
.request()
.source(searchSourceBuilder);
WatcherSearchTemplateRequest request = new WatcherSearchTemplateRequest(searchRequest);
WatcherSearchTemplateRequest request = WatcherTestUtils.templateRequest(searchSourceBuilder, searchType);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger,
WatcherClientProxy.of(client()), watcherSearchTemplateService(), null);
@ -145,22 +133,19 @@ public class SearchInputTests extends ESIntegTestCase {
assertThat(XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertNotNull(result.executedRequest());
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
assertEquals(result.executedRequest().searchType(), searchType);
assertArrayEquals(result.executedRequest().indices(), searchRequest.indices());
assertEquals(result.executedRequest().indicesOptions(), searchRequest.indicesOptions());
assertEquals(result.executedRequest().getSearchType(), searchType);
assertArrayEquals(result.executedRequest().getIndices(), request.getIndices());
assertEquals(result.executedRequest().getIndicesOptions(), request.getIndicesOptions());
}
public void testParserValid() throws Exception {
SearchRequest searchRequest = client().prepareSearch()
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
.request()
.source(searchSource()
SearchSourceBuilder source = searchSource()
.query(boolQuery().must(matchQuery("event_type", "a")).must(rangeQuery("_timestamp")
.from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
.from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}")));
TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(randomInt(10)) : null;
XContentBuilder builder = jsonBuilder().value(
new SearchInput(new WatcherSearchTemplateRequest(searchRequest), null, timeout, null));
new SearchInput(WatcherTestUtils.templateRequest(source), null, timeout, null));
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();

View File

@ -5,10 +5,8 @@
*/
package org.elasticsearch.xpack.watcher.test.integration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@ -71,6 +69,7 @@ import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.EMPTY_PAYLOAD;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.getRandomSupportedSearchType;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -108,7 +107,7 @@ public class SearchTransformTests extends ESIntegTestCase {
ensureGreen("idx");
refresh();
SearchRequest request = Requests.searchRequest("idx").source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
WatcherSearchTemplateRequest request = templateRequest(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()), "idx");
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, WatcherClientProxy.of(client()),
watcherSearchTemplateService(), null);
@ -120,7 +119,7 @@ public class SearchTransformTests extends ESIntegTestCase {
assertThat(result.type(), is(SearchTransform.TYPE));
assertThat(result.status(), is(Transform.Result.Status.SUCCESS));
SearchResponse response = client().search(request).get();
SearchResponse response = client().prepareSearch("idx").get();
Payload expectedPayload = new Payload.XContent(response);
// we need to remove the "took" field from teh response as this is the only field
@ -141,9 +140,9 @@ public class SearchTransformTests extends ESIntegTestCase {
refresh();
// create a bad request
SearchRequest request = Requests.searchRequest("idx").source(
new SearchSourceBuilder().query(QueryBuilders.wrapperQuery(jsonBuilder().startObject()
.startObject("_unknown_query_").endObject().endObject().bytes())));
WatcherSearchTemplateRequest request = templateRequest(new SearchSourceBuilder().query(
QueryBuilders.wrapperQuery(jsonBuilder().startObject()
.startObject("_unknown_query_").endObject().endObject().bytes())), "idx");
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, WatcherClientProxy.of(client()),
watcherSearchTemplateService(), null);
@ -159,11 +158,14 @@ public class SearchTransformTests extends ESIntegTestCase {
// extract the base64 encoded query from the template script, path is: query -> wrapper -> query
try (XContentBuilder builder = jsonBuilder()) {
result.executedRequest().source().toXContent(builder, ToXContent.EMPTY_PARAMS);
result.executedRequest().toXContent(builder, ToXContent.EMPTY_PARAMS);
String jsonQuery = builder.string();
Map<String, Object> map = XContentFactory.xContent(jsonQuery).createParser(jsonQuery).map();
assertThat(map, hasKey("body"));
assertThat(map.get("body"), instanceOf(Map.class));
map = (Map<String, Object>) map.get("body");
assertThat(map, hasKey("query"));
assertThat(map.get("query"), instanceOf(Map.class));
@ -224,41 +226,34 @@ public class SearchTransformTests extends ESIntegTestCase {
assertThat(executable.type(), is(SearchTransform.TYPE));
assertThat(executable.transform().getRequest(), notNullValue());
if (indices != null) {
assertThat(executable.transform().getRequest().getRequest().indices(), arrayContainingInAnyOrder(indices));
assertThat(executable.transform().getRequest().getIndices(), arrayContainingInAnyOrder(indices));
}
if (searchType != null) {
assertThat(executable.transform().getRequest().getRequest().searchType(), is(searchType));
assertThat(executable.transform().getRequest().getSearchType(), is(searchType));
}
if (templateName != null) {
assertThat(executable.transform().getRequest().getTemplate(),
equalTo(WatcherScript.file("template1").build()));
}
SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
assertThat(executable.transform().getRequest().getRequest().source(), equalTo(source));
assertThat(executable.transform().getRequest().getSearchSource().utf8ToString(), equalTo("{\"query\":{\"match_all\":{}}}"));
assertThat(executable.transform().getTimeout(), equalTo(readTimeout));
}
public void testDifferentSearchType() throws Exception {
WatchExecutionContext ctx = createContext();
SearchSourceBuilder searchSourceBuilder = searchSource().query(boolQuery()
.must(matchQuery("event_type", "a")));
final SearchType searchType = getRandomSupportedSearchType();
SearchRequest request = client()
.prepareSearch("test-search-index")
.setSearchType(searchType)
.request()
.source(searchSourceBuilder);
SearchTransform.Result result = executeSearchTransform(request, null, ctx);
WatcherSearchTemplateRequest request = templateRequest(searchSourceBuilder, searchType, "test-search-index");
SearchTransform.Result result = executeSearchTransform(request, ctx);
assertThat(XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertThat(result.executedRequest(), notNullValue());
assertThat(result.status(), is(Transform.Result.Status.SUCCESS));
assertThat(result.executedRequest().searchType(), is(searchType));
assertThat(result.executedRequest().indices(), arrayContainingInAnyOrder(request.indices()));
assertThat(result.executedRequest().indicesOptions(), equalTo(request.indicesOptions()));
assertThat(result.executedRequest().getSearchType(), is(searchType));
assertThat(result.executedRequest().getIndices(), arrayContainingInAnyOrder(request.getIndices()));
assertThat(result.executedRequest().getIndicesOptions(), equalTo(request.getIndicesOptions()));
}
private WatchExecutionContext createContext() {
@ -278,12 +273,12 @@ public class SearchTransformTests extends ESIntegTestCase {
timeValueSeconds(5));
}
private SearchTransform.Result executeSearchTransform(SearchRequest request, WatcherScript template, WatchExecutionContext ctx)
private SearchTransform.Result executeSearchTransform(WatcherSearchTemplateRequest request, WatchExecutionContext ctx)
throws IOException {
createIndex("test-search-index");
ensureGreen("test-search-index");
SearchTransform searchTransform = TransformBuilders.searchTransform(new WatcherSearchTemplateRequest(request, template)).build();
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger,
WatcherClientProxy.of(client()), watcherSearchTemplateService(), null);

View File

@ -34,11 +34,12 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.compareCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.matchAllRequest;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
@ -72,9 +73,9 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
.setId("_id")
.setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? *")))
.input(searchInput(matchAllRequest().indices("events")))
.input(searchInput(templateRequest(searchSource(), "events")))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(matchAllRequest().indices("events")))
.transform(searchTransform(templateRequest(searchSource(), "events")))
.addAction("_a1", indexAction("actions", "action1"))
.addAction("_a2", indexAction("actions", "action2"))
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
@ -146,9 +147,9 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
.setId("_id")
.setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? *")))
.input(searchInput(matchAllRequest().indices("events")))
.input(searchInput(templateRequest(searchSource(), "events")))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(matchAllRequest().indices("events")))
.transform(searchTransform(templateRequest(searchSource(), "events")))
.addAction("_a1", indexAction("actions", "action1"))
.addAction("_a2", indexAction("actions", "action2"))
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
@ -227,9 +228,9 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
.setId("_name")
.setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? *")))
.input(searchInput(matchAllRequest().indices("events")))
.input(searchInput(templateRequest(searchSource(), "events")))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(matchAllRequest().indices("events")))
.transform(searchTransform(templateRequest(searchSource(), "events")))
.addAction("_id", indexAction("actions", "action")))
.get();

View File

@ -16,7 +16,6 @@ import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
@ -34,15 +33,13 @@ import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingActi
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.compareCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.joda.time.DateTimeZone.UTC;
/**
*
*/
public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {
public void testWatchMetadata() throws Exception {
Map<String, Object> metadata = new HashMap<>();
@ -57,8 +54,7 @@ public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {
watcherClient().preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? *")))
.input(searchInput(WatcherTestUtils.newInputSearchRequest("my-index")
.source(searchSource().query(matchAllQuery()))))
.input(searchInput(templateRequest(searchSource().query(matchAllQuery()), "my-index")))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L))
.metadata(metadata))
.get();
@ -89,8 +85,7 @@ public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {
watcherClient().preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(cron("0 0 0 1 1 ? 2050")))
.input(searchInput(WatcherTestUtils.newInputSearchRequest("my-index")
.source(searchSource().query(matchAllQuery()))))
.input(searchInput(templateRequest(searchSource().query(matchAllQuery()), "my-index")))
.condition(new AlwaysCondition())
.addAction("testLogger", loggingAction)
.defaultThrottlePeriod(TimeValue.timeValueSeconds(0))

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.MapBuilder;
@ -15,15 +14,14 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -39,6 +37,7 @@ import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBu
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.chainTransform;
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.scriptTransform;
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.searchTransform;
@ -177,10 +176,8 @@ public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCas
index("my-payload-index", "payload", "mytestresult");
refresh();
SearchRequest inputRequest = WatcherTestUtils.newInputSearchRequest("my-condition-index")
.source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = WatcherTestUtils.newInputSearchRequest("my-payload-index")
.source(searchSource().query(matchAllQuery()));
WatcherSearchTemplateRequest inputRequest = templateRequest(searchSource().query(matchAllQuery()), "my-condition-index");
WatcherSearchTemplateRequest transformRequest = templateRequest(searchSource().query(matchAllQuery()), "my-payload-index");
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id1")
.setSource(watchBuilder()

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.transport.action.stats;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -13,8 +12,8 @@ import org.elasticsearch.xpack.watcher.WatcherBuild;
import org.elasticsearch.xpack.watcher.WatcherState;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.compare.CompareCondition;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsRequest;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsResponse;
@ -27,15 +26,13 @@ import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.compareCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
/**
*/
@ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false)
@TestLogging("watcher:TRACE")
public class WatcherStatsTests extends AbstractWatcherIntegrationTestCase {
@ -58,12 +55,11 @@ public class WatcherStatsTests extends AbstractWatcherIntegrationTestCase {
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
SearchRequest searchRequest = WatcherTestUtils.newInputSearchRequest("idx")
.source(searchSource().query(termQuery("field", "value")));
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "idx");
watcherClient().preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(cron("* * * * * ? *")))
.input(searchInput(searchRequest))
.input(searchInput(request))
.condition(compareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L))
)
.get();

View File

@ -5,14 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.watch;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
@ -89,7 +81,6 @@ import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInputFactory;
import org.elasticsearch.xpack.watcher.support.WatcherScript;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
@ -126,12 +117,20 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest.DEFAULT_INDICES_OPTIONS;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.matchAllRequest;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -337,7 +336,7 @@ public class WatchTests extends ESTestCase {
String type = randomFrom(SearchInput.TYPE, SimpleInput.TYPE);
switch (type) {
case SearchInput.TYPE:
SearchInput searchInput = searchInput(WatcherTestUtils.newInputSearchRequest("idx")).build();
SearchInput searchInput = searchInput(WatcherTestUtils.templateRequest(searchSource(), "idx")).build();
return new ExecutableSearchInput(searchInput, logger, client, searchTemplateService, null);
default:
SimpleInput simpleInput = InputBuilders.simpleInput(singletonMap("_key", "_val")).build();
@ -405,17 +404,17 @@ public class WatchTests extends ESTestCase {
return new ExecutableScriptTransform(new ScriptTransform(WatcherScript.inline("_script").build()), logger, scriptService);
case SearchTransform.TYPE:
SearchTransform transform = new SearchTransform(
new WatcherSearchTemplateRequest(matchAllRequest(DEFAULT_INDICES_OPTIONS), null), timeout, timeZone);
templateRequest(searchSource()), timeout, timeZone);
return new ExecutableSearchTransform(transform, logger, client, searchTemplateService, null);
default: // chain
SearchTransform searchTransform = new SearchTransform(
new WatcherSearchTemplateRequest(matchAllRequest(DEFAULT_INDICES_OPTIONS), null), timeout, timeZone);
templateRequest(searchSource()), timeout, timeZone);
ScriptTransform scriptTransform = new ScriptTransform(WatcherScript.inline("_script").build());
ChainTransform chainTransform = new ChainTransform(Arrays.asList(searchTransform, scriptTransform));
return new ExecutableChainTransform(chainTransform, logger, Arrays.<ExecutableTransform>asList(
new ExecutableSearchTransform(new SearchTransform(
new WatcherSearchTemplateRequest(matchAllRequest(DEFAULT_INDICES_OPTIONS), null), timeout, timeZone),
templateRequest(searchSource()), timeout, timeZone),
logger, client, searchTemplateService, null),
new ExecutableScriptTransform(new ScriptTransform(WatcherScript.inline("_script").build()),
logger, scriptService)));