Changed search template format for search input and search transform to the new Watcher template format.

Also the WatcherUtils now always use the search template support from core and never uses ScriptService:
* A template is re-parsed and extended with the watch context variables.
* A normal request body is converted into a search template and watch context variables are used as template params.

Original commit: elastic/x-pack-elasticsearch@16bacaf094
This commit is contained in:
Martijn van Groningen 2015-05-09 01:10:03 +02:00
parent 8ddb0a65c4
commit 8fad9937f7
12 changed files with 189 additions and 216 deletions

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.input.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -18,7 +19,6 @@ import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
@ -33,21 +33,20 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
private final ScriptServiceProxy scriptService;
private final ClientProxy client;
public ExecutableSearchInput(SearchInput input, ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client) {
public ExecutableSearchInput(SearchInput input, ESLogger logger, ClientProxy client) {
super(input, logger);
this.scriptService = scriptService;
this.client = client;
}
@Override
public SearchInput.Result execute(WatchExecutionContext ctx) throws IOException {
SearchRequest request = WatcherUtils.createSearchRequestFromPrototype(input.getSearchRequest(), ctx, scriptService, null);
SearchRequest request = WatcherUtils.createSearchRequestFromPrototype(input.getSearchRequest(), ctx, null);
if (logger.isTraceEnabled()) {
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), XContentHelper.convertToJson(request.source(), false, true));
BytesReference source = request.source() != null ? request.source() : request.templateSource();
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), XContentHelper.convertToJson(source, false, true));
}
// actionGet deals properly with InterruptedException

View File

@ -12,7 +12,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.InputFactory;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import java.io.IOException;
@ -21,13 +20,11 @@ import java.io.IOException;
*/
public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Result, ExecutableSearchInput> {
private final ScriptServiceProxy scriptService;
private final ClientProxy client;
@Inject
public SearchInputFactory(Settings settings, ScriptServiceProxy scriptService, ClientProxy client) {
public SearchInputFactory(Settings settings, ClientProxy client) {
super(Loggers.getLogger(ExecutableSimpleInput.class, settings));
this.scriptService = scriptService;
this.client = client;
}
@ -48,6 +45,6 @@ public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Re
@Override
public ExecutableSearchInput createExecutable(SearchInput input) {
return new ExecutableSearchInput(input, inputLogger, scriptService, client);
return new ExecutableSearchInput(input, inputLogger, client);
}
}

View File

@ -10,18 +10,14 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
@ -44,9 +40,6 @@ public final class WatcherUtils {
static final ParseField IGNORE_UNAVAILABLE_FIELD = new ParseField("ignore_unavailable");
static final ParseField ALLOW_NO_INDICES_FIELD = new ParseField("allow_no_indices");
static final ParseField TEMPLATE_FIELD = new ParseField("template");
static final ParseField TEMPLATE_NAME_FIELD = new ParseField("name");
static final ParseField TEMPLATE_TYPE_FIELD = new ParseField("type");
static final ParseField TEMPLATE_PARAMS_FIELD = new ParseField("params");
public final static IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.lenientExpandOpen();
@ -62,27 +55,64 @@ public final class WatcherUtils {
}
}
public static SearchRequest createSearchRequestFromPrototype(SearchRequest requestPrototype, WatchExecutionContext ctx, ScriptServiceProxy scriptService, Payload payload) throws IOException {
public static SearchRequest createSearchRequestFromPrototype(SearchRequest requestPrototype, WatchExecutionContext ctx, Payload payload) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.searchType(requestPrototype.searchType())
.indices(requestPrototype.indices());
Map<String, Object> templateParams = Variables.createCtxModel(ctx, payload);
templateParams.putAll(requestPrototype.templateParams());
// TODO: Revise this search template conversion code once search templates in core have been refactored once ES 2.0 is released.
// Due the inconsistency with templates in ES 1.x, we maintain our own template format.
// This template format we use now, will become the template structure in ES 2.0
Map<String, Object> watcherContextParams = Variables.createCtxModel(ctx, payload);
if (Strings.hasLength(requestPrototype.source())) {
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
// Here we convert a watch search request body into an inline search template,
// this way if any Watcher related context variables are used, they will get resolved,
// by ES search template support
XContentBuilder builder = jsonBuilder();
builder.startObject();
XContentHelper.writeRawField("template", requestPrototype.source(), builder, ToXContent.EMPTY_PARAMS);
builder.field("params", watcherContextParams);
builder.endObject();
request.templateSource(builder.bytes(), false);
} else if (Strings.hasLength(requestPrototype.templateSource())) {
String requestSource = XContentHelper.convertToJson(requestPrototype.templateSource(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
// Here we convert watcher template into a ES core templates. Due to the different format we use, we
// convert to the template format used in ES core
BytesReference templateSource = requestPrototype.templateSource();
try (XContentParser sourceParser = XContentFactory.xContent(templateSource).createParser(templateSource)) {
sourceParser.nextToken();
Template template = Template.parse(sourceParser);
// Convert to the ES template format:
XContentBuilder builder = jsonBuilder();
builder.startObject();
switch (template.getType()) {
case INDEXED:
builder.startObject("template");
builder.field("id", template.getTemplate());
builder.endObject();
break;
case FILE:
builder.startObject("template");
builder.field("file", template.getTemplate());
builder.endObject();
break;
case INLINE:
XContentHelper.writeRawField("template", new BytesArray(template.getTemplate()), builder, ToXContent.EMPTY_PARAMS);
break;
}
Map<String, Object> params = new HashMap<>();
params.putAll(watcherContextParams);
params.putAll(template.getParams());
builder.field("params", params);
builder.endObject();
request.templateSource(builder.bytes(), false);
}
} else if (requestPrototype.templateName() != null) {
request.templateParams(templateParams);
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
// In Watcher templates on all places can be defined in one format
// Can only be set via the Java api
throw new WatcherException("SearchRequest#templateName() isn't supported, templates should be defined in the request body");
}
// falling back to an empty body
return request;
@ -93,8 +123,11 @@ public final class WatcherUtils {
* Reads a new search request instance for the specified parser.
*/
public static SearchRequest readSearchRequest(XContentParser parser, SearchType searchType) throws IOException {
SearchRequest searchRequest = new SearchRequest();
BytesReference searchBody = null;
BytesReference templateBody = null;
IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
SearchRequest searchRequest = new SearchRequest();
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -125,11 +158,10 @@ public final class WatcherUtils {
throw new SearchRequestParseException("could not read search request. unexpected array field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
XContentBuilder builder;
if (BODY_FIELD.match(currentFieldName)) {
builder = XContentBuilder.builder(parser.contentType().xContent());
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
builder.copyCurrentStructure(parser);
searchRequest.source(builder);
searchBody = builder.bytes();
} else if (INDICES_OPTIONS_FIELD.match(currentFieldName)) {
boolean expandOpen = DEFAULT_INDICES_OPTIONS.expandWildcardsOpen();
boolean expandClosed = DEFAULT_INDICES_OPTIONS.expandWildcardsClosed();
@ -173,29 +205,9 @@ public final class WatcherUtils {
}
indicesOptions = IndicesOptions.fromOptions(ignoreUnavailable, allowNoIndices, expandOpen, expandClosed, DEFAULT_INDICES_OPTIONS);
} else if (TEMPLATE_FIELD.match(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (TEMPLATE_NAME_FIELD.match(currentFieldName)) {
searchRequest.templateName(parser.textOrNull());
} else if (TEMPLATE_TYPE_FIELD.match(currentFieldName)) {
try {
searchRequest.templateType(ScriptService.ScriptType.valueOf(parser.text().toUpperCase(Locale.ROOT)));
} catch (IllegalArgumentException iae) {
throw new SearchRequestParseException("could not parse search request. unknown template type [" + parser.text() + "]");
}
} else {
throw new SearchRequestParseException("could not read search request. unexpected template field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (TEMPLATE_PARAMS_FIELD.getPreferredName().equals(currentFieldName)) {
searchRequest.templateParams(flattenModel(parser.map()));
}
} else {
throw new SearchRequestParseException("could not read search request. unexpected template token [" + token + "]");
}
}
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
builder.copyCurrentStructure(parser);
templateBody = builder.bytes();
} else {
throw new SearchRequestParseException("could not read search request. unexpected object field [" + currentFieldName + "]");
}
@ -224,6 +236,12 @@ public final class WatcherUtils {
}
searchRequest.searchType(searchType);
searchRequest.indicesOptions(indicesOptions);
if (searchBody != null) {
searchRequest.source(searchBody, false);
}
if (templateBody != null) {
searchRequest.templateSource(templateBody, false);
}
return searchRequest;
}
@ -249,16 +267,8 @@ public final class WatcherUtils {
if (Strings.hasLength(searchRequest.source())) {
XContentHelper.writeRawField(BODY_FIELD.getPreferredName(), searchRequest.source(), builder, params);
}
if (searchRequest.templateName() != null) {
builder.startObject(TEMPLATE_FIELD.getPreferredName())
.field(TEMPLATE_NAME_FIELD.getPreferredName(), searchRequest.templateName());
if (searchRequest.templateType() != null) {
builder.field(TEMPLATE_TYPE_FIELD.getPreferredName(), searchRequest.templateType().name().toLowerCase(Locale.ROOT));
}
if (searchRequest.templateParams() != null && !searchRequest.templateParams().isEmpty()) {
builder.field(TEMPLATE_PARAMS_FIELD.getPreferredName(), searchRequest.templateParams());
}
builder.endObject();
if (Strings.hasLength(searchRequest.templateSource())) {
XContentHelper.writeRawField(TEMPLATE_FIELD.getPreferredName(), searchRequest.templateSource(), builder, params);
}
if (searchRequest.indicesOptions() != DEFAULT_INDICES_OPTIONS) {

View File

@ -8,26 +8,15 @@ package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.TransformException;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import static org.elasticsearch.watcher.support.Variables.createCtxModel;
import static org.elasticsearch.watcher.support.WatcherUtils.flattenModel;
/**
*
*/
@ -35,18 +24,16 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
protected final ScriptServiceProxy scriptService;
protected final ClientProxy client;
public ExecutableSearchTransform(SearchTransform transform, ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client) {
public ExecutableSearchTransform(SearchTransform transform, ESLogger logger, ClientProxy client) {
super(transform, logger);
this.scriptService = scriptService;
this.client = client;
}
@Override
public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
SearchRequest req = WatcherUtils.createSearchRequestFromPrototype(transform.request, ctx, scriptService, payload);
SearchRequest req = WatcherUtils.createSearchRequestFromPrototype(transform.request, ctx, payload);
SearchResponse resp = client.search(req);
return new SearchTransform.Result(req, new Payload.XContent(resp));
}

View File

@ -20,13 +20,11 @@ import java.io.IOException;
*/
public class SearchTransformFactory extends TransformFactory<SearchTransform, SearchTransform.Result, ExecutableSearchTransform> {
protected final ScriptServiceProxy scriptService;
protected final ClientProxy client;
@Inject
public SearchTransformFactory(Settings settings, ScriptServiceProxy scriptService, ClientProxy client) {
public SearchTransformFactory(Settings settings, ClientProxy client) {
super(Loggers.getLogger(ExecutableSearchTransform.class, settings));
this.scriptService = scriptService;
this.client = client;
}
@ -47,6 +45,6 @@ public class SearchTransformFactory extends TransformFactory<SearchTransform, Se
@Override
public ExecutableSearchTransform createExecutable(SearchTransform transform) {
return new ExecutableSearchTransform(transform, transformLogger, scriptService, client);
return new ExecutableSearchTransform(transform, transformLogger, client);
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.input.search;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -31,6 +32,7 @@ import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
@ -48,14 +50,17 @@ import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.elasticsearch.watcher.test.WatcherTestUtils.getRandomSupportedSearchType;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class SearchInputTests extends ElasticsearchIntegrationTest {
private final static String TEMPLATE_QUERY = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
@ -63,11 +68,6 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}}";
private final static String EXPECTED_TEMPLATE_QUERY = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":" +
"{\"query\":\"a\",\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":{\"from\":\"1970-01-01T00:01:00.000Z||-30s\"," +
"\"to\":\"1970-01-01T00:01:00.000Z\",\"include_lower\":true,\"include_upper\":true}}}}}}";
@Override
public Settings nodeSettings(int nodeOrdinal) {
//Set path so ScriptService will pick up the test scripts
@ -85,9 +85,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.request()
.source(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ClockMock(),
@ -113,67 +111,68 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
@Test
public void testSearch_InlineTemplate() throws Exception {
ScriptService.ScriptType scriptType = ScriptService.ScriptType.INLINE;
final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"metadata\":null,\"watch_id\":\"test-watch\",\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}";
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
BytesReference templateSource = jsonBuilder()
.value(Template.inline(TEMPLATE_QUERY).params(params).build())
.bytes();
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
.setIndices("test-search-index")
.setTemplateSource(TEMPLATE_QUERY)
.setTemplateParams(params)
.setTemplateType(scriptType)
.setTemplateSource(templateSource)
.request();
SearchInput.Result executedResult = executeSearchInput(request);
assertThat(executedResult.executedRequest().source().toUtf8(), equalTo(EXPECTED_TEMPLATE_QUERY));
assertThat(executedResult.executedRequest().templateSource().toUtf8(), equalTo(expectedQuery));
}
@Test
public void testSearch_IndexedTemplate() throws Exception {
PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache","test-script", TEMPLATE_QUERY).request();
PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache","test-template", TEMPLATE_QUERY).request();
assertThat(client().putIndexedScript(indexedScriptRequest).actionGet().isCreated(), is(true));
ScriptService.ScriptType scriptType = ScriptService.ScriptType.INDEXED;
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
BytesReference templateSource = jsonBuilder()
.value(Template.indexed("test-template").params(params).build())
.bytes();
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
.setIndices("test-search-index")
.setTemplateName("test-script")
.setTemplateParams(params)
.setTemplateType(scriptType)
.setTemplateSource(templateSource)
.request();
executeSearchInput(request);
//This will fail if templating fails
SearchInput.Result executedResult = executeSearchInput(request);
assertThat(executedResult.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"id\":\"test-template\""));
}
@Test
public void testSearch_OndiskTemplate() throws Exception {
ScriptService.ScriptType scriptType = ScriptService.ScriptType.FILE;
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
BytesReference templateSource = jsonBuilder()
.value(Template.file("test_disk_template").params(params).build())
.bytes();
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
.setIndices("test-search-index")
.setTemplateName("test_disk_template")
.setTemplateParams(params)
.setTemplateType(scriptType)
.setTemplateSource(templateSource)
.request();
executeSearchInput(request).executedRequest();
SearchInput.Result executedResult = executeSearchInput(request);
assertThat(executedResult.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"file\":\"test_disk_template\""));
}
@Test
@ -189,9 +188,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.request()
.source(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ClockMock(),
@ -227,9 +224,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
SearchInputFactory factory = new SearchInputFactory(ImmutableSettings.EMPTY,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
SearchInputFactory factory = new SearchInputFactory(ImmutableSettings.EMPTY, ClientProxy.of(client()));
SearchInput searchInput = factory.parseInput("_id", parser);
assertEquals(SearchInput.TYPE, searchInput.type());
@ -247,9 +242,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
SearchInputFactory factory = new SearchInputFactory(ImmutableSettings.EMPTY,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
SearchInputFactory factory = new SearchInputFactory(ImmutableSettings.EMPTY, ClientProxy.of(client()));
factory.parseInput("_id", parser);
fail("expected a SearchInputException as search type SCAN should not be supported");
@ -257,9 +250,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
@Test(expected = SearchInputException.class)
public void testParser_Invalid() throws Exception {
SearchInputFactory factory = new SearchInputFactory(settingsBuilder().build(),
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
SearchInputFactory factory = new SearchInputFactory(settingsBuilder().build(), ClientProxy.of(client()));
Map<String, Object> data = new HashMap<>();
data.put("foo", "bar");
@ -297,9 +288,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
WatcherUtils.writeSearchRequest(request, jsonBuilder, ToXContent.EMPTY_PARAMS);
jsonBuilder.endObject();
SearchInputFactory factory = new SearchInputFactory(settingsBuilder().build(),
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
SearchInputFactory factory = new SearchInputFactory(settingsBuilder().build(), ClientProxy.of(client()));
XContentParser parser = JsonXContent.jsonXContent.createParser(jsonBuilder.bytes());
parser.nextToken();
@ -319,9 +308,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
SearchInput si = siBuilder.build();
ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()));
ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ClockMock(),

View File

@ -17,10 +17,10 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.watcher.support.template.Template;
import org.junit.Test;
import java.io.IOException;
@ -106,16 +106,20 @@ public class WatcherUtilsTests extends ElasticsearchTestCase {
expectedRequest.source(expectedSource);
if (randomBoolean()) {
expectedRequest.templateName(randomAsciiOfLengthBetween(1, 5));
expectedRequest.templateType(randomFrom(ScriptService.ScriptType.values()));
Map<String, Object> params = new HashMap<>();
if (randomBoolean()) {
Map<String, Object> params = new HashMap<>();
int maxParams = randomIntBetween(1, 10);
for (int i = 0; i < maxParams; i++) {
params.put(randomAsciiOfLengthBetween(1, 5), randomAsciiOfLengthBetween(1, 5));
}
expectedRequest.templateParams(params);
}
String text = randomAsciiOfLengthBetween(1, 5);
Template template = randomFrom(
Template.inline(text).params(params).build(),
Template.file(text).params(params).build(),
Template.indexed(text).params(params).build()
);
expectedRequest.templateSource(jsonBuilder().startObject().field("template", template).endObject().bytes(), false);
}
XContentBuilder builder = jsonBuilder();
@ -129,9 +133,7 @@ public class WatcherUtilsTests extends ElasticsearchTestCase {
assertThat(result.indicesOptions(), equalTo(expectedRequest.indicesOptions()));
assertThat(result.searchType(), equalTo(expectedRequest.searchType()));
assertThat(result.source().toUtf8(), equalTo(expectedSource));
assertThat(result.templateName(), equalTo(expectedRequest.templateName()));
assertThat(result.templateType(), equalTo(expectedRequest.templateType()));
assertThat(result.templateParams(), equalTo(expectedRequest.templateParams()));
assertThat(result.templateSource(), equalTo(expectedRequest.templateSource()));
}
@Test @Repeat(iterations = 100)
@ -187,28 +189,23 @@ public class WatcherUtilsTests extends ElasticsearchTestCase {
builder.rawField("body", source);
}
String templateName = null;
ScriptService.ScriptType templateType = null;
Map<String, Object> templateParams = Collections.emptyMap();
BytesReference templateSource = null;
if (randomBoolean()) {
builder.startObject("template");
Map<String, Object> params = new HashMap<>();
if (randomBoolean()) {
templateName = randomAsciiOfLengthBetween(1, 5);
builder.field("name", templateName);
}
if (randomBoolean()) {
templateType = randomFrom(ScriptService.ScriptType.values());
builder.field("type", randomBoolean() ? templateType.name() : templateType.name().toLowerCase(Locale.ROOT));
}
if (randomBoolean()) {
templateParams = new HashMap<>();
int maxParams = randomIntBetween(1, 10);
for (int i = 0; i < maxParams; i++) {
templateParams.put(randomAsciiOfLengthBetween(1, 5), randomAsciiOfLengthBetween(1, 5));
params.put(randomAsciiOfLengthBetween(1, 5), randomAsciiOfLengthBetween(1, 5));
}
builder.field("params", templateParams);
}
builder.endObject();
String text = randomAsciiOfLengthBetween(1, 5);
Template template = randomFrom(
Template.inline(text).params(params).build(),
Template.file(text).params(params).build(),
Template.indexed(text).params(params).build()
);
builder.field("template", template);
templateSource = jsonBuilder().value(template).bytes();
}
XContentParser parser = XContentHelper.createParser(builder.bytes());
@ -220,9 +217,7 @@ public class WatcherUtilsTests extends ElasticsearchTestCase {
assertThat(result.indicesOptions(), equalTo(indicesOptions));
assertThat(result.searchType(), equalTo(searchType));
assertThat(result.source(), equalTo(source));
assertThat(result.templateName(), equalTo(templateName));
assertThat(result.templateType(), equalTo(templateType));
assertThat(result.templateParams(), equalTo(templateParams));
assertThat(result.templateSource(), equalTo(templateSource));
}
}

View File

@ -192,7 +192,7 @@ public final class WatcherTestUtils {
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger),
new ExecutableScriptCondition(new ScriptCondition(Script.inline("return true").build()), logger, scriptService),
new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, scriptService, client),
new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, client),
new ExecutableActions(actions),
metadata,
new TimeValue(0),

View File

@ -9,6 +9,7 @@ import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@ -23,6 +24,7 @@ import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.condition.ConditionBuilders;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
@ -313,9 +315,12 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
.setSource(jsonBuilder().startObject().field("template").value(searchSourceBuilder).endObject())
.get();
refresh();
BytesReference templateSource = jsonBuilder()
.value(Template.indexed("my-template").build())
.bytes();
SearchRequest searchRequest = newInputSearchRequest("events");
searchRequest.templateName("my-template");
searchRequest.templateType(ScriptService.ScriptType.INDEXED);
searchRequest.templateSource(templateSource, false);
testConditionSearch(searchRequest);
}

View File

@ -10,7 +10,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.base.Predicate;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

View File

@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.actions.ExecutableActions;
import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
@ -32,8 +33,10 @@ import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformBuilders;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
@ -50,6 +53,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.elasticsearch.watcher.support.WatcherDateUtils.parseDate;
import static org.elasticsearch.watcher.test.WatcherTestUtils.*;
import static org.hamcrest.Matchers.*;
@ -58,7 +62,8 @@ import static org.mockito.Mockito.mock;
/**
*
*/
public class SearchTransformTests extends AbstractWatcherIntegrationTests {
@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class SearchTransformTests extends ElasticsearchIntegrationTest {
@Override
public Settings nodeSettings(int nodeOrdinal) {
@ -79,7 +84,8 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
.startObject("match_all").endObject()
.endObject()
.endObject());
ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client()));
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
@ -136,7 +142,8 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
.must(rangeFilter("date").lt("{{ctx.execution_time}}"))
.must(termFilter("value", "{{ctx.payload.value}}")))));
ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client()));
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00", UTC), parseDate("2015-01-01T00:00:00", UTC));
WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00", UTC), event, EMPTY_PAYLOAD);
@ -168,7 +175,6 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
String[] indices = rarely() ? null : randomBoolean() ? new String[] { "idx" } : new String[] { "idx1", "idx2" };
SearchType searchType = getRandomSupportedSearchType();
String templateName = randomBoolean() ? null : "template1";
ScriptService.ScriptType templateType = templateName != null && randomBoolean() ? randomFrom(ScriptService.ScriptType.values()) : null;
XContentBuilder builder = jsonBuilder().startObject();
if (indices != null) {
builder.array("indices", indices);
@ -177,12 +183,8 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
builder.field("search_type", searchType.name());
}
if (templateName != null) {
builder.startObject("template")
.field("name", templateName);
if (templateType != null) {
builder.field("type", templateType);
}
builder.endObject();
Template template = Template.file(templateName).build();
builder.field("template", template);
}
XContentBuilder sourceBuilder = jsonBuilder().startObject()
@ -203,7 +205,7 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
builder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
ExecutableSearchTransform executable = new SearchTransformFactory(ImmutableSettings.EMPTY, scriptService(), ClientProxy.of(client())).parseExecutable("_id", parser);
ExecutableSearchTransform executable = new SearchTransformFactory(ImmutableSettings.EMPTY, ClientProxy.of(client())).parseExecutable("_id", parser);
assertThat(executable, notNullValue());
assertThat(executable.type(), is(SearchTransform.TYPE));
assertThat(executable.transform().getRequest(), notNullValue());
@ -214,10 +216,7 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
assertThat(executable.transform().getRequest().searchType(), is(searchType));
}
if (templateName != null) {
assertThat(executable.transform().getRequest().templateName(), equalTo(templateName));
}
if (templateType != null) {
assertThat(executable.transform().getRequest().templateType(), equalTo(templateType));
assertThat(executable.transform().getRequest().templateSource().toUtf8(), equalTo("{\"file\":\"template1\"}"));
}
assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes()));
}
@ -230,13 +229,12 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
.source(searchSource()
.query(filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
XContentBuilder builder = jsonBuilder().value(new SearchTransform(request));
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
XContentBuilder builder = jsonBuilder().value(searchTransform);
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
SearchTransformFactory factory = new SearchTransformFactory(ImmutableSettings.EMPTY,
scriptService(),
ClientProxy.of(client()));
SearchTransformFactory factory = new SearchTransformFactory(ImmutableSettings.EMPTY, ClientProxy.of(client()));
factory.parseTransform("_id", parser);
fail("expected a SearchTransformException as search type SCAN should not be supported");
@ -250,27 +248,27 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}}";
final String expectedQuery = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":" +
"{\"query\":\"a\",\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":{\"from\":\"1970-01-01T00:01:00.000Z||-30s\"," +
"\"to\":\"1970-01-01T00:01:00.000Z\",\"include_lower\":true,\"include_upper\":true}}}}}}";
ScriptService.ScriptType scriptType = ScriptService.ScriptType.INLINE;
final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"metadata\":null,\"watch_id\":\"test-watch\",\"payload\":{},\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}";
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
BytesReference templateSource = jsonBuilder()
.value(Template.inline(templateQuery).params(params).build())
.bytes();
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE)
.setIndices("test-search-index")
.setTemplateSource(templateQuery)
.setTemplateParams(params)
.setTemplateType(scriptType)
.setTemplateSource(templateSource)
.request();
SearchTransform.Result executedResult = executeSearchTransform(request);
assertThat(executedResult.executedRequest().source().toUtf8(), equalTo(expectedQuery));
assertThat(executedResult.executedRequest().templateSource().toUtf8(), equalTo(expectedQuery));
}
@Test
@ -283,42 +281,42 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache","test-script", templateQuery).request();
assertThat(client().putIndexedScript(indexedScriptRequest).actionGet().isCreated(), is(true));
ScriptService.ScriptType scriptType = ScriptService.ScriptType.INDEXED;
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
BytesReference templateSource = jsonBuilder()
.value(Template.indexed("test-script").params(params).build())
.bytes();
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE)
.setIndices("test-search-index")
.setTemplateName("test-script")
.setTemplateParams(params)
.setTemplateType(scriptType)
.setTemplateSource(templateSource)
.request();
SearchTransform.Result result = executeSearchTransform(request);
assertNotNull(result.executedRequest());
assertThat(result.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"id\":\"test-script\""));
}
@Test
public void testSearch_OndiskTemplate() throws Exception {
ScriptService.ScriptType scriptType = ScriptService.ScriptType.FILE;
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
BytesReference templateSource = jsonBuilder()
.value(Template.file("test_disk_template").params(params).build())
.bytes();
SearchRequest request = client()
.prepareSearch()
.setSearchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE)
.setIndices("test-search-index")
.setTemplateName("test_disk_template")
.setTemplateParams(params)
.setTemplateType(scriptType)
.setTemplateSource(templateSource)
.request();
SearchTransform.Result result = executeSearchTransform(request);
assertNotNull(result.executedRequest());
assertThat(result.executedRequest().templateSource().toUtf8(), startsWith("{\"template\":{\"file\":\"test_disk_template\""));
}
@ -365,9 +363,7 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
WatcherUtils.writeSearchRequest(request, jsonBuilder, ToXContent.EMPTY_PARAMS);
jsonBuilder.endObject();
SearchTransformFactory factory = new SearchTransformFactory(settingsBuilder().build(),
scriptService(),
ClientProxy.of(client()));
SearchTransformFactory factory = new SearchTransformFactory(settingsBuilder().build(), ClientProxy.of(client()));
XContentParser parser = JsonXContent.jsonXContent.createParser(jsonBuilder.bytes());
parser.nextToken();
@ -385,8 +381,8 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
createIndex("test-search-index");
ensureGreen("test-search-index");
SearchTransform searchTransform = new SearchTransform(request);
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, scriptService(), ClientProxy.of(client()));
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",

View File

@ -263,7 +263,7 @@ public class WatchTests extends ElasticsearchTestCase {
switch (type) {
case SearchInput.TYPE:
SearchInput searchInput = searchInput(WatcherTestUtils.newInputSearchRequest("idx")).build();
return new ExecutableSearchInput(searchInput, logger, scriptService, client);
return new ExecutableSearchInput(searchInput, logger, client);
default:
SimpleInput simpleInput = InputBuilders.simpleInput(ImmutableMap.<String, Object>builder().put("_key", "_val")).build();
return new ExecutableSimpleInput(simpleInput, logger);
@ -274,7 +274,7 @@ public class WatchTests extends ElasticsearchTestCase {
ImmutableMap.Builder<String, InputFactory> parsers = ImmutableMap.builder();
switch (input.type()) {
case SearchInput.TYPE:
parsers.put(SearchInput.TYPE, new SearchInputFactory(settings, scriptService, client));
parsers.put(SearchInput.TYPE, new SearchInputFactory(settings, client));
return new InputRegistry(parsers.build());
default:
parsers.put(SimpleInput.TYPE, new SimpleInputFactory(settings));
@ -310,13 +310,13 @@ public class WatchTests extends ElasticsearchTestCase {
case ScriptTransform.TYPE:
return new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService);
case SearchTransform.TYPE:
return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, scriptService, client);
return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client);
default: // chain
ChainTransform chainTransform = new ChainTransform(ImmutableList.of(
new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)),
new ScriptTransform(Script.inline("_script").build())));
return new ExecutableChainTransform(chainTransform, logger, ImmutableList.<ExecutableTransform>of(
new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, scriptService, client),
new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client),
new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService)));
}
}
@ -326,7 +326,7 @@ public class WatchTests extends ElasticsearchTestCase {
ChainTransformFactory parser = new ChainTransformFactory();
factories.put(ChainTransform.TYPE, parser);
factories.put(ScriptTransform.TYPE, new ScriptTransformFactory(settings, scriptService));
factories.put(SearchTransform.TYPE, new SearchTransformFactory(settings, scriptService, client));
factories.put(SearchTransform.TYPE, new SearchTransformFactory(settings, client));
TransformRegistry registry = new TransformRegistry(factories.build());
parser.init(registry);
return registry;