diff --git a/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java b/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java index ffbbb7afe28..cc21e9440e4 100644 --- a/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java +++ b/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java @@ -16,7 +16,9 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.watcher.input.Input; +import org.elasticsearch.watcher.input.InputException; import org.elasticsearch.watcher.support.Variables; +import org.elasticsearch.watcher.support.XContentFilterKeysUtils; import org.elasticsearch.watcher.support.http.HttpClient; import org.elasticsearch.watcher.support.http.HttpRequest; import org.elasticsearch.watcher.support.http.HttpResponse; @@ -27,7 +29,9 @@ import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.WatchExecutionContext; import java.io.IOException; +import java.util.HashSet; import java.util.Map; +import java.util.Set; /** */ @@ -36,12 +40,14 @@ public class HttpInput extends Input { public static final String TYPE = "http"; private final HttpClient client; + private final Set extractKeys; private final TemplatedHttpRequest request; - public HttpInput(ESLogger logger, HttpClient client, TemplatedHttpRequest request) { + public HttpInput(ESLogger logger, HttpClient client, TemplatedHttpRequest request, Set extractKeys) { super(logger); this.request = request; this.client = client; + this.extractKeys = extractKeys; } @Override @@ -54,14 +60,33 @@ public class HttpInput extends Input { Map model = Variables.createCtxModel(ctx, null); HttpRequest httpRequest = request.render(model); try (HttpResponse response = client.execute(httpRequest)) { - Tuple> result = XContentHelper.convertToMap(response.body(), true); - return new Result(TYPE, new Payload.Simple(result.v2()), httpRequest, response.status()); + byte[] bytes = response.body(); + final Payload payload; + if (extractKeys != null) { + XContentParser parser = XContentHelper.createParser(bytes, 0, bytes.length); + Map filteredKeys = XContentFilterKeysUtils.filterMapOrdered(extractKeys, parser); + payload = new Payload.Simple(filteredKeys); + } else { + Tuple> result = XContentHelper.convertToMap(bytes, true); + payload = new Payload.Simple(result.v2()); + } + return new Result(TYPE, payload, httpRequest, response.status()); } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return request.toXContent(builder, params); + builder.startObject(); + builder.field(Parser.REQUEST_FIELD.getPreferredName()); + builder = request.toXContent(builder, params); + if (extractKeys != null) { + builder.startArray(Parser.EXTRACT_FIELD.getPreferredName()); + for (String extractKey : extractKeys) { + builder.value(extractKey); + } + builder.endObject(); + } + return builder.endObject(); } @Override @@ -114,6 +139,7 @@ public class HttpInput extends Input { public final static class Parser extends AbstractComponent implements Input.Parser { public static final ParseField REQUEST_FIELD = new ParseField("request"); + public static final ParseField EXTRACT_FIELD = new ParseField("extract"); public static final ParseField HTTP_STATUS_FIELD = new ParseField("http_status"); private final HttpClient client; @@ -135,8 +161,45 @@ public class HttpInput extends Input { @Override public HttpInput parse(XContentParser parser) throws IOException { - TemplatedHttpRequest request = templatedRequestParser.parse(parser); - return new HttpInput(logger, client, request); + Set extract = null; + TemplatedHttpRequest request = null; + + String currentFieldName = null; + for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) { + switch (token) { + case FIELD_NAME: + currentFieldName = parser.currentName(); + break; + case START_OBJECT: + if (REQUEST_FIELD.getPreferredName().equals(currentFieldName)) { + request = templatedRequestParser.parse(parser); + } else { + throw new InputException("could not parse [http] input. unexpected field [" + currentFieldName + "]"); + } + break; + case START_ARRAY: + if (EXTRACT_FIELD.getPreferredName().equals(currentFieldName)) { + extract = new HashSet<>(); + for (XContentParser.Token arrayToken = parser.nextToken(); arrayToken != XContentParser.Token.END_ARRAY; arrayToken = parser.nextToken()) { + if (arrayToken == XContentParser.Token.VALUE_STRING) { + extract.add(parser.text()); + } + } + } else { + throw new InputException("could not parse [http] input. unexpected field [" + currentFieldName + "]"); + } + break; + default: + throw new InputException("could not parse [http] input. unexpected token [" + token + "]"); + + } + } + + if (request == null) { + throw new InputException("could not parse [http] input. http request is missing or null."); + } + + return new HttpInput(logger, client, request, extract); } @Override @@ -157,7 +220,9 @@ public class HttpInput extends Input { request = requestParser.parse(parser); } } else if (token == XContentParser.Token.VALUE_NUMBER) { - statusCode = parser.intValue(); + if (HTTP_STATUS_FIELD.match(currentFieldName)) { + statusCode = parser.intValue(); + } } } return new Result(TYPE, payload, request, statusCode); @@ -175,6 +240,7 @@ public class HttpInput extends Input { private Map headers; private HttpAuth auth; private Template body; + private Set extractKeys; public SourceBuilder setHost(String host) { this.host = host; @@ -216,6 +282,14 @@ public class HttpInput extends Input { return this; } + public SourceBuilder addExtractKey(String key) { + if (extractKeys == null) { + extractKeys = new HashSet<>(); + } + extractKeys.add(key); + return this; + } + @Override public String type() { return TYPE; @@ -224,6 +298,14 @@ public class HttpInput extends Input { @Override public XContentBuilder toXContent(XContentBuilder builder, Params p) throws IOException { builder.startObject(); + if (extractKeys != null) { + builder.startArray(Parser.EXTRACT_FIELD.getPreferredName()); + for (String extractKey : extractKeys) { + builder.value(extractKey); + } + builder.endArray(); + } + builder.startObject(Parser.REQUEST_FIELD.getPreferredName()); builder.field(HttpRequest.Parser.HOST_FIELD.getPreferredName(), host); builder.field(HttpRequest.Parser.PORT_FIELD.getPreferredName(), port); if (method != null) { @@ -244,6 +326,7 @@ public class HttpInput extends Input { if (body != null) { builder.field(HttpRequest.Parser.BODY_FIELD.getPreferredName(), body); } + builder.endObject(); return builder.endObject(); } } diff --git a/src/main/java/org/elasticsearch/watcher/input/search/SearchInput.java b/src/main/java/org/elasticsearch/watcher/input/search/SearchInput.java index f23a016b3b7..3e2c444a00c 100644 --- a/src/main/java/org/elasticsearch/watcher/input/search/SearchInput.java +++ b/src/main/java/org/elasticsearch/watcher/input/search/SearchInput.java @@ -26,13 +26,19 @@ import org.elasticsearch.watcher.input.InputException; import org.elasticsearch.watcher.support.SearchRequestEquivalence; import org.elasticsearch.watcher.support.Variables; import org.elasticsearch.watcher.support.WatcherUtils; +import org.elasticsearch.watcher.support.XContentFilterKeysUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.WatchExecutionContext; import java.io.IOException; +import java.util.HashSet; import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.common.xcontent.XContentParser.*; /** * An input that executes search and returns the search response as the initial payload @@ -43,13 +49,15 @@ public class SearchInput extends Input { public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.COUNT; + private final Set extractKeys; private final SearchRequest searchRequest; private final ScriptServiceProxy scriptService; private final ClientProxy client; - public SearchInput(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client, SearchRequest searchRequest) { + public SearchInput(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client, SearchRequest searchRequest, Set extractKeys) { super(logger); + this.extractKeys = extractKeys; this.searchRequest = searchRequest; this.scriptService = scriptService; this.client = client; @@ -78,12 +86,32 @@ public class SearchInput extends Input { } - return new Result(TYPE, new Payload.XContent(response), request); + final Payload payload; + if (extractKeys != null) { + XContentBuilder builder = jsonBuilder().startObject().value(response).endObject(); + XContentParser parser = XContentHelper.createParser(builder.bytes()); + Map filteredKeys = XContentFilterKeysUtils.filterMapOrdered(extractKeys, parser); + payload = new Payload.Simple(filteredKeys); + } else { + payload = new Payload.XContent(response); + } + return new Result(TYPE, payload, request); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return WatcherUtils.writeSearchRequest(searchRequest, builder, params); + builder.startObject(); + builder.field(Parser.REQUEST_FIELD.getPreferredName()); + builder = WatcherUtils.writeSearchRequest(searchRequest, builder, params); + if (extractKeys != null) { + builder.startArray(Parser.EXTRACT_FIELD.getPreferredName()); + for (String extractKey : extractKeys) { + builder.value(extractKey); + } + builder.endObject(); + } + builder.endObject(); + return builder; } @Override @@ -150,6 +178,7 @@ public class SearchInput extends Input { public static class Parser extends AbstractComponent implements Input.Parser { public static ParseField REQUEST_FIELD = new ParseField("request"); + public static ParseField EXTRACT_FIELD = new ParseField("extract"); private final ScriptServiceProxy scriptService; private final ClientProxy client; @@ -167,11 +196,44 @@ public class SearchInput extends Input { @Override public SearchInput parse(XContentParser parser) throws IOException { - SearchRequest request = WatcherUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE); + Set extract = null; + SearchRequest request = null; + + String currentFieldName = null; + for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) { + switch (token) { + case FIELD_NAME: + currentFieldName = parser.currentName(); + break; + case START_OBJECT: + if (REQUEST_FIELD.getPreferredName().equals(currentFieldName)) { + request = WatcherUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE); + } else { + throw new InputException("could not parse [search] input. unexpected field [" + currentFieldName + "]"); + } + break; + case START_ARRAY: + if (EXTRACT_FIELD.getPreferredName().equals(currentFieldName)) { + extract = new HashSet<>(); + for (Token arrayToken = parser.nextToken(); arrayToken != Token.END_ARRAY; arrayToken = parser.nextToken()) { + if (arrayToken == Token.VALUE_STRING) { + extract.add(parser.text()); + } + } + } else { + throw new InputException("could not parse [search] input. unexpected field [" + currentFieldName + "]"); + } + break; + default: + throw new InputException("could not parse [search] input. unexpected token [" + token + "]"); + + } + } + if (request == null) { throw new InputException("could not parse [search] input. search request is missing or null."); } - return new SearchInput(logger, scriptService, client, request); + return new SearchInput(logger, scriptService, client, request, extract); } @Override @@ -180,11 +242,10 @@ public class SearchInput extends Input { SearchRequest request = null; String currentFieldName = null; - XContentParser.Token token; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { + for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) { + if (token == Token.FIELD_NAME) { currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT && currentFieldName != null) { + } else if (token == Token.START_OBJECT && currentFieldName != null) { if (Input.Result.PAYLOAD_FIELD.match(currentFieldName)) { payload = new Payload.XContent(parser); } else if (REQUEST_FIELD.match(currentFieldName)) { @@ -212,11 +273,20 @@ public class SearchInput extends Input { public static class SourceBuilder implements Input.SourceBuilder { private final SearchRequest request; + private Set extractKeys; public SourceBuilder(SearchRequest request) { this.request = request; } + public SourceBuilder addExtractKey(String key) { + if (extractKeys == null) { + extractKeys = new HashSet<>(); + } + extractKeys.add(key); + return this; + } + @Override public String type() { return TYPE; @@ -224,7 +294,17 @@ public class SearchInput extends Input { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return WatcherUtils.writeSearchRequest(request, builder, params); + builder.startObject(); + if (extractKeys != null) { + builder.startArray(Parser.EXTRACT_FIELD.getPreferredName()); + for (String extractKey : extractKeys) { + builder.value(extractKey); + } + builder.endArray(); + } + builder.field(Parser.REQUEST_FIELD.getPreferredName()); + builder = WatcherUtils.writeSearchRequest(request, builder, params); + return builder.endObject(); } } } diff --git a/src/main/java/org/elasticsearch/watcher/support/XContentFilterKeysUtils.java b/src/main/java/org/elasticsearch/watcher/support/XContentFilterKeysUtils.java new file mode 100644 index 00000000000..7c8d7a5a39a --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/XContentFilterKeysUtils.java @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.WatcherException; + +import java.io.IOException; +import java.util.*; + +import static org.elasticsearch.common.xcontent.XContentParser.Token.*; + +public final class XContentFilterKeysUtils { + + private XContentFilterKeysUtils() { + } + + public static Map filterMapOrdered(Set keys, XContentParser parser) { + try { + if (parser.currentToken() != null) { + throw new IllegalArgumentException("Parser already started"); + } + if (parser.nextToken() != START_OBJECT) { + throw new IllegalArgumentException("Content should start with START_OBJECT"); + } + State state = new State(new ArrayList<>(keys)); + return parse(parser, state); + } catch (IOException e) { + throw new WatcherException("could not build a filtered payload out of xcontent", e); + } + } + + private static Map parse(XContentParser parser, State state) throws IOException { + if (state.includeLeaf) { + return parser.map(); + } + + Map data = new HashMap<>(); + for (XContentParser.Token token = parser.nextToken(); token != END_OBJECT; token = parser.nextToken()) { + switch (token) { + case FIELD_NAME: + state.nextField(parser.currentName()); + break; + case START_OBJECT: + if (state.includeKey) { + String fieldName = state.currentFieldName(); + Map nestedData = parse(parser, state); + data.put(fieldName, nestedData); + } else { + parser.skipChildren(); + } + state.previousField(); + break; + case START_ARRAY: + if (state.includeKey) { + String fieldName = state.currentFieldName(); + List arrayData = arrayParsing(parser, state); + data.put(fieldName, arrayData); + } else { + parser.skipChildren(); + } + state.previousField(); + break; + case VALUE_STRING: + if (state.includeKey) { + data.put(state.currentFieldName(), parser.text()); + } + state.previousField(); + break; + case VALUE_NUMBER: + if (state.includeKey) { + data.put(state.currentFieldName(), parser.numberValue()); + } + state.previousField(); + break; + case VALUE_BOOLEAN: + if (state.includeKey) { + data.put(state.currentFieldName(), parser.booleanValue()); + } + state.previousField(); + break; + } + } + return data; + } + + private static List arrayParsing(XContentParser parser, State state) throws IOException { + List values = new ArrayList<>(); + for (XContentParser.Token token = parser.nextToken(); token != END_ARRAY; token = parser.nextToken()) { + switch (token) { + case START_OBJECT: + values.add(parse(parser, state)); + break; + case VALUE_STRING: + values.add(parser.text()); + break; + case VALUE_NUMBER: + values.add(parser.numberValue()); + break; + case VALUE_BOOLEAN: + values.add(parser.booleanValue()); + break; + } + } + return values; + } + + private static final class State { + + final List extractPaths; + StringBuilder currentPath = new StringBuilder(); + + boolean includeLeaf; + boolean includeKey; + String currentFieldName; + + private State(List extractPaths) { + this.extractPaths = extractPaths; + } + + void nextField(String fieldName) { + currentFieldName = fieldName; + if (currentPath.length() != 0) { + currentPath.append('.'); + } + currentPath = currentPath.append(fieldName); + final String path = currentPath.toString(); + for (String extractPath : extractPaths) { + if (path.equals(extractPath)) { + includeKey = true; + includeLeaf = true; + return; + } else if (extractPath.startsWith(path)) { + includeKey = true; + return; + } + } + includeKey = false; + includeLeaf = false; + } + + String currentFieldName() { + return currentFieldName; + } + + void previousField() { + int start = currentPath.lastIndexOf(currentFieldName); + currentPath = currentPath.delete(start, currentPath.length()); + if (currentPath.length() > 0 && currentPath.charAt(currentPath.length() - 1) == '.') { + currentPath = currentPath.deleteCharAt(currentPath.length() - 1); + } + currentFieldName = currentPath.toString(); + includeKey = false; + includeLeaf = false; + } + + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java index c511050b771..ea6b84c21b7 100644 --- a/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java @@ -83,9 +83,9 @@ public class HttpClient extends AbstractComponent { } HttpResponse response = new HttpResponse(); - response.inputStream(urlConnection.getInputStream()); response.status(urlConnection.getResponseCode()); logger.debug("http status code: {}", response.status()); + response.inputStream(urlConnection.getInputStream()); return response; } diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java index a4a0f013e68..d961289ca33 100644 --- a/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.support.WatcherUtils; import org.elasticsearch.watcher.support.http.auth.HttpAuth; import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry; +import org.elasticsearch.watcher.support.template.Template; import java.io.IOException; import java.util.Map; @@ -217,4 +218,82 @@ public class HttpRequest implements ToXContent { } + public final static class SourceBuilder implements ToXContent { + + private String host; + private int port; + private String method; + private Template path; + private Map params; + private Map headers; + private HttpAuth auth; + private Template body; + + public SourceBuilder setHost(String host) { + this.host = host; + return this; + } + + public SourceBuilder setPort(int port) { + this.port = port; + return this; + } + + public SourceBuilder setMethod(String method) { + this.method = method; + return this; + } + + public SourceBuilder setPath(Template path) { + this.path = path; + return this; + } + + public SourceBuilder setParams(Map params) { + this.params = params; + return this; + } + + public SourceBuilder setHeaders(Map headers) { + this.headers = headers; + return this; + } + + public SourceBuilder setAuth(HttpAuth auth) { + this.auth = auth; + return this; + } + + public SourceBuilder setBody(Template body) { + this.body = body; + return this; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params p) throws IOException { + builder.startObject(); + builder.field(HttpRequest.Parser.HOST_FIELD.getPreferredName(), host); + builder.field(HttpRequest.Parser.PORT_FIELD.getPreferredName(), port); + if (method != null) { + builder.field(HttpRequest.Parser.METHOD_FIELD.getPreferredName(), method); + } + if (path != null) { + builder.field(HttpRequest.Parser.PATH_FIELD.getPreferredName(), path); + } + if (params != null) { + builder.field(HttpRequest.Parser.PARAMS_FIELD.getPreferredName(), params); + } + if (headers != null) { + builder.field(HttpRequest.Parser.HEADERS_FIELD.getPreferredName(), headers); + } + if (auth != null) { + builder.field(HttpRequest.Parser.AUTH_FIELD.getPreferredName(), auth); + } + if (body != null) { + builder.field(HttpRequest.Parser.BODY_FIELD.getPreferredName(), body); + } + return builder.endObject(); + } + } + } diff --git a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java index e21e9503d03..b7e42bdd55b 100644 --- a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java @@ -79,7 +79,7 @@ public class HttpInputTests extends ElasticsearchTestCase { Template mockBody = mock(Template.class); when(mockBody.render(anyMap())).thenReturn(body); request.body(mockBody); - HttpInput input = new HttpInput(logger, httpClient, request); + HttpInput input = new HttpInput(logger, httpClient, request, null); HttpResponse response = new HttpResponse(); response.status(123); @@ -163,7 +163,7 @@ public class HttpInputTests extends ElasticsearchTestCase { String httpMethod = "get"; String body = "_body"; Map headers = new MapBuilder().put("a", new MockTemplate("b")).map(); - HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder() + HttpRequest.SourceBuilder sourceBuilder = new HttpRequest.SourceBuilder() .setMethod(httpMethod) .setHost("_host") .setPort(123) diff --git a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java index 82f40aa6abc..fc1a093e5cd 100644 --- a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java @@ -66,7 +66,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { SearchInput searchInput = new SearchInput(logger, ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)), - ClientProxy.of(client()), request); + ClientProxy.of(client()), request, null); WatchExecutionContext ctx = new WatchExecutionContext("test-watch", new Watch("test-alert", new ClockMock(), @@ -103,7 +103,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { SearchInput searchInput = new SearchInput(logger, ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)), - ClientProxy.of(client()), request); + ClientProxy.of(client()), request, null); WatchExecutionContext ctx = new WatchExecutionContext("test-watch", new Watch("test-alert", new ClockMock(), @@ -134,7 +134,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .source(searchSource() .query(filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}")))); - XContentBuilder builder = WatcherUtils.writeSearchRequest(request, jsonBuilder(), ToXContent.EMPTY_PARAMS); + SearchInput.SourceBuilder sourceBuilder = new SearchInput.SourceBuilder(request); + XContentBuilder builder = jsonBuilder().value(sourceBuilder); XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); diff --git a/src/test/java/org/elasticsearch/watcher/support/FilterXContentTests.java b/src/test/java/org/elasticsearch/watcher/support/FilterXContentTests.java new file mode 100644 index 00000000000..36ace34fbe7 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/support/FilterXContentTests.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.*; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; + +/** + */ +public class FilterXContentTests extends ElasticsearchTestCase { + + @Test + @Repeat(iterations = 12) + public void testPayloadFiltering() throws Exception { + Map data = new HashMap<>(); + data.put("key0", "value1"); + data.put("key1", 2); + data.put("key2", 3.1); + data.put("key3", true); + data.put("key4", Arrays.asList("value5", "value5.5")); + data.put("key5", "value6"); + data.put("key6", 7.1); + data.put("key7", false); + + XContentBuilder builder = jsonBuilder().value(data); + XContentParser parser = XContentHelper.createParser(builder.bytes()); + + Set keys = new HashSet<>(); + int numKeys = randomInt(3); + for (int i = 0; i < numKeys; i++) { + boolean added; + do { + added = keys.add("key" + randomInt(7)); + } while (!added); + } + System.out.println(keys); + + Map filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser); + assertThat(filteredData.size(), equalTo(numKeys)); + for (String key : keys) { + assertThat(filteredData.get(key), equalTo(data.get(key))); + } + } + + @Test + public void testNestedPayloadFiltering() throws Exception { + Map data = new HashMap<>(); + data.put("leaf1", MapBuilder.newMapBuilder().put("key1", "value1").put("key2", true).map()); + data.put("leaf2", MapBuilder.newMapBuilder().put("key1", "value1").put("key2", "value2").put("key3", 3).map()); + data.put("leaf3", MapBuilder.newMapBuilder().put("key1", "value1").put("key2", MapBuilder.newMapBuilder().put("key1", "value1").put("key2", "value2").map()).map()); + + BytesReference bytes = jsonBuilder().value(data).bytes(); + + XContentParser parser = XContentHelper.createParser(bytes); + Set keys = new HashSet<>(Arrays.asList("leaf1.key2")); + Map filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser); + assertThat(filteredData.size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf1").size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf1").get("key2"), Matchers.equalTo(Boolean.TRUE)); + + parser = XContentHelper.createParser(bytes); + keys = new HashSet<>(Arrays.asList("leaf2")); + filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser); + assertThat(filteredData.size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf2").size(), equalTo(3)); + assertThat(selectMap(filteredData, "leaf2").get("key1"), Matchers.equalTo("value1")); + assertThat(selectMap(filteredData, "leaf2").get("key2"), Matchers.equalTo("value2")); + assertThat(selectMap(filteredData, "leaf2").get("key3"), Matchers.equalTo(3)); + + parser = XContentHelper.createParser(bytes); + keys = new HashSet<>(Arrays.asList("leaf3.key2.key1")); + filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser); + assertThat(filteredData.size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf3").size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf3", "key2").size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf3", "key2").get("key1"), Matchers.equalTo("value1")); + + parser = XContentHelper.createParser(bytes); + keys = new HashSet<>(Arrays.asList("leaf1.key1", "leaf2.key2")); + filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser); + assertThat(filteredData.size(), equalTo(2)); + assertThat(selectMap(filteredData, "leaf1").size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf2").size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf1").get("key1"), Matchers.equalTo("value1")); + assertThat(selectMap(filteredData, "leaf2").get("key2"), Matchers.equalTo("value2")); + + parser = XContentHelper.createParser(bytes); + keys = new HashSet<>(Arrays.asList("leaf2.key1", "leaf2.key3")); + filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser); + assertThat(filteredData.size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf2").size(), equalTo(2)); + assertThat(selectMap(filteredData, "leaf2").get("key1"), Matchers.equalTo("value1")); + assertThat(selectMap(filteredData, "leaf2").get("key3"), Matchers.equalTo(3)); + + parser = XContentHelper.createParser(bytes); + keys = new HashSet<>(Arrays.asList("leaf3.key2.key1", "leaf3.key2.key2")); + filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser); + assertThat(filteredData.size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf3").size(), equalTo(1)); + assertThat(selectMap(filteredData, "leaf3", "key2").size(), equalTo(2)); + assertThat(selectMap(filteredData, "leaf3", "key2").get("key1"), Matchers.equalTo("value1")); + assertThat(selectMap(filteredData, "leaf3", "key2").get("key2"), Matchers.equalTo("value2")); + } + + @SuppressWarnings("unchecked") + private static Map selectMap(Map data, String... path) { + for (String element : path) { + data = (Map) data.get(element); + } + return data; + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index abd71140955..19fcae1b5db 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -220,8 +220,10 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg builder.startObject("input"); { - builder.field("search"); + builder.startObject("search"); + builder.field("request"); WatcherUtils.writeSearchRequest(conditionRequest, builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); } builder.endObject(); diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java index 8abd0e11840..98b34c62e29 100644 --- a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java +++ b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java @@ -144,7 +144,7 @@ public final class WatcherTestUtils { watchName, SystemClock.INSTANCE, new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")), - new SearchInput(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), conditionRequest), + new SearchInput(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), conditionRequest, null), new ScriptCondition(logger, scriptService, new Script("return true")), new SearchTransform(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), transformRequest), new Actions(actions), diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java index cd61a0f13d3..930fb0b6430 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java @@ -6,6 +6,8 @@ package org.elasticsearch.watcher.test.integration; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.script.ScriptService; @@ -13,6 +15,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.client.WatcherClient; +import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.support.WatcherUtils; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; @@ -23,6 +26,8 @@ import org.elasticsearch.watcher.trigger.schedule.Schedules; import org.elasticsearch.watcher.watch.WatchStore; import org.junit.Test; +import java.util.Map; + import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.FilterBuilders.rangeFilter; import static org.elasticsearch.index.query.QueryBuilders.*; @@ -256,6 +261,49 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { testConditionSearch(searchRequest); } + @Test + public void testInputFiltering() throws Exception { + WatcherClient watcherClient = watcherClient(); + createIndex("idx"); + // Have a sample document in the index, the watch is going to evaluate + client().prepareIndex("idx", "type").setSource("field", "value").get(); + refresh(); + SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value"))); + watcherClient.preparePutWatch("_name1") + .source(watchSourceBuilder() + .trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))) + .input(searchInput(searchRequest).addExtractKey("hits.total")) + .condition(scriptCondition("ctx.payload.hits.total == 1"))) + .get(); + // in this watcher the condition will fail, because max_score isn't extracted, only total: + watcherClient.preparePutWatch("_name2") + .source(watchSourceBuilder() + .trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))) + .input(searchInput(searchRequest).addExtractKey("hits.total")) + .condition(scriptCondition("ctx.payload.hits.max_score >= 0"))) + .get(); + + if (timeWarped()) { + timeWarp().scheduler().trigger("_name1"); + timeWarp().scheduler().trigger("_name2"); + refresh(); + } + + assertWatchWithMinimumPerformedActionsCount("_name1", 1); + assertWatchWithNoActionNeeded("_name2", 1); + + // Check that the input result payload has been filtered + SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*") + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setQuery(matchQuery("watch_name", "_name1")) + .setSize(1) + .get(); + Map payload = (Map) ((Map)((Map)((Map) searchResponse.getHits().getAt(0).sourceAsMap().get("watch_execution")).get("input_result")).get("search")).get("payload"); + assertThat(payload.size(), equalTo(1)); + assertThat(((Map) payload.get("hits")).size(), equalTo(1)); + assertThat((Integer) ((Map) payload.get("hits")).get("total"), equalTo(1)); + } + private void testConditionSearch(SearchRequest request) throws Exception { String watchName = "_name"; assertAcked(prepareCreate("events").addMapping("event", "_timestamp", "enabled=true", "level", "type=string")); diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java index 5c2fa22578b..cc4db4ea7e0 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java @@ -85,7 +85,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { "test-serialization", SystemClock.INSTANCE, new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set this into the future so we don't get any extra runs - new SearchInput(logger, scriptService(), ClientProxy.of(client()), searchRequest), + new SearchInput(logger, scriptService(), ClientProxy.of(client()), searchRequest, null), new ScriptCondition(logger, scriptService(), new Script("return true")), new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest), new Actions(new ArrayList()), @@ -146,7 +146,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { SystemClock.INSTANCE, new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set a cron schedule far into the future so this watch is never scheduled new SearchInput(logger, scriptService(), ClientProxy.of(client()), - searchRequest), + searchRequest, null), new ScriptCondition(logger, scriptService(), new Script("return true")), new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest), new Actions(new ArrayList()), diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java b/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java index 5d5f6926ecb..20c0d5671f4 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java @@ -5,24 +5,38 @@ */ package org.elasticsearch.watcher.test.integration; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.watcher.client.WatchSourceBuilder; +import org.elasticsearch.watcher.client.WatcherClient; +import org.elasticsearch.watcher.history.HistoryStore; +import org.elasticsearch.watcher.input.http.HttpInput; import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.support.template.ScriptTemplate; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.trigger.TriggerBuilders; +import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule; import org.junit.Test; import java.net.InetSocketAddress; +import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction; import static org.elasticsearch.watcher.client.WatchSourceBuilder.watchSourceBuilder; import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition; import static org.elasticsearch.watcher.input.InputBuilders.httpInput; +import static org.elasticsearch.watcher.test.WatcherTestUtils.newInputSearchRequest; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.equalTo; /** */ @@ -69,4 +83,62 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests { assertWatchWithMinimumPerformedActionsCount("_name", 1, false); } + @Test + public void testInputFiltering() throws Exception { + WatcherClient watcherClient = watcherClient(); + createIndex("idx"); + // Have a sample document in the index, the watch is going to evaluate + client().prepareIndex("idx", "type").setSource("field", "value").get(); + refresh(); + + ScriptServiceProxy sc = scriptService(); + InetSocketAddress address = internalTestCluster().httpAddresses()[0]; + String body = jsonBuilder().prettyPrint().startObject() + .field("query").value(termQuery("field", "value")) + .endObject().string(); + HttpInput.SourceBuilder httpInputBuilder = httpInput() + .setHost(address.getHostName()) + .setPort(address.getPort()) + .setPath(new ScriptTemplate(sc, "/idx/_search")) + .setBody(new ScriptTemplate(sc, body)) + .addExtractKey("hits.total"); + + SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value"))); + watcherClient.preparePutWatch("_name1") + .source(watchSourceBuilder() + .trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))) + .input(httpInputBuilder) + .condition(scriptCondition("ctx.payload.hits.total == 1"))) + .get(); + + // in this watcher the condition will fail, because max_score isn't extracted, only total: + watcherClient.preparePutWatch("_name2") + .source(watchSourceBuilder() + .trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))) + .input(httpInputBuilder) + .condition(scriptCondition("ctx.payload.hits.max_score >= 0"))) + .get(); + + if (timeWarped()) { + timeWarp().scheduler().trigger("_name1"); + timeWarp().scheduler().trigger("_name2"); + refresh(); + } + + assertWatchWithMinimumPerformedActionsCount("_name1", 1, false); + assertWatchWithNoActionNeeded("_name2", 1); + + // Check that the input result payload has been filtered + SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*") + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setQuery(matchQuery("watch_name", "_name1")) + .setSize(1) + .get(); + Map payload = (Map) ((Map)((Map)((Map) searchResponse.getHits().getAt(0).sourceAsMap().get("watch_execution")).get("input_result")).get("http")).get("payload"); + assertThat(payload.size(), equalTo(1)); + assertThat(((Map) payload.get("hits")).size(), equalTo(1)); + assertThat((Integer) ((Map) payload.get("hits")).get("total"), equalTo(1)); + System.out.println(searchResponse); + } + } diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java index 8f6061935f6..4b7c6e327cb 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java @@ -185,7 +185,7 @@ public class WatchTests extends ElasticsearchTestCase { String type = randomFrom(SearchInput.TYPE, SimpleInput.TYPE); switch (type) { case SearchInput.TYPE: - return new SearchInput(logger, scriptService, client, WatcherTestUtils.newInputSearchRequest("idx")); + return new SearchInput(logger, scriptService, client, WatcherTestUtils.newInputSearchRequest("idx"), null); default: return new SimpleInput(logger, new Payload.Simple(ImmutableMap.builder().put("_key", "_val").build())); }