[transform] fixed and added unit tests for SearchTransform

- Also, the search template/script are not populated not just by the fired/scheduled time, but also by the payload

Original commit: elastic/x-pack-elasticsearch@7ca8331a1c
This commit is contained in:
uboness 2015-02-25 13:56:42 +02:00
parent 46cefe261a
commit 3aa988472c
5 changed files with 306 additions and 16 deletions

View File

@ -137,7 +137,7 @@ public final class AlertUtils {
searchRequest.templateType(ScriptService.ScriptType.valueOf(parser.text().toUpperCase(Locale.ROOT)));
break;
case "search_type":
searchType = SearchType.fromString(parser.text());
searchType = SearchType.fromString(parser.text().toLowerCase(Locale.ROOT));
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");

View File

@ -10,6 +10,7 @@ package org.elasticsearch.alerts.support;
*/
public final class Variables {
public static final String PAYLOAD = "payload";
public static final String FIRE_TIME = "fire_time";
public static final String SCHEDULED_FIRE_TIME = "scheduled_fire_time";

View File

@ -11,7 +11,6 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Variables;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.common.Strings;
@ -22,6 +21,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -30,6 +30,7 @@ import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;
@ -38,7 +39,7 @@ import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
/**
*
*/
public class SearchTransform implements Transform {
public class SearchTransform extends Transform {
public static final String TYPE = "search";
@ -63,7 +64,7 @@ public class SearchTransform implements Transform {
@Override
public Transform.Result apply(ExecutionContext ctx, Payload payload) throws IOException {
SearchRequest req = createRequest(request, ctx.scheduledTime(), ctx.fireTime(), payload.data());
SearchRequest req = createRequest(request, ctx, payload);
SearchResponse resp = client.search(req).actionGet();
return new Transform.Result(TYPE, new Payload.ActionResponse(resp));
}
@ -74,21 +75,17 @@ public class SearchTransform implements Transform {
return builder;
}
public SearchRequest createRequest(SearchRequest requestPrototype, DateTime scheduledFireTime, DateTime fireTime, Map<String, Object> data) throws IOException {
public SearchRequest createRequest(SearchRequest requestPrototype, ExecutionContext ctx, Payload payload) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.indices(requestPrototype.indices());
if (Strings.hasLength(requestPrototype.source())) {
Map<String, String> templateParams = new HashMap<>();
templateParams.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime));
templateParams.put(Variables.FIRE_TIME, formatDate(fireTime));
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, createModel(ctx, payload));
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime))
.put(Variables.FIRE_TIME, formatDate(fireTime));
.putAll(flatten(createModel(ctx, payload)));
request.templateParams(templateParams.map());
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
@ -98,6 +95,47 @@ public class SearchTransform implements Transform {
return request;
}
static Map<String, String> flatten(Map<String, Object> map) {
Map<String, String> result = new HashMap<>();
flatten("", map, result);
return result;
}
private static void flatten(String key, Object value, Map<String, String> result) {
if (value instanceof Map) {
for (Map.Entry<String, Object> entry : ((Map<String, Object>) value).entrySet()) {
if ("".equals(key)) {
flatten(entry.getKey(), entry.getValue(), result);
} else {
flatten(key + "." + entry.getKey(), entry.getValue(), result);
}
}
return;
}
if (value instanceof Iterable) {
int i = 0;
for (Object item : (Iterable) value) {
flatten(key + "." + i++, item, result);
}
return;
}
if (value.getClass().isArray()) {
for (int i = 0; i < Array.getLength(value); i++) {
flatten(key + "." + i, Array.get(value, i), result);
}
return;
}
if (value instanceof DateTime) {
result.put(key, formatDate((DateTime) value));
return;
}
if (value instanceof TimeValue) {
result.put(key, String.valueOf(((TimeValue) value).getMillis()));
return;
}
result.put(key, String.valueOf(value));
}
public static class Parser extends AbstractComponent implements Transform.Parser<SearchTransform> {
protected final ScriptServiceProxy scriptService;

View File

@ -7,18 +7,21 @@ package org.elasticsearch.alerts.transform;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.Variables;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
*
*/
public interface Transform extends ToXContent {
public abstract class Transform implements ToXContent {
static final Transform NOOP = new Transform() {
public static final Transform NOOP = new Transform() {
@Override
public String type() {
return "noop";
@ -35,11 +38,19 @@ public interface Transform extends ToXContent {
}
};
String type();
public abstract String type();
Result apply(ExecutionContext context, Payload payload) throws IOException;
public abstract Result apply(ExecutionContext ctx, Payload payload) throws IOException;
static class Result {
protected static Map<String, Object> createModel(ExecutionContext ctx, Payload payload) {
Map<String, Object> model = new HashMap<>();
model.put(Variables.SCHEDULED_FIRE_TIME, ctx.scheduledTime());
model.put(Variables.FIRE_TIME, ctx.fireTime());
model.put(Variables.PAYLOAD, payload.data());
return model;
}
public static class Result {
private final String type;
private final Payload payload;

View File

@ -0,0 +1,240 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.AbstractAlertingTests;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.Variables;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.script.ScriptService;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
import static org.elasticsearch.alerts.support.AlertsDateUtils.parseDate;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
*
*/
public class SearchTransformTests extends AbstractAlertingTests {
@Test
public void testApply() throws Exception {
index("idx", "type", "1");
ensureGreen("idx");
refresh();
SearchRequest request = Requests.searchRequest("idx").source(jsonBuilder().startObject()
.startObject("query")
.startObject("match_all").endObject()
.endObject()
.endObject());
SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request);
ExecutionContext ctx = mock(ExecutionContext.class);
DateTime now = new DateTime();
when(ctx.scheduledTime()).thenReturn(now);
when(ctx.fireTime()).thenReturn(now);
Payload payload = new Payload.Simple(new HashMap<String, Object>());
Transform.Result result = transform.apply(ctx, payload);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
SearchResponse response = client().search(request).get();
Payload expectedPayload = new Payload.ActionResponse(response);
// we need to remove the "took" field from teh response as this is the only field
// that most likely be different between the two... we don't really care about this
// field, we just want to make sure that the important parts of the response are the same
Map<String, Object> resultData = result.payload().data();
resultData.remove("took");
Map<String, Object> expectedData = expectedPayload.data();
expectedData.remove("took");
assertThat(resultData, equalTo(expectedData));
}
@Test
public void testApply_MustacheTemplate() throws Exception {
// The rational behind this test:
//
// - we index 4 documents each one associated with a unique value and each is associated with a day
// - we build a search transform such that with a filter that
// - the date must be after [scheduled_time] variable
// - the date must be before [fired_time] variable
// - the value must match [payload.value] variable
// - the variable are set as such:
// - scheduled_time = youngest document's date
// - fired_time = oldest document's date
// - payload.value = val_3
// - when executed, the variables will be replaced with the scheduled_time, fired_time and the payload.value.
// - we set all these variables accordingly (the search transform is responsible to populate them)
// - when replaced correctly, the search should return document 3.
//
// we then do a search for document 3, and compare the response to the payload returned by the transform
index("idx", "type", "1", doc("2015-01-01T00:00:00", "val_1"));
index("idx", "type", "2", doc("2015-01-02T00:00:00", "val_2"));
index("idx", "type", "3", doc("2015-01-03T00:00:00", "val_3"));
index("idx", "type", "4", doc("2015-01-04T00:00:00", "val_4"));
ensureGreen("idx");
refresh();
SearchRequest request = Requests.searchRequest("idx").source(searchSource().query(filteredQuery(matchAllQuery(), boolFilter()
.must(rangeFilter("date").gt("{{" + Variables.SCHEDULED_FIRE_TIME + "}}"))
.must(rangeFilter("date").lt("{{" + Variables.FIRE_TIME + "}}"))
.must(termFilter("value", "{{" + Variables.PAYLOAD + ".value}}")))));
SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request);
ExecutionContext ctx = mock(ExecutionContext.class);
when(ctx.scheduledTime()).thenReturn(parseDate("2015-01-01T00:00:00"));
when(ctx.fireTime()).thenReturn(parseDate("2015-01-04T00:00:00"));
Payload payload = new Payload.Simple(ImmutableMap.<String, Object>builder()
.put("value", "val_3")
.build());
Transform.Result result = transform.apply(ctx, payload);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
SearchResponse response = client().prepareSearch("idx").setQuery(
filteredQuery(matchAllQuery(), termFilter("value", "val_3")))
.get();
Payload expectedPayload = new Payload.ActionResponse(response);
// we need to remove the "took" field from teh response as this is the only field
// that most likely be different between the two... we don't really care about this
// field, we just want to make sure that the important parts of the response are the same
Map<String, Object> resultData = result.payload().data();
resultData.remove("took");
Map<String, Object> expectedData = expectedPayload.data();
expectedData.remove("took");
assertThat(resultData, equalTo(expectedData));
}
@Test
public void testParser() throws Exception {
String[] indices = rarely() ? null : randomBoolean() ? new String[] { "idx" } : new String[] { "idx1", "idx2" };
SearchType searchType = randomBoolean() ? null : randomFrom(SearchType.values());
String templateName = randomBoolean() ? null : "template1";
ScriptService.ScriptType templateType = templateName != null && randomBoolean() ? randomFrom(ScriptService.ScriptType.values()) : null;
BytesReference source = null;
XContentBuilder builder = jsonBuilder().startObject();
if (indices != null) {
builder.array("indices", indices);
}
if (searchType != null) {
builder.field("search_type", searchType.name());
}
if (templateName != null) {
builder.field("template_name", templateName);
}
if (templateType != null) {
builder.field("template_type", templateType);
}
XContentBuilder sourceBuilder = jsonBuilder().startObject()
.startObject("query")
.startObject("match_all")
.endObject()
.endObject()
.endObject();
source = sourceBuilder.bytes();
builder.startObject("body")
.startObject("query")
.startObject("match_all")
.endObject()
.endObject()
.endObject();
builder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
SearchTransform transform = new SearchTransform.Parser(ImmutableSettings.EMPTY, scriptService(), ClientProxy.of(client())).parse(parser);
assertThat(transform, notNullValue());
assertThat(transform.type(), is(SearchTransform.TYPE));
assertThat(transform.request, notNullValue());
if (indices != null) {
assertThat(transform.request.indices(), arrayContainingInAnyOrder(indices));
}
if (searchType != null) {
assertThat(transform.request.searchType(), is(searchType));
}
if (templateName != null) {
assertThat(transform.request.templateName(), equalTo(templateName));
}
if (templateType != null) {
assertThat(transform.request.templateType(), equalTo(templateType));
}
assertThat(transform.request.source().toBytes(), equalTo(source.toBytes()));
}
@Test
public void testFlatten() throws Exception {
DateTime now = new DateTime();
Map<String, Object> map = ImmutableMap.<String, Object>builder()
.put("a", ImmutableMap.builder().put("a1", new int[] { 0, 1, 2 }).build())
.put("b", new String[] { "b0", "b1", "b2" })
.put("c", ImmutableList.of( TimeValue.timeValueSeconds(0), TimeValue.timeValueSeconds(1)))
.put("d", now)
.build();
Map<String, String> result = SearchTransform.flatten(map);
assertThat(result.size(), is(9));
assertThat(result, hasEntry("a.a1.0", "0"));
assertThat(result, hasEntry("a.a1.1", "1"));
assertThat(result, hasEntry("a.a1.2", "2"));
assertThat(result, hasEntry("b.0", "b0"));
assertThat(result, hasEntry("b.1", "b1"));
assertThat(result, hasEntry("b.2", "b2"));
assertThat(result, hasEntry("c.0", "0"));
assertThat(result, hasEntry("c.1", "1000"));
assertThat(result, hasEntry("d", formatDate(now)));
}
private static Map<String, Object> doc(String date, String value) {
Map<String, Object> doc = new HashMap<>();
doc.put("date", parseDate(date));
doc.put("value", value);
return doc;
}
}