Unify SearchRequest construction between SearchTransform and SearchInput.

This is a step between the old and elastic/elasticsearch#184 this change unifies the logic that generates the search requests from prototype for SearchTransforms and SearchInputs.
This change also adds the `executed_request` field to the `SearchTransform.Result` that was missing.
Add tests for SearchTransform.Result parsing and all different templating options for SearchTransform along with testing handling of search_type in SearchTransform which was being missed previously.

Fixes elastic/elasticsearch#311

Original commit: elastic/x-pack-elasticsearch@26e68576f5
This commit is contained in:
Brian Murphy 2015-04-30 22:57:25 -04:00
parent 118389bc06
commit 56e422b9e8
10 changed files with 519 additions and 291 deletions

View File

@ -8,18 +8,14 @@ package org.elasticsearch.watcher.input.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
@ -49,7 +45,7 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
@Override
public SearchInput.Result execute(WatchExecutionContext ctx) throws IOException {
SearchRequest request = createSearchRequestWithTimes(input.getSearchRequest(), ctx, scriptService);
SearchRequest request = WatcherUtils.createSearchRequestFromPrototype(input.getSearchRequest(), ctx, scriptService, null);
if (logger.isTraceEnabled()) {
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), XContentHelper.convertToJson(request.source(), false, true));
}
@ -76,39 +72,4 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
return new SearchInput.Result(request, payload);
}
/**
* Creates a new search request applying the scheduledFireTime and fireTime to the original request
*/
public static SearchRequest createSearchRequestWithTimes(SearchRequest requestPrototype, WatchExecutionContext ctx, ScriptServiceProxy scriptService) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.searchType(requestPrototype.searchType())
.indices(requestPrototype.indices());
Map<String, Object> templateParams = Variables.createCtxModel(ctx, null);
templateParams.putAll(requestPrototype.templateParams());
if (Strings.hasLength(requestPrototype.source())) {
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (Strings.hasLength(requestPrototype.templateSource())) {
String requestSource = XContentHelper.convertToJson(requestPrototype.templateSource(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
//templateParams = WatcherUtils.flattenModel(templateParams);
request.templateParams(templateParams);
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
} /*else {
request.templateParams(templateParams);
}*/
// falling back to an empty body
return request;
}
}

View File

@ -10,14 +10,19 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.lang.reflect.Array;
@ -57,6 +62,33 @@ public final class WatcherUtils {
}
}
public static SearchRequest createSearchRequestFromPrototype(SearchRequest requestPrototype, WatchExecutionContext ctx, ScriptServiceProxy scriptService, Payload payload) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.searchType(requestPrototype.searchType())
.indices(requestPrototype.indices());
Map<String, Object> templateParams = Variables.createCtxModel(ctx, payload);
templateParams.putAll(requestPrototype.templateParams());
if (Strings.hasLength(requestPrototype.source())) {
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (Strings.hasLength(requestPrototype.templateSource())) {
String requestSource = XContentHelper.convertToJson(requestPrototype.templateSource(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
request.templateParams(templateParams);
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
}
// falling back to an empty body
return request;
}
/**
* Reads a new search request instance for the specified parser.
*/

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.transform.ExecutableTransform;
@ -45,15 +46,18 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
@Override
public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
SearchRequest req = createRequest(transform.request, ctx, payload);
SearchRequest req = WatcherUtils.createSearchRequestFromPrototype(transform.request, ctx, scriptService, payload);
SearchResponse resp = client.search(req);
return new SearchTransform.Result(new Payload.XContent(resp));
return new SearchTransform.Result(req, new Payload.XContent(resp));
}
SearchRequest createRequest(SearchRequest requestPrototype, WatchExecutionContext ctx, Payload payload) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.indices(requestPrototype.indices());
.indices(requestPrototype.indices())
.searchType(requestPrototype.searchType());
if (Strings.hasLength(requestPrototype.source())) {
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, createCtxModel(ctx, payload));

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.SearchRequestEquivalence;
@ -73,29 +74,57 @@ public class SearchTransform implements Transform {
public static class Result extends Transform.Result {
public Result(Payload payload) {
private final SearchRequest executedRequest;
public Result(SearchRequest executedRequest, Payload payload) {
super(TYPE, payload);
this.executedRequest = executedRequest;
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Field.EXECUTED_REQUEST.getPreferredName());
WatcherUtils.writeSearchRequest(executedRequest, builder, params);
return builder;
}
public static Result parse(String watchId, XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. expected an object, but found [{}] instead", TYPE, watchId, token);
SearchRequest executedRequest = null;
Payload payload = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (Field.EXECUTED_REQUEST.match(currentFieldName)) {
try {
executedRequest = WatcherUtils.readSearchRequest(parser, ExecutableSearchTransform.DEFAULT_SEARCH_TYPE);
} catch (SearchRequestParseException srpe) {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. failed to parse [{}]", srpe, TYPE, watchId, currentFieldName);
}
} else if (token == XContentParser.Token.START_OBJECT && currentFieldName != null) {
if (Field.PAYLOAD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. unexpected field [{}]", TYPE, watchId, currentFieldName);
}
}
}
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME || !Field.PAYLOAD.match(parser.currentName())) {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] field, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token);
if (payload == null) {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. missing required [{}] field", TYPE, watchId, Field.PAYLOAD.getPreferredName());
}
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] field, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token);
if (executedRequest == null) {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. missing required [{}] field", TYPE, watchId, Field.EXECUTED_REQUEST.getPreferredName());
}
return new SearchTransform.Result(new Payload.XContent(parser));
return new SearchTransform.Result(executedRequest, payload);
}
public SearchRequest executedRequest() {
return executedRequest;
}
}
@ -112,4 +141,8 @@ public class SearchTransform implements Transform {
return new SearchTransform(request);
}
}
public interface Field extends Transform.Field {
ParseField EXECUTED_REQUEST = new ParseField("executed_request");
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.watcher.input.search;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
@ -78,24 +77,6 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.put("path.conf", this.getResource("config").getPath()).build();
}
private IndexResponse indexTestDoc() {
createIndex("test-search-index");
ensureGreen("test-search-index");
IndexResponse response = client().index(
client().prepareIndex()
.setId("test")
.setIndex("test-search-index")
.setType("test-search-type")
.setSource("foo","bar")
.setTimestamp(new DateTime(40000, UTC).toString()).request()).actionGet();
assertThat(response.isCreated(), is(true));
refresh();
return response;
}
@Test
public void testExecute() throws Exception {
SearchSourceBuilder searchSourceBuilder = searchSource().query(
@ -314,7 +295,6 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
}
private SearchInput.Result executeSearchInput(SearchRequest request) throws IOException {
createIndex("test-search-index");
ensureGreen("test-search-index");
SearchInput.Builder siBuilder = SearchInput.builder(request);

View File

@ -1,208 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransformFactory;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.watcher.support.WatcherDateUtils.parseDate;
import static org.elasticsearch.watcher.test.WatcherTestUtils.*;
import static org.hamcrest.Matchers.*;
/**
*
*/
public class SearchTransformTests extends AbstractWatcherIntegrationTests {
@Test
public void testApply() throws Exception {
index("idx", "type", "1");
ensureGreen("idx");
refresh();
SearchRequest request = Requests.searchRequest("idx").source(jsonBuilder().startObject()
.startObject("query")
.startObject("match_all").endObject()
.endObject()
.endObject());
ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client()));
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
Transform.Result result = transform.execute(ctx, EMPTY_PAYLOAD);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
SearchResponse response = client().search(request).get();
Payload expectedPayload = new Payload.XContent(response);
// we need to remove the "took" field from teh response as this is the only field
// that most likely be different between the two... we don't really care about this
// field, we just want to make sure that the important parts of the response are the same
Map<String, Object> resultData = result.payload().data();
resultData.remove("took");
Map<String, Object> expectedData = expectedPayload.data();
expectedData.remove("took");
assertThat(resultData, equalTo(expectedData));
}
@Test
public void testApply_MustacheTemplate() throws Exception {
// The rational behind this test:
//
// - we index 4 documents each one associated with a unique value and each is associated with a day
// - we build a search transform such that with a filter that
// - the date must be after [scheduled_time] variable
// - the date must be before [execution_time] variable
// - the value must match [payload.value] variable
// - the variable are set as such:
// - scheduled_time = youngest document's date
// - fired_time = oldest document's date
// - payload.value = val_3
// - when executed, the variables will be replaced with the scheduled_time, fired_time and the payload.value.
// - we set all these variables accordingly (the search transform is responsible to populate them)
// - when replaced correctly, the search should return document 3.
//
// we then do a search for document 3, and compare the response to the payload returned by the transform
index("idx", "type", "1", doc("2015-01-01T00:00:00", "val_1"));
index("idx", "type", "2", doc("2015-01-02T00:00:00", "val_2"));
index("idx", "type", "3", doc("2015-01-03T00:00:00", "val_3"));
index("idx", "type", "4", doc("2015-01-04T00:00:00", "val_4"));
ensureGreen("idx");
refresh();
SearchRequest request = Requests.searchRequest("idx").source(searchSource().query(filteredQuery(matchAllQuery(), boolFilter()
.must(rangeFilter("date").gt("{{ctx.trigger.scheduled_time}}"))
.must(rangeFilter("date").lt("{{ctx.execution_time}}"))
.must(termFilter("value", "{{ctx.payload.value}}")))));
ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client()));
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00", UTC), parseDate("2015-01-01T00:00:00", UTC));
WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00", UTC), event, EMPTY_PAYLOAD);
Payload payload = simplePayload("value", "val_3");
Transform.Result result = transform.execute(ctx, payload);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
SearchResponse response = client().prepareSearch("idx").setQuery(
filteredQuery(matchAllQuery(), termFilter("value", "val_3")))
.get();
Payload expectedPayload = new Payload.XContent(response);
// we need to remove the "took" field from teh response as this is the only field
// that most likely be different between the two... we don't really care about this
// field, we just want to make sure that the important parts of the response are the same
Map<String, Object> resultData = result.payload().data();
resultData.remove("took");
Map<String, Object> expectedData = expectedPayload.data();
expectedData.remove("took");
assertThat(resultData, equalTo(expectedData));
}
@Test
public void testParser() throws Exception {
String[] indices = rarely() ? null : randomBoolean() ? new String[] { "idx" } : new String[] { "idx1", "idx2" };
SearchType searchType = randomBoolean() ? null : randomFrom(SearchType.values());
String templateName = randomBoolean() ? null : "template1";
ScriptService.ScriptType templateType = templateName != null && randomBoolean() ? randomFrom(ScriptService.ScriptType.values()) : null;
XContentBuilder builder = jsonBuilder().startObject();
if (indices != null) {
builder.array("indices", indices);
}
if (searchType != null) {
builder.field("search_type", searchType.name());
}
if (templateName != null) {
builder.startObject("template")
.field("name", templateName);
if (templateType != null) {
builder.field("type", templateType);
}
builder.endObject();
}
XContentBuilder sourceBuilder = jsonBuilder().startObject()
.startObject("query")
.startObject("match_all")
.endObject()
.endObject()
.endObject();
BytesReference source = sourceBuilder.bytes();
builder.startObject("body")
.startObject("query")
.startObject("match_all")
.endObject()
.endObject()
.endObject();
builder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
ExecutableSearchTransform executable = new SearchTransformFactory(ImmutableSettings.EMPTY, scriptService(), ClientProxy.of(client())).parseExecutable("_id", parser);
assertThat(executable, notNullValue());
assertThat(executable.type(), is(SearchTransform.TYPE));
assertThat(executable.transform().getRequest(), notNullValue());
if (indices != null) {
assertThat(executable.transform().getRequest().indices(), arrayContainingInAnyOrder(indices));
}
if (searchType != null) {
assertThat(executable.transform().getRequest().searchType(), is(searchType));
}
if (templateName != null) {
assertThat(executable.transform().getRequest().templateName(), equalTo(templateName));
}
if (templateType != null) {
assertThat(executable.transform().getRequest().templateType(), equalTo(templateType));
}
assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes()));
}
private static Map<String, Object> doc(String date, String value) {
Map<String, Object> doc = new HashMap<>();
doc.put("date", parseDate(date, UTC));
doc.put("value", value);
return doc;
}
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform;
package org.elasticsearch.watcher.transform.chain;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
@ -14,9 +14,10 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.transform.chain.ChainTransform;
import org.elasticsearch.watcher.transform.chain.ChainTransformFactory;
import org.elasticsearch.watcher.transform.chain.ExecutableChainTransform;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformFactory;
import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.watcher.watch.Payload;
import org.junit.Test;
@ -80,9 +81,9 @@ public class ChainTransformTests extends ElasticsearchTestCase {
parser.nextToken();
ExecutableChainTransform executable = transformParser.parseExecutable("_id", parser);
assertThat(executable, notNullValue());
assertThat(executable.transform.getTransforms(), notNullValue());
assertThat(executable.transform.getTransforms(), hasSize(3));
for (int i = 0; i < executable.transform.getTransforms().size(); i++) {
assertThat(executable.transform().getTransforms(), notNullValue());
assertThat(executable.transform().getTransforms(), hasSize(3));
for (int i = 0; i < executable.transform().getTransforms().size(); i++) {
assertThat(executable.executableTransforms().get(i), instanceOf(NamedExecutableTransform.class));
assertThat(((NamedExecutableTransform) executable.executableTransforms().get(i)).transform().name, is("name" + (i + 1)));
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform;
package org.elasticsearch.watcher.transform.script;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableList;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableSet;
@ -23,6 +23,7 @@ import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.script.ExecutableScriptTransform;
import org.elasticsearch.watcher.transform.script.ScriptTransform;
import org.elasticsearch.watcher.transform.script.ScriptTransformFactory;

View File

@ -0,0 +1,400 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.actions.ExecutableActions;
import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.watcher.support.WatcherDateUtils.parseDate;
import static org.elasticsearch.watcher.test.WatcherTestUtils.*;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.mock;
/**
*
*/
public class SearchTransformTests extends AbstractWatcherIntegrationTests {
@Override
public Settings nodeSettings(int nodeOrdinal) {
//Set path so ScriptService will pick up the test scripts
return settingsBuilder().put(super.nodeSettings(nodeOrdinal))
.put("path.conf", this.getResource("config").getPath()).build();
}
@Test
public void testApply() throws Exception {
index("idx", "type", "1");
ensureGreen("idx");
refresh();
SearchRequest request = Requests.searchRequest("idx").source(jsonBuilder().startObject()
.startObject("query")
.startObject("match_all").endObject()
.endObject()
.endObject());
ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client()));
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
Transform.Result result = transform.execute(ctx, EMPTY_PAYLOAD);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
SearchResponse response = client().search(request).get();
Payload expectedPayload = new Payload.XContent(response);
// we need to remove the "took" field from teh response as this is the only field
// that most likely be different between the two... we don't really care about this
// field, we just want to make sure that the important parts of the response are the same
Map<String, Object> resultData = result.payload().data();
resultData.remove("took");
Map<String, Object> expectedData = expectedPayload.data();
expectedData.remove("took");
assertThat(resultData, equalTo(expectedData));
}
@Test
public void testApply_MustacheTemplate() throws Exception {
// The rational behind this test:
//
// - we index 4 documents each one associated with a unique value and each is associated with a day
// - we build a search transform such that with a filter that
// - the date must be after [scheduled_time] variable
// - the date must be before [execution_time] variable
// - the value must match [payload.value] variable
// - the variable are set as such:
// - scheduled_time = youngest document's date
// - fired_time = oldest document's date
// - payload.value = val_3
// - when executed, the variables will be replaced with the scheduled_time, fired_time and the payload.value.
// - we set all these variables accordingly (the search transform is responsible to populate them)
// - when replaced correctly, the search should return document 3.
//
// we then do a search for document 3, and compare the response to the payload returned by the transform
index("idx", "type", "1", doc("2015-01-01T00:00:00", "val_1"));
index("idx", "type", "2", doc("2015-01-02T00:00:00", "val_2"));
index("idx", "type", "3", doc("2015-01-03T00:00:00", "val_3"));
index("idx", "type", "4", doc("2015-01-04T00:00:00", "val_4"));
ensureGreen("idx");
refresh();
SearchRequest request = Requests.searchRequest("idx").source(searchSource().query(filteredQuery(matchAllQuery(), boolFilter()
.must(rangeFilter("date").gt("{{ctx.trigger.scheduled_time}}"))
.must(rangeFilter("date").lt("{{ctx.execution_time}}"))
.must(termFilter("value", "{{ctx.payload.value}}")))));
ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client()));
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00", UTC), parseDate("2015-01-01T00:00:00", UTC));
WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00", UTC), event, EMPTY_PAYLOAD);
Payload payload = simplePayload("value", "val_3");
Transform.Result result = transform.execute(ctx, payload);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
SearchResponse response = client().prepareSearch("idx").setQuery(
filteredQuery(matchAllQuery(), termFilter("value", "val_3")))
.get();
Payload expectedPayload = new Payload.XContent(response);
// we need to remove the "took" field from teh response as this is the only field
// that most likely be different between the two... we don't really care about this
// field, we just want to make sure that the important parts of the response are the same
Map<String, Object> resultData = result.payload().data();
resultData.remove("took");
Map<String, Object> expectedData = expectedPayload.data();
expectedData.remove("took");
assertThat(resultData, equalTo(expectedData));
}
@Test
public void testParser() throws Exception {
String[] indices = rarely() ? null : randomBoolean() ? new String[] { "idx" } : new String[] { "idx1", "idx2" };
SearchType searchType = randomBoolean() ? null : randomFrom(SearchType.values());
String templateName = randomBoolean() ? null : "template1";
ScriptService.ScriptType templateType = templateName != null && randomBoolean() ? randomFrom(ScriptService.ScriptType.values()) : null;
XContentBuilder builder = jsonBuilder().startObject();
if (indices != null) {
builder.array("indices", indices);
}
if (searchType != null) {
builder.field("search_type", searchType.name());
}
if (templateName != null) {
builder.startObject("template")
.field("name", templateName);
if (templateType != null) {
builder.field("type", templateType);
}
builder.endObject();
}
XContentBuilder sourceBuilder = jsonBuilder().startObject()
.startObject("query")
.startObject("match_all")
.endObject()
.endObject()
.endObject();
BytesReference source = sourceBuilder.bytes();
builder.startObject("body")
.startObject("query")
.startObject("match_all")
.endObject()
.endObject()
.endObject();
builder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
ExecutableSearchTransform executable = new SearchTransformFactory(ImmutableSettings.EMPTY, scriptService(), ClientProxy.of(client())).parseExecutable("_id", parser);
assertThat(executable, notNullValue());
assertThat(executable.type(), is(SearchTransform.TYPE));
assertThat(executable.transform().getRequest(), notNullValue());
if (indices != null) {
assertThat(executable.transform().getRequest().indices(), arrayContainingInAnyOrder(indices));
}
if (searchType != null) {
assertThat(executable.transform().getRequest().searchType(), is(searchType));
}
if (templateName != null) {
assertThat(executable.transform().getRequest().templateName(), equalTo(templateName));
}
if (templateType != null) {
assertThat(executable.transform().getRequest().templateType(), equalTo(templateType));
}
assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes()));
}
@Test
public void testSearch_InlineTemplate() throws Exception {
final String templateQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}}";
final String expectedQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":" +
"{\"query\":\"a\",\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":{\"from\":\"1970-01-01T00:01:00.000Z||-30s\"," +
"\"to\":\"1970-01-01T00:01:00.000Z\",\"include_lower\":true,\"include_upper\":true}}}}}}";
ScriptService.ScriptType scriptType = ScriptService.ScriptType.INLINE;
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE)
.setIndices("test-search-index")
.setTemplateSource(templateQuery)
.setTemplateParams(params)
.setTemplateType(scriptType)
.request();
SearchTransform.Result executedResult = executeSearchTransform(request);
assertThat(executedResult.executedRequest().source().toUtf8(), equalTo(expectedQuery));
}
@Test
public void testSearch_IndexedTemplate() throws Exception {
final String templateQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}}";
PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache","test-script", templateQuery).request();
assertThat(client().putIndexedScript(indexedScriptRequest).actionGet().isCreated(), is(true));
ScriptService.ScriptType scriptType = ScriptService.ScriptType.INDEXED;
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE)
.setIndices("test-search-index")
.setTemplateName("test-script")
.setTemplateParams(params)
.setTemplateType(scriptType)
.request();
SearchTransform.Result result = executeSearchTransform(request);
assertNotNull(result.executedRequest());
}
@Test
public void testSearch_OndiskTemplate() throws Exception {
ScriptService.ScriptType scriptType = ScriptService.ScriptType.FILE;
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE)
.setIndices("test-search-index")
.setTemplateName("test_disk_template")
.setTemplateParams(params)
.setTemplateType(scriptType)
.request();
SearchTransform.Result result = executeSearchTransform(request);
assertNotNull(result.executedRequest());
}
@Test
public void testDifferentSearchType() throws Exception {
SearchSourceBuilder searchSourceBuilder = searchSource().query(
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))
);
SearchType searchType = randomFrom(SearchType.values());
SearchRequest request = client()
.prepareSearch()
.setSearchType(searchType)
.setIndices("test-search-index")
.request()
.source(searchSourceBuilder);
SearchTransform.Result result = executeSearchTransform(request);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertNotNull(result.executedRequest());
assertEquals(result.executedRequest().searchType(), searchType);
assertArrayEquals(result.executedRequest().indices(), request.indices());
assertEquals(result.executedRequest().indicesOptions(), request.indicesOptions());
}
@Test
public void testResultParser() throws Exception {
Map<String, Object> data = new HashMap<>();
data.put("foo", "bar");
data.put("baz", new ArrayList<String>() );
SearchSourceBuilder searchSourceBuilder = searchSource().query(
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{ctx.triggered.scheduled_time}}||-30s").to("{{ctx.triggered.triggered_time}}")));
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE)
.request()
.source(searchSourceBuilder);
XContentBuilder jsonBuilder = jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field(SearchTransform.Field.PAYLOAD.getPreferredName(), data);
jsonBuilder.field(SearchTransform.Field.EXECUTED_REQUEST.getPreferredName());
WatcherUtils.writeSearchRequest(request, jsonBuilder, ToXContent.EMPTY_PARAMS);
jsonBuilder.endObject();
SearchTransformFactory factory = new SearchTransformFactory(settingsBuilder().build(),
scriptService(),
ClientProxy.of(client()));
XContentParser parser = JsonXContent.jsonXContent.createParser(jsonBuilder.bytes());
parser.nextToken();
SearchTransform.Result result = factory.parseResult("_id", parser);
assertEquals(SearchTransform.TYPE, result.type());
assertEquals(result.payload().data().get("foo"), "bar");
List baz = (List)result.payload().data().get("baz");
assertTrue(baz.isEmpty());
assertNotNull(result.executedRequest());
}
private SearchTransform.Result executeSearchTransform(SearchRequest request) throws IOException {
createIndex("test-search-index");
ensureGreen("test-search-index");
SearchTransform searchTransform = new SearchTransform(request);
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, scriptService(), ClientProxy.of(client()));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ClockMock(),
mock(LicenseService.class),
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
new ExecutableAlwaysCondition(logger),
null,
new ExecutableActions(new ArrayList<ActionWrapper>()),
null,
null,
new Watch.Status()),
new DateTime(60000, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)));
return executableSearchTransform.execute(ctx, Payload.Simple.EMPTY);
}
private static Map<String, Object> doc(String date, String value) {
Map<String, Object> doc = new HashMap<>();
doc.put("date", parseDate(date, UTC));
doc.put("value", value);
return doc;
}
}

View File

@ -0,0 +1,24 @@
{
"query": {
"filtered": {
"query": {
"match": {
"event_type": {
"query": "a",
"type": "boolean"
}
}
},
"filter": {
"range": {
"_timestamp": {
"from": "{{ctx.trigger.scheduled_time}}||-{{seconds_param}}",
"to": "{{ctx.trigger.scheduled_time}}",
"include_lower": true,
"include_upper": true
}
}
}
}
}
}