Added `extract` option to filter keys out of the `search` and `http` input.

Via the `extract` option an array of keys can be defined that will be extracted from the input response in a streaming manner and used as payload instead of the entire input response.

http example:

```json
{
  "input" : {
    "http" : {
      "request" : {
         "host" : "host.domain",
         "port" : 9200,
         "path" : "/idx/_search"
      },
      "extract" : ["hits.hits.total", "aggregations.my_agg"]
    }
  }
  ...
}
```

search example:

```json
{
  "input" : {
    "search" : {
      "request" : {
         "indices" : [ "idx" ],
          "body" : {
             "query" : { "match_all" : {} }
          }
       },
       "extract" : ["hits.hits.total", "aggregations.my_agg"]
    }
  }
  ...
}
```

Closes elastic/elasticsearch#167

Original commit: elastic/x-pack-elasticsearch@437c35698b
This commit is contained in:
Martijn van Groningen 2015-03-31 16:01:27 +02:00
parent 3a09914b67
commit efb6ae8e1f
14 changed files with 683 additions and 28 deletions

View File

@ -16,7 +16,9 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputException;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpRequest;
import org.elasticsearch.watcher.support.http.HttpResponse;
@ -27,7 +29,9 @@ import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
*/
@ -36,12 +40,14 @@ public class HttpInput extends Input<HttpInput.Result> {
public static final String TYPE = "http";
private final HttpClient client;
private final Set<String> extractKeys;
private final TemplatedHttpRequest request;
public HttpInput(ESLogger logger, HttpClient client, TemplatedHttpRequest request) {
public HttpInput(ESLogger logger, HttpClient client, TemplatedHttpRequest request, Set<String> extractKeys) {
super(logger);
this.request = request;
this.client = client;
this.extractKeys = extractKeys;
}
@Override
@ -54,14 +60,33 @@ public class HttpInput extends Input<HttpInput.Result> {
Map<String, Object> model = Variables.createCtxModel(ctx, null);
HttpRequest httpRequest = request.render(model);
try (HttpResponse response = client.execute(httpRequest)) {
Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(response.body(), true);
return new Result(TYPE, new Payload.Simple(result.v2()), httpRequest, response.status());
byte[] bytes = response.body();
final Payload payload;
if (extractKeys != null) {
XContentParser parser = XContentHelper.createParser(bytes, 0, bytes.length);
Map<String, Object> filteredKeys = XContentFilterKeysUtils.filterMapOrdered(extractKeys, parser);
payload = new Payload.Simple(filteredKeys);
} else {
Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(bytes, true);
payload = new Payload.Simple(result.v2());
}
return new Result(TYPE, payload, httpRequest, response.status());
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return request.toXContent(builder, params);
builder.startObject();
builder.field(Parser.REQUEST_FIELD.getPreferredName());
builder = request.toXContent(builder, params);
if (extractKeys != null) {
builder.startArray(Parser.EXTRACT_FIELD.getPreferredName());
for (String extractKey : extractKeys) {
builder.value(extractKey);
}
builder.endObject();
}
return builder.endObject();
}
@Override
@ -114,6 +139,7 @@ public class HttpInput extends Input<HttpInput.Result> {
public final static class Parser extends AbstractComponent implements Input.Parser<Result, HttpInput> {
public static final ParseField REQUEST_FIELD = new ParseField("request");
public static final ParseField EXTRACT_FIELD = new ParseField("extract");
public static final ParseField HTTP_STATUS_FIELD = new ParseField("http_status");
private final HttpClient client;
@ -135,8 +161,45 @@ public class HttpInput extends Input<HttpInput.Result> {
@Override
public HttpInput parse(XContentParser parser) throws IOException {
TemplatedHttpRequest request = templatedRequestParser.parse(parser);
return new HttpInput(logger, client, request);
Set<String> extract = null;
TemplatedHttpRequest request = null;
String currentFieldName = null;
for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case START_OBJECT:
if (REQUEST_FIELD.getPreferredName().equals(currentFieldName)) {
request = templatedRequestParser.parse(parser);
} else {
throw new InputException("could not parse [http] input. unexpected field [" + currentFieldName + "]");
}
break;
case START_ARRAY:
if (EXTRACT_FIELD.getPreferredName().equals(currentFieldName)) {
extract = new HashSet<>();
for (XContentParser.Token arrayToken = parser.nextToken(); arrayToken != XContentParser.Token.END_ARRAY; arrayToken = parser.nextToken()) {
if (arrayToken == XContentParser.Token.VALUE_STRING) {
extract.add(parser.text());
}
}
} else {
throw new InputException("could not parse [http] input. unexpected field [" + currentFieldName + "]");
}
break;
default:
throw new InputException("could not parse [http] input. unexpected token [" + token + "]");
}
}
if (request == null) {
throw new InputException("could not parse [http] input. http request is missing or null.");
}
return new HttpInput(logger, client, request, extract);
}
@Override
@ -157,7 +220,9 @@ public class HttpInput extends Input<HttpInput.Result> {
request = requestParser.parse(parser);
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
statusCode = parser.intValue();
if (HTTP_STATUS_FIELD.match(currentFieldName)) {
statusCode = parser.intValue();
}
}
}
return new Result(TYPE, payload, request, statusCode);
@ -175,6 +240,7 @@ public class HttpInput extends Input<HttpInput.Result> {
private Map<String, Template> headers;
private HttpAuth auth;
private Template body;
private Set<String> extractKeys;
public SourceBuilder setHost(String host) {
this.host = host;
@ -216,6 +282,14 @@ public class HttpInput extends Input<HttpInput.Result> {
return this;
}
public SourceBuilder addExtractKey(String key) {
if (extractKeys == null) {
extractKeys = new HashSet<>();
}
extractKeys.add(key);
return this;
}
@Override
public String type() {
return TYPE;
@ -224,6 +298,14 @@ public class HttpInput extends Input<HttpInput.Result> {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params p) throws IOException {
builder.startObject();
if (extractKeys != null) {
builder.startArray(Parser.EXTRACT_FIELD.getPreferredName());
for (String extractKey : extractKeys) {
builder.value(extractKey);
}
builder.endArray();
}
builder.startObject(Parser.REQUEST_FIELD.getPreferredName());
builder.field(HttpRequest.Parser.HOST_FIELD.getPreferredName(), host);
builder.field(HttpRequest.Parser.PORT_FIELD.getPreferredName(), port);
if (method != null) {
@ -244,6 +326,7 @@ public class HttpInput extends Input<HttpInput.Result> {
if (body != null) {
builder.field(HttpRequest.Parser.BODY_FIELD.getPreferredName(), body);
}
builder.endObject();
return builder.endObject();
}
}

View File

@ -26,13 +26,19 @@ import org.elasticsearch.watcher.input.InputException;
import org.elasticsearch.watcher.support.SearchRequestEquivalence;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentParser.*;
/**
* An input that executes search and returns the search response as the initial payload
@ -43,13 +49,15 @@ public class SearchInput extends Input<SearchInput.Result> {
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.COUNT;
private final Set<String> extractKeys;
private final SearchRequest searchRequest;
private final ScriptServiceProxy scriptService;
private final ClientProxy client;
public SearchInput(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client, SearchRequest searchRequest) {
public SearchInput(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client, SearchRequest searchRequest, Set<String> extractKeys) {
super(logger);
this.extractKeys = extractKeys;
this.searchRequest = searchRequest;
this.scriptService = scriptService;
this.client = client;
@ -78,12 +86,32 @@ public class SearchInput extends Input<SearchInput.Result> {
}
return new Result(TYPE, new Payload.XContent(response), request);
final Payload payload;
if (extractKeys != null) {
XContentBuilder builder = jsonBuilder().startObject().value(response).endObject();
XContentParser parser = XContentHelper.createParser(builder.bytes());
Map<String, Object> filteredKeys = XContentFilterKeysUtils.filterMapOrdered(extractKeys, parser);
payload = new Payload.Simple(filteredKeys);
} else {
payload = new Payload.XContent(response);
}
return new Result(TYPE, payload, request);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return WatcherUtils.writeSearchRequest(searchRequest, builder, params);
builder.startObject();
builder.field(Parser.REQUEST_FIELD.getPreferredName());
builder = WatcherUtils.writeSearchRequest(searchRequest, builder, params);
if (extractKeys != null) {
builder.startArray(Parser.EXTRACT_FIELD.getPreferredName());
for (String extractKey : extractKeys) {
builder.value(extractKey);
}
builder.endObject();
}
builder.endObject();
return builder;
}
@Override
@ -150,6 +178,7 @@ public class SearchInput extends Input<SearchInput.Result> {
public static class Parser extends AbstractComponent implements Input.Parser<Result,SearchInput> {
public static ParseField REQUEST_FIELD = new ParseField("request");
public static ParseField EXTRACT_FIELD = new ParseField("extract");
private final ScriptServiceProxy scriptService;
private final ClientProxy client;
@ -167,11 +196,44 @@ public class SearchInput extends Input<SearchInput.Result> {
@Override
public SearchInput parse(XContentParser parser) throws IOException {
SearchRequest request = WatcherUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE);
Set<String> extract = null;
SearchRequest request = null;
String currentFieldName = null;
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case START_OBJECT:
if (REQUEST_FIELD.getPreferredName().equals(currentFieldName)) {
request = WatcherUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE);
} else {
throw new InputException("could not parse [search] input. unexpected field [" + currentFieldName + "]");
}
break;
case START_ARRAY:
if (EXTRACT_FIELD.getPreferredName().equals(currentFieldName)) {
extract = new HashSet<>();
for (Token arrayToken = parser.nextToken(); arrayToken != Token.END_ARRAY; arrayToken = parser.nextToken()) {
if (arrayToken == Token.VALUE_STRING) {
extract.add(parser.text());
}
}
} else {
throw new InputException("could not parse [search] input. unexpected field [" + currentFieldName + "]");
}
break;
default:
throw new InputException("could not parse [search] input. unexpected token [" + token + "]");
}
}
if (request == null) {
throw new InputException("could not parse [search] input. search request is missing or null.");
}
return new SearchInput(logger, scriptService, client, request);
return new SearchInput(logger, scriptService, client, request, extract);
}
@Override
@ -180,11 +242,10 @@ public class SearchInput extends Input<SearchInput.Result> {
SearchRequest request = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && currentFieldName != null) {
} else if (token == Token.START_OBJECT && currentFieldName != null) {
if (Input.Result.PAYLOAD_FIELD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else if (REQUEST_FIELD.match(currentFieldName)) {
@ -212,11 +273,20 @@ public class SearchInput extends Input<SearchInput.Result> {
public static class SourceBuilder implements Input.SourceBuilder {
private final SearchRequest request;
private Set<String> extractKeys;
public SourceBuilder(SearchRequest request) {
this.request = request;
}
public SourceBuilder addExtractKey(String key) {
if (extractKeys == null) {
extractKeys = new HashSet<>();
}
extractKeys.add(key);
return this;
}
@Override
public String type() {
return TYPE;
@ -224,7 +294,17 @@ public class SearchInput extends Input<SearchInput.Result> {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return WatcherUtils.writeSearchRequest(request, builder, params);
builder.startObject();
if (extractKeys != null) {
builder.startArray(Parser.EXTRACT_FIELD.getPreferredName());
for (String extractKey : extractKeys) {
builder.value(extractKey);
}
builder.endArray();
}
builder.field(Parser.REQUEST_FIELD.getPreferredName());
builder = WatcherUtils.writeSearchRequest(request, builder, params);
return builder.endObject();
}
}
}

View File

@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.WatcherException;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.common.xcontent.XContentParser.Token.*;
public final class XContentFilterKeysUtils {
private XContentFilterKeysUtils() {
}
public static Map<String, Object> filterMapOrdered(Set<String> keys, XContentParser parser) {
try {
if (parser.currentToken() != null) {
throw new IllegalArgumentException("Parser already started");
}
if (parser.nextToken() != START_OBJECT) {
throw new IllegalArgumentException("Content should start with START_OBJECT");
}
State state = new State(new ArrayList<>(keys));
return parse(parser, state);
} catch (IOException e) {
throw new WatcherException("could not build a filtered payload out of xcontent", e);
}
}
private static Map<String, Object> parse(XContentParser parser, State state) throws IOException {
if (state.includeLeaf) {
return parser.map();
}
Map<String, Object> data = new HashMap<>();
for (XContentParser.Token token = parser.nextToken(); token != END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
state.nextField(parser.currentName());
break;
case START_OBJECT:
if (state.includeKey) {
String fieldName = state.currentFieldName();
Map<String, Object> nestedData = parse(parser, state);
data.put(fieldName, nestedData);
} else {
parser.skipChildren();
}
state.previousField();
break;
case START_ARRAY:
if (state.includeKey) {
String fieldName = state.currentFieldName();
List<Object> arrayData = arrayParsing(parser, state);
data.put(fieldName, arrayData);
} else {
parser.skipChildren();
}
state.previousField();
break;
case VALUE_STRING:
if (state.includeKey) {
data.put(state.currentFieldName(), parser.text());
}
state.previousField();
break;
case VALUE_NUMBER:
if (state.includeKey) {
data.put(state.currentFieldName(), parser.numberValue());
}
state.previousField();
break;
case VALUE_BOOLEAN:
if (state.includeKey) {
data.put(state.currentFieldName(), parser.booleanValue());
}
state.previousField();
break;
}
}
return data;
}
private static List<Object> arrayParsing(XContentParser parser, State state) throws IOException {
List<Object> values = new ArrayList<>();
for (XContentParser.Token token = parser.nextToken(); token != END_ARRAY; token = parser.nextToken()) {
switch (token) {
case START_OBJECT:
values.add(parse(parser, state));
break;
case VALUE_STRING:
values.add(parser.text());
break;
case VALUE_NUMBER:
values.add(parser.numberValue());
break;
case VALUE_BOOLEAN:
values.add(parser.booleanValue());
break;
}
}
return values;
}
private static final class State {
final List<String> extractPaths;
StringBuilder currentPath = new StringBuilder();
boolean includeLeaf;
boolean includeKey;
String currentFieldName;
private State(List<String> extractPaths) {
this.extractPaths = extractPaths;
}
void nextField(String fieldName) {
currentFieldName = fieldName;
if (currentPath.length() != 0) {
currentPath.append('.');
}
currentPath = currentPath.append(fieldName);
final String path = currentPath.toString();
for (String extractPath : extractPaths) {
if (path.equals(extractPath)) {
includeKey = true;
includeLeaf = true;
return;
} else if (extractPath.startsWith(path)) {
includeKey = true;
return;
}
}
includeKey = false;
includeLeaf = false;
}
String currentFieldName() {
return currentFieldName;
}
void previousField() {
int start = currentPath.lastIndexOf(currentFieldName);
currentPath = currentPath.delete(start, currentPath.length());
if (currentPath.length() > 0 && currentPath.charAt(currentPath.length() - 1) == '.') {
currentPath = currentPath.deleteCharAt(currentPath.length() - 1);
}
currentFieldName = currentPath.toString();
includeKey = false;
includeLeaf = false;
}
}
}

View File

@ -83,9 +83,9 @@ public class HttpClient extends AbstractComponent {
}
HttpResponse response = new HttpResponse();
response.inputStream(urlConnection.getInputStream());
response.status(urlConnection.getResponseCode());
logger.debug("http status code: {}", response.status());
response.inputStream(urlConnection.getInputStream());
return response;
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.http.auth.HttpAuth;
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
import org.elasticsearch.watcher.support.template.Template;
import java.io.IOException;
import java.util.Map;
@ -217,4 +218,82 @@ public class HttpRequest implements ToXContent {
}
public final static class SourceBuilder implements ToXContent {
private String host;
private int port;
private String method;
private Template path;
private Map<String, Template> params;
private Map<String, Template> headers;
private HttpAuth auth;
private Template body;
public SourceBuilder setHost(String host) {
this.host = host;
return this;
}
public SourceBuilder setPort(int port) {
this.port = port;
return this;
}
public SourceBuilder setMethod(String method) {
this.method = method;
return this;
}
public SourceBuilder setPath(Template path) {
this.path = path;
return this;
}
public SourceBuilder setParams(Map<String, Template> params) {
this.params = params;
return this;
}
public SourceBuilder setHeaders(Map<String, Template> headers) {
this.headers = headers;
return this;
}
public SourceBuilder setAuth(HttpAuth auth) {
this.auth = auth;
return this;
}
public SourceBuilder setBody(Template body) {
this.body = body;
return this;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params p) throws IOException {
builder.startObject();
builder.field(HttpRequest.Parser.HOST_FIELD.getPreferredName(), host);
builder.field(HttpRequest.Parser.PORT_FIELD.getPreferredName(), port);
if (method != null) {
builder.field(HttpRequest.Parser.METHOD_FIELD.getPreferredName(), method);
}
if (path != null) {
builder.field(HttpRequest.Parser.PATH_FIELD.getPreferredName(), path);
}
if (params != null) {
builder.field(HttpRequest.Parser.PARAMS_FIELD.getPreferredName(), params);
}
if (headers != null) {
builder.field(HttpRequest.Parser.HEADERS_FIELD.getPreferredName(), headers);
}
if (auth != null) {
builder.field(HttpRequest.Parser.AUTH_FIELD.getPreferredName(), auth);
}
if (body != null) {
builder.field(HttpRequest.Parser.BODY_FIELD.getPreferredName(), body);
}
return builder.endObject();
}
}
}

View File

@ -79,7 +79,7 @@ public class HttpInputTests extends ElasticsearchTestCase {
Template mockBody = mock(Template.class);
when(mockBody.render(anyMap())).thenReturn(body);
request.body(mockBody);
HttpInput input = new HttpInput(logger, httpClient, request);
HttpInput input = new HttpInput(logger, httpClient, request, null);
HttpResponse response = new HttpResponse();
response.status(123);
@ -163,7 +163,7 @@ public class HttpInputTests extends ElasticsearchTestCase {
String httpMethod = "get";
String body = "_body";
Map<String, Template> headers = new MapBuilder<String, Template>().put("a", new MockTemplate("b")).map();
HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder()
HttpRequest.SourceBuilder sourceBuilder = new HttpRequest.SourceBuilder()
.setMethod(httpMethod)
.setHost("_host")
.setPort(123)

View File

@ -66,7 +66,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
SearchInput searchInput = new SearchInput(logger,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()), request);
ClientProxy.of(client()), request, null);
WatchExecutionContext ctx = new WatchExecutionContext("test-watch",
new Watch("test-alert",
new ClockMock(),
@ -103,7 +103,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
SearchInput searchInput = new SearchInput(logger,
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()), request);
ClientProxy.of(client()), request, null);
WatchExecutionContext ctx = new WatchExecutionContext("test-watch",
new Watch("test-alert",
new ClockMock(),
@ -134,7 +134,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.source(searchSource()
.query(filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
XContentBuilder builder = WatcherUtils.writeSearchRequest(request, jsonBuilder(), ToXContent.EMPTY_PARAMS);
SearchInput.SourceBuilder sourceBuilder = new SearchInput.SourceBuilder(request);
XContentBuilder builder = jsonBuilder().value(sourceBuilder);
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();

View File

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class FilterXContentTests extends ElasticsearchTestCase {
@Test
@Repeat(iterations = 12)
public void testPayloadFiltering() throws Exception {
Map<String, Object> data = new HashMap<>();
data.put("key0", "value1");
data.put("key1", 2);
data.put("key2", 3.1);
data.put("key3", true);
data.put("key4", Arrays.asList("value5", "value5.5"));
data.put("key5", "value6");
data.put("key6", 7.1);
data.put("key7", false);
XContentBuilder builder = jsonBuilder().value(data);
XContentParser parser = XContentHelper.createParser(builder.bytes());
Set<String> keys = new HashSet<>();
int numKeys = randomInt(3);
for (int i = 0; i < numKeys; i++) {
boolean added;
do {
added = keys.add("key" + randomInt(7));
} while (!added);
}
System.out.println(keys);
Map<String, Object> filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser);
assertThat(filteredData.size(), equalTo(numKeys));
for (String key : keys) {
assertThat(filteredData.get(key), equalTo(data.get(key)));
}
}
@Test
public void testNestedPayloadFiltering() throws Exception {
Map<String, Object> data = new HashMap<>();
data.put("leaf1", MapBuilder.newMapBuilder().put("key1", "value1").put("key2", true).map());
data.put("leaf2", MapBuilder.newMapBuilder().put("key1", "value1").put("key2", "value2").put("key3", 3).map());
data.put("leaf3", MapBuilder.newMapBuilder().put("key1", "value1").put("key2", MapBuilder.newMapBuilder().put("key1", "value1").put("key2", "value2").map()).map());
BytesReference bytes = jsonBuilder().value(data).bytes();
XContentParser parser = XContentHelper.createParser(bytes);
Set<String> keys = new HashSet<>(Arrays.asList("leaf1.key2"));
Map<String, Object> filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser);
assertThat(filteredData.size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf1").size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf1").get("key2"), Matchers.<Object>equalTo(Boolean.TRUE));
parser = XContentHelper.createParser(bytes);
keys = new HashSet<>(Arrays.asList("leaf2"));
filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser);
assertThat(filteredData.size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf2").size(), equalTo(3));
assertThat(selectMap(filteredData, "leaf2").get("key1"), Matchers.<Object>equalTo("value1"));
assertThat(selectMap(filteredData, "leaf2").get("key2"), Matchers.<Object>equalTo("value2"));
assertThat(selectMap(filteredData, "leaf2").get("key3"), Matchers.<Object>equalTo(3));
parser = XContentHelper.createParser(bytes);
keys = new HashSet<>(Arrays.asList("leaf3.key2.key1"));
filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser);
assertThat(filteredData.size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf3").size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf3", "key2").size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf3", "key2").get("key1"), Matchers.<Object>equalTo("value1"));
parser = XContentHelper.createParser(bytes);
keys = new HashSet<>(Arrays.asList("leaf1.key1", "leaf2.key2"));
filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser);
assertThat(filteredData.size(), equalTo(2));
assertThat(selectMap(filteredData, "leaf1").size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf2").size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf1").get("key1"), Matchers.<Object>equalTo("value1"));
assertThat(selectMap(filteredData, "leaf2").get("key2"), Matchers.<Object>equalTo("value2"));
parser = XContentHelper.createParser(bytes);
keys = new HashSet<>(Arrays.asList("leaf2.key1", "leaf2.key3"));
filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser);
assertThat(filteredData.size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf2").size(), equalTo(2));
assertThat(selectMap(filteredData, "leaf2").get("key1"), Matchers.<Object>equalTo("value1"));
assertThat(selectMap(filteredData, "leaf2").get("key3"), Matchers.<Object>equalTo(3));
parser = XContentHelper.createParser(bytes);
keys = new HashSet<>(Arrays.asList("leaf3.key2.key1", "leaf3.key2.key2"));
filteredData = XContentFilterKeysUtils.filterMapOrdered(keys, parser);
assertThat(filteredData.size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf3").size(), equalTo(1));
assertThat(selectMap(filteredData, "leaf3", "key2").size(), equalTo(2));
assertThat(selectMap(filteredData, "leaf3", "key2").get("key1"), Matchers.<Object>equalTo("value1"));
assertThat(selectMap(filteredData, "leaf3", "key2").get("key2"), Matchers.<Object>equalTo("value2"));
}
@SuppressWarnings("unchecked")
private static Map<String, Object> selectMap(Map<String, Object> data, String... path) {
for (String element : path) {
data = (Map<String, Object>) data.get(element);
}
return data;
}
}

View File

@ -220,8 +220,10 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
builder.startObject("input");
{
builder.field("search");
builder.startObject("search");
builder.field("request");
WatcherUtils.writeSearchRequest(conditionRequest, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
}
builder.endObject();

View File

@ -144,7 +144,7 @@ public final class WatcherTestUtils {
watchName,
SystemClock.INSTANCE,
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")),
new SearchInput(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), conditionRequest),
new SearchInput(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), conditionRequest, null),
new ScriptCondition(logger, scriptService, new Script("return true")),
new SearchTransform(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), transformRequest),
new Actions(actions),

View File

@ -6,6 +6,8 @@
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
@ -13,6 +15,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
@ -23,6 +26,8 @@ import org.elasticsearch.watcher.trigger.schedule.Schedules;
import org.elasticsearch.watcher.watch.WatchStore;
import org.junit.Test;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
import static org.elasticsearch.index.query.QueryBuilders.*;
@ -256,6 +261,49 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
testConditionSearch(searchRequest);
}
@Test
public void testInputFiltering() throws Exception {
WatcherClient watcherClient = watcherClient();
createIndex("idx");
// Have a sample document in the index, the watch is going to evaluate
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value")));
watcherClient.preparePutWatch("_name1")
.source(watchSourceBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(searchRequest).addExtractKey("hits.total"))
.condition(scriptCondition("ctx.payload.hits.total == 1")))
.get();
// in this watcher the condition will fail, because max_score isn't extracted, only total:
watcherClient.preparePutWatch("_name2")
.source(watchSourceBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(searchRequest).addExtractKey("hits.total"))
.condition(scriptCondition("ctx.payload.hits.max_score >= 0")))
.get();
if (timeWarped()) {
timeWarp().scheduler().trigger("_name1");
timeWarp().scheduler().trigger("_name2");
refresh();
}
assertWatchWithMinimumPerformedActionsCount("_name1", 1);
assertWatchWithNoActionNeeded("_name2", 1);
// Check that the input result payload has been filtered
SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(matchQuery("watch_name", "_name1"))
.setSize(1)
.get();
Map payload = (Map) ((Map)((Map)((Map) searchResponse.getHits().getAt(0).sourceAsMap().get("watch_execution")).get("input_result")).get("search")).get("payload");
assertThat(payload.size(), equalTo(1));
assertThat(((Map) payload.get("hits")).size(), equalTo(1));
assertThat((Integer) ((Map) payload.get("hits")).get("total"), equalTo(1));
}
private void testConditionSearch(SearchRequest request) throws Exception {
String watchName = "_name";
assertAcked(prepareCreate("events").addMapping("event", "_timestamp", "enabled=true", "level", "type=string"));

View File

@ -85,7 +85,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
"test-serialization",
SystemClock.INSTANCE,
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set this into the future so we don't get any extra runs
new SearchInput(logger, scriptService(), ClientProxy.of(client()), searchRequest),
new SearchInput(logger, scriptService(), ClientProxy.of(client()), searchRequest, null),
new ScriptCondition(logger, scriptService(), new Script("return true")),
new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest),
new Actions(new ArrayList<Action>()),
@ -146,7 +146,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
SystemClock.INSTANCE,
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set a cron schedule far into the future so this watch is never scheduled
new SearchInput(logger, scriptService(), ClientProxy.of(client()),
searchRequest),
searchRequest, null),
new ScriptCondition(logger, scriptService(), new Script("return true")),
new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest),
new Actions(new ArrayList<Action>()),

View File

@ -5,24 +5,38 @@
*/
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.ScriptTemplate;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.trigger.TriggerBuilders;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilder.watchSourceBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.watcher.input.InputBuilders.httpInput;
import static org.elasticsearch.watcher.test.WatcherTestUtils.newInputSearchRequest;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@ -69,4 +83,62 @@ public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests {
assertWatchWithMinimumPerformedActionsCount("_name", 1, false);
}
@Test
public void testInputFiltering() throws Exception {
WatcherClient watcherClient = watcherClient();
createIndex("idx");
// Have a sample document in the index, the watch is going to evaluate
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
ScriptServiceProxy sc = scriptService();
InetSocketAddress address = internalTestCluster().httpAddresses()[0];
String body = jsonBuilder().prettyPrint().startObject()
.field("query").value(termQuery("field", "value"))
.endObject().string();
HttpInput.SourceBuilder httpInputBuilder = httpInput()
.setHost(address.getHostName())
.setPort(address.getPort())
.setPath(new ScriptTemplate(sc, "/idx/_search"))
.setBody(new ScriptTemplate(sc, body))
.addExtractKey("hits.total");
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value")));
watcherClient.preparePutWatch("_name1")
.source(watchSourceBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(httpInputBuilder)
.condition(scriptCondition("ctx.payload.hits.total == 1")))
.get();
// in this watcher the condition will fail, because max_score isn't extracted, only total:
watcherClient.preparePutWatch("_name2")
.source(watchSourceBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(httpInputBuilder)
.condition(scriptCondition("ctx.payload.hits.max_score >= 0")))
.get();
if (timeWarped()) {
timeWarp().scheduler().trigger("_name1");
timeWarp().scheduler().trigger("_name2");
refresh();
}
assertWatchWithMinimumPerformedActionsCount("_name1", 1, false);
assertWatchWithNoActionNeeded("_name2", 1);
// Check that the input result payload has been filtered
SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(matchQuery("watch_name", "_name1"))
.setSize(1)
.get();
Map payload = (Map) ((Map)((Map)((Map) searchResponse.getHits().getAt(0).sourceAsMap().get("watch_execution")).get("input_result")).get("http")).get("payload");
assertThat(payload.size(), equalTo(1));
assertThat(((Map) payload.get("hits")).size(), equalTo(1));
assertThat((Integer) ((Map) payload.get("hits")).get("total"), equalTo(1));
System.out.println(searchResponse);
}
}

View File

@ -185,7 +185,7 @@ public class WatchTests extends ElasticsearchTestCase {
String type = randomFrom(SearchInput.TYPE, SimpleInput.TYPE);
switch (type) {
case SearchInput.TYPE:
return new SearchInput(logger, scriptService, client, WatcherTestUtils.newInputSearchRequest("idx"));
return new SearchInput(logger, scriptService, client, WatcherTestUtils.newInputSearchRequest("idx"), null);
default:
return new SimpleInput(logger, new Payload.Simple(ImmutableMap.<String, Object>builder().put("_key", "_val").build()));
}