Added support for multi doc indexing to index action

This commit adds support for indexing multiple documents with the `index` action. This is done by introducing a special `_doc` field. During action execution, the `_doc` field will be looked up in the payload. If found, the value of the field will be considered as the document that needs to be indexed. If the value is an array of objects, each object in that array will be treated as a separate document and all the documents in the array will be bulk indexed.

This commit also changes the result of the action to hold `XContentSource` rather than a payload (to avoid Map creation explosions). Th `XContentSource` was also extended to support lists.

Original commit: elastic/x-pack-elasticsearch@86f454b029
This commit is contained in:
uboness 2015-06-04 17:40:41 +02:00
parent ccdaf2f116
commit c491e4db16
18 changed files with 648 additions and 167 deletions

View File

@ -5,20 +5,29 @@
*/
package org.elasticsearch.watcher.actions.index;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.ExecutableAction;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.ArrayObjectIterator;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
private final ClientProxy client;
@ -30,29 +39,86 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
@Override
public Action.Result execute(String actionId, WatchExecutionContext ctx, Payload payload) throws Exception {
Map<String, Object> data = payload.data();
if (data.containsKey("_doc")) {
Object doc = data.get("_doc");
if (doc instanceof Iterable) {
return indexBulk((Iterable) doc, actionId, ctx);
}
if (doc.getClass().isArray()) {
return indexBulk(new ArrayObjectIterator.Iterable(doc), actionId, ctx);
}
if (doc instanceof Map) {
data = (Map<String, Object>) doc;
} else {
throw new IndexActionException("could not execute action [{}] of watch [{}]. failed to index payload data. [_data] field must either hold a Map or an List/Array of Maps", actionId, ctx.watch().id());
}
}
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject();
resultBuilder.field("data", payload.data());
resultBuilder.field("timestamp", ctx.executionTime());
resultBuilder.endObject();
indexRequest.source(resultBuilder);
if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) {
if (!(data instanceof HashMap)) {
data = new HashMap<>(data); // ensuring mutability
}
data.put(action.executionTimeField, WatcherDateTimeUtils.formatDate(ctx.executionTime()));
} else {
indexRequest.timestamp(WatcherDateTimeUtils.formatDate(ctx.executionTime()));
}
indexRequest.source(jsonBuilder().prettyPrint().map(data));
Map<String, Object> data = new HashMap<>();
if (ctx.simulateAction(actionId)) {
return new IndexAction.Result.Simulated(action.index, action.docType, new Payload.Simple(indexRequest.sourceAsMap()));
return new IndexAction.Result.Simulated(action.index, action.docType, new XContentSource(indexRequest.source()));
}
IndexResponse response = client.index(indexRequest);
data.put("created", response.isCreated());
data.put("id", response.getId());
data.put("version", response.getVersion());
data.put("type", response.getType());
data.put("index", response.getIndex());
return new IndexAction.Result.Success(new Payload.Simple(data));
XContentBuilder jsonBuilder = jsonBuilder();
indexResponseToXContent(jsonBuilder, response);
return new IndexAction.Result.Success(new XContentSource(jsonBuilder.bytes()));
}
Action.Result indexBulk(Iterable list, String actionId, WatchExecutionContext ctx) throws Exception {
BulkRequest bulkRequest = new BulkRequest();
for (Object item : list) {
if (!(item instanceof Map)) {
throw new IndexActionException("could not execute action [{}] of watch [{}]. failed to index payload data. [_data] field must either hold a Map or an List/Array of Maps", actionId, ctx.watch().id());
}
Map<String, Object> doc = (Map<String, Object>) item;
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) {
if (!(doc instanceof HashMap)) {
doc = new HashMap<>(doc); // ensuring mutability
}
doc.put(action.executionTimeField, WatcherDateTimeUtils.formatDate(ctx.executionTime()));
} else {
indexRequest.timestamp(WatcherDateTimeUtils.formatDate(ctx.executionTime()));
}
indexRequest.source(jsonBuilder().prettyPrint().map(doc));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client.bulk(bulkRequest);
XContentBuilder jsonBuilder = jsonBuilder().startArray();
for (BulkItemResponse item : bulkResponse) {
IndexResponse response = item.getResponse();
indexResponseToXContent(jsonBuilder, response);
}
jsonBuilder.endArray();
return new IndexAction.Result.Success(new XContentSource(jsonBuilder.bytes()));
}
static void indexResponseToXContent(XContentBuilder builder, IndexResponse response) throws IOException {
builder.startObject()
.field("created", response.isCreated())
.field("id", response.getId())
.field("version", response.getVersion())
.field("type", response.getType())
.field("index", response.getIndex())
.endObject();
}
}

View File

@ -5,11 +5,12 @@
*/
package org.elasticsearch.watcher.actions.index;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import java.io.IOException;
@ -22,10 +23,12 @@ public class IndexAction implements Action {
final String index;
final String docType;
final @Nullable String executionTimeField;
public IndexAction(String index, String docType) {
public IndexAction(String index, String docType, @Nullable String executionTimeField) {
this.index = index;
this.docType = docType;
this.executionTimeField = executionTimeField;
}
@Override
@ -41,35 +44,45 @@ public class IndexAction implements Action {
return docType;
}
public String getExecutionTimeField() {
return executionTimeField;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IndexAction action = (IndexAction) o;
IndexAction that = (IndexAction) o;
if (!index.equals(action.index)) return false;
return docType.equals(action.docType);
if (!index.equals(that.index)) return false;
if (!docType.equals(that.docType)) return false;
return !(executionTimeField != null ? !executionTimeField.equals(that.executionTimeField) : that.executionTimeField != null);
}
@Override
public int hashCode() {
int result = index.hashCode();
result = 31 * result + docType.hashCode();
result = 31 * result + (executionTimeField != null ? executionTimeField.hashCode() : 0);
return result;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(Field.INDEX.getPreferredName(), index)
.field(Field.DOC_TYPE.getPreferredName(), docType)
.endObject();
builder.startObject();
builder.field(Field.INDEX.getPreferredName(), index);
builder.field(Field.DOC_TYPE.getPreferredName(), docType);
if (executionTimeField != null) {
builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField);
}
return builder.endObject();
}
public static IndexAction parse(String watchId, String actionId, XContentParser parser) throws IOException {
String index = null;
String docType = null;
String executionTimeField = null;
String currentFieldName = null;
XContentParser.Token token;
@ -81,6 +94,8 @@ public class IndexAction implements Action {
index = parser.text();
} else if (Field.DOC_TYPE.match(currentFieldName)) {
docType = parser.text();
} else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName)) {
executionTimeField = parser.text();
} else {
throw new IndexActionException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE, watchId, actionId, currentFieldName);
}
@ -97,7 +112,7 @@ public class IndexAction implements Action {
throw new IndexActionException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId, actionId, Field.DOC_TYPE.getPreferredName());
}
return new IndexAction(index, docType);
return new IndexAction(index, docType, executionTimeField);
}
public static Builder builder(String index, String docType) {
@ -108,14 +123,14 @@ public class IndexAction implements Action {
class Success extends Action.Result implements Result {
private final Payload response;
private final XContentSource response;
public Success(Payload response) {
public Success(XContentSource response) {
super(TYPE, Status.SUCCESS);
this.response = response;
}
public Payload response() {
public XContentSource response() {
return response;
}
@ -131,9 +146,9 @@ public class IndexAction implements Action {
private final String index;
private final String docType;
private final Payload source;
private final XContentSource source;
protected Simulated(String index, String docType, Payload source) {
protected Simulated(String index, String docType, XContentSource source) {
super(TYPE, Status.SIMULATED);
this.index = index;
this.docType = docType;
@ -148,7 +163,7 @@ public class IndexAction implements Action {
return docType;
}
public Payload source() {
public XContentSource source() {
return source;
}
@ -169,21 +184,28 @@ public class IndexAction implements Action {
final String index;
final String docType;
String executionTimeField;
private Builder(String index, String docType) {
this.index = index;
this.docType = docType;
}
public Builder setExecutionTimeField(String executionTimeField) {
this.executionTimeField = executionTimeField;
return this;
}
@Override
public IndexAction build() {
return new IndexAction(index, docType);
return new IndexAction(index, docType, executionTimeField);
}
}
interface Field extends Action.Field {
ParseField INDEX = new ParseField("index");
ParseField DOC_TYPE = new ParseField("doc_type");
ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time_field");
ParseField SOURCE = new ParseField("source");
ParseField RESPONSE = new ParseField("response");
ParseField REQUEST = new ParseField("request");

View File

@ -12,7 +12,7 @@ import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.xcontent.MapPath;
import org.elasticsearch.watcher.support.xcontent.ObjectPath;
import java.io.IOException;
import java.util.HashMap;
@ -59,13 +59,13 @@ public class ExecutableCompareCondition extends ExecutableCondition<CompareCondi
matcher = PATH_PATTERN.matcher((String) configuredValue);
if (matcher.matches()) {
String configuredPath = matcher.group(1);
configuredValue = MapPath.eval(configuredPath, model);
configuredValue = ObjectPath.eval(configuredPath, model);
resolvedValues.put(configuredPath, configuredValue);
}
}
}
Object resolvedValue = MapPath.eval(condition.getPath(), model);
Object resolvedValue = ObjectPath.eval(condition.getPath(), model);
resolvedValues.put(condition.getPath(), resolvedValue);
return new CompareCondition.Result(resolvedValues, condition.getOp().eval(resolvedValue, configuredValue));

View File

@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import java.lang.reflect.Array;
import java.util.Iterator;
/**
*
*/
public class ArrayObjectIterator implements Iterator<Object> {
private final Object array;
private final int length;
private int index;
public ArrayObjectIterator(Object array) {
this.array = array;
this.length = Array.getLength(array);
this.index = 0;
}
@Override
public boolean hasNext() {
return index < length;
}
@Override
public Object next() {
return Array.get(array, index++);
}
@Override
public void remove() {
throw new UnsupportedOperationException("array iterator does not support removing elements");
}
public static class Iterable implements java.lang.Iterable<Object> {
private Object array;
public Iterable(Object array) {
this.array = array;
}
@Override
public Iterator<Object> iterator() {
return new ArrayObjectIterator(array);
}
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.mustache.DefaultMustacheFactory;
import org.elasticsearch.common.mustache.MustacheException;
import org.elasticsearch.common.mustache.reflect.ReflectionObjectHandler;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.support.ArrayObjectIterator;
import java.io.IOException;
import java.io.Writer;
@ -100,36 +101,10 @@ public class XMustacheFactory extends DefaultMustacheFactory {
*/
@Override
public Iterator<Object> iterator() {
return new Iter(array);
return new ArrayObjectIterator(array);
}
static class Iter implements Iterator<Object> {
private final Object array;
private final int length;
private int index;
public Iter(Object array) {
this.array = array;
this.length = Array.getLength(array);
this.index = 0;
}
@Override
public boolean hasNext() {
return index < length;
}
@Override
public Object next() {
return Array.get(array, index++);
}
@Override
public void remove() {
throw new UnsupportedOperationException("array iterator does not support removing elements");
}
}
}
static class CollectionMap extends AbstractMap<Object, Object> implements Iterable<Object> {

View File

@ -14,16 +14,16 @@ import java.util.Map;
/**
*
*/
public class MapPath {
public class ObjectPath {
private MapPath() {
private ObjectPath() {
}
public static <T> T eval(String path, Map<String, Object> map) {
return (T) eval(path, (Object) map);
public static <T> T eval(String path, Object object) {
return (T) evalContext(path, object);
}
private static Object eval(String path, Object ctx) {
private static Object evalContext(String path, Object ctx) {
String[] parts = Strings.splitStringToArray(path, '.');
StringBuilder resolved = new StringBuilder();
for (String part : parts) {

View File

@ -5,7 +5,16 @@
*/
package org.elasticsearch.watcher.support.xcontent;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.ArrayList;
@ -19,6 +28,49 @@ public class WatcherXContentUtils {
private WatcherXContentUtils() {
}
public static Tuple<XContentType, Object> convertToObject(BytesReference bytes) throws ElasticsearchParseException {
if (bytes.hasArray()) {
return convertToObject(bytes.array(), bytes.arrayOffset(), bytes.length());
}
try {
XContentParser parser;
XContentType contentType;
Compressor compressor = CompressorFactory.compressor(bytes);
if (compressor != null) {
CompressedStreamInput compressedStreamInput = compressor.streamInput(bytes.streamInput());
contentType = XContentFactory.xContentType(compressedStreamInput);
compressedStreamInput.resetToBufferStart();
parser = XContentFactory.xContent(contentType).createParser(compressedStreamInput);
} else {
contentType = XContentFactory.xContentType(bytes);
parser = XContentFactory.xContent(contentType).createParser(bytes.streamInput());
}
return Tuple.tuple(contentType, readValue(parser, parser.nextToken()));
} catch (IOException e) {
throw new ElasticsearchParseException("Failed to parse content to map", e);
}
}
public static Tuple<XContentType, Object> convertToObject(byte[] data, int offset, int length) throws ElasticsearchParseException {
try {
XContentParser parser;
XContentType contentType;
Compressor compressor = CompressorFactory.compressor(data, offset, length);
if (compressor != null) {
CompressedStreamInput compressedStreamInput = compressor.streamInput(new BytesStreamInput(data, offset, length, false));
contentType = XContentFactory.xContentType(compressedStreamInput);
compressedStreamInput.resetToBufferStart();
parser = XContentFactory.xContent(contentType).createParser(compressedStreamInput);
} else {
contentType = XContentFactory.xContentType(data, offset, length);
parser = XContentFactory.xContent(contentType).createParser(data, offset, length);
}
return Tuple.tuple(contentType, readValue(parser, parser.nextToken()));
} catch (IOException e) {
throw new ElasticsearchParseException("Failed to parse content to map", e);
}
}
// TODO open this up in core
public static List<Object> readList(XContentParser parser, XContentParser.Token token) throws IOException {
List<Object> list = new ArrayList<>();

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -23,7 +24,7 @@ public class XContentSource implements ToXContent {
private final BytesReference bytes;
private XContentType contentType;
private Map<String, Object> data;
private Object data;
/**
* Constructs a new XContentSource out of the given bytes reference.
@ -39,16 +40,32 @@ public class XContentSource implements ToXContent {
return bytes;
}
/**
* @return true if the top level value of the source is a map
*/
public boolean isMap() {
return data() instanceof Map;
}
/**
* @return The source as a map
*/
public Map<String, Object> getAsMap() {
if (data == null) {
Tuple<XContentType, Map<String, Object>> tuple = XContentHelper.convertToMap(bytes, false);
this.contentType = tuple.v1();
this.data = tuple.v2();
}
return data;
return (Map<String, Object>) data();
}
/**
* @return true if the top level value of the source is a list
*/
public boolean isList() {
return data() instanceof List;
}
/**
* @return The source as a list
*/
public List<Object> getAsList() {
return (List<Object>) data();
}
/**
@ -58,7 +75,7 @@ public class XContentSource implements ToXContent {
* @return The extracted value or {@code null} if no value is associated with the given path
*/
public <T> T getValue(String path) {
return (T) MapPath.eval(path, getAsMap());
return (T) ObjectPath.eval(path, data());
}
@Override
@ -84,4 +101,13 @@ public class XContentSource implements ToXContent {
return contentType;
}
private Object data() {
if (data == null) {
Tuple<XContentType, Object> tuple = WatcherXContentUtils.convertToObject(bytes);
this.contentType = tuple.v1();
this.data = tuple.v2();
}
return data;
}
}

View File

@ -77,7 +77,7 @@ public interface Payload extends ToXContent {
}
}
static class XContent extends Simple {
class XContent extends Simple {
public XContent(XContentParser parser) {
super(mapOrdered(parser));

View File

@ -0,0 +1,207 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.actions.index;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.junit.Test;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.watcher.transform.TransformBuilders.scriptTransform;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron;
import static org.hamcrest.Matchers.*;
/**
*
*/
public class IndexActionIntegrationTests extends AbstractWatcherIntegrationTests {
@Test
public void testSimple() throws Exception {
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(cron("0/1 * * * * ? 2020")))
.input(simpleInput("foo", "bar"))
.addAction("index-buckets", indexAction("idx", "type").setExecutionTimeField("@timestamp")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
DateTime now = timeWarped() ? timeWarp().clock().now(UTC) : DateTime.now(UTC);
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
.get();
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
flush("idx");
refresh();
SearchResponse searchResponse = client().prepareSearch("idx").setTypes("type").get();
assertThat(searchResponse.getHits().totalHits(), is(1L));
SearchHit hit = searchResponse.getHits().getAt(0);
if (timeWarped()) {
assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now)));
} else {
assertThat(hit.getSource(), hasKey("@timestamp"));
DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp"));
assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true));
}
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
}
@Test
public void testSimple_WithDocField() throws Exception {
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(cron("0/1 * * * * ? 2020")))
.input(simpleInput("foo", "bar"))
.addAction("index-buckets",
scriptTransform("return [ '_doc' : ctx.payload ]"),
indexAction("idx", "type").setExecutionTimeField("@timestamp")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
DateTime now = timeWarped() ? timeWarp().clock().now(UTC) : DateTime.now(UTC);
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
.get();
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
flush("idx");
refresh();
SearchResponse searchResponse = client().prepareSearch("idx").setTypes("type").get();
assertThat(searchResponse.getHits().totalHits(), is(1L));
SearchHit hit = searchResponse.getHits().getAt(0);
if (timeWarped()) {
assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now)));
} else {
assertThat(hit.getSource(), hasKey("@timestamp"));
DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp"));
assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true));
}
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
}
@Test
public void testSimple_WithDocField_WrongFieldType() throws Exception {
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(cron("0/1 * * * * ? 2020")))
.input(simpleInput("foo", "bar"))
.addAction("index-buckets",
scriptTransform("return [ '_doc' : 1 ]"),
indexAction("idx", "type").setExecutionTimeField("@timestamp")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
DateTime now = timeWarped() ? timeWarp().clock().now(UTC) : DateTime.now(UTC);
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
.setRecordExecution(true)
.get();
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
flush();
refresh();
assertThat(client().admin().indices().prepareExists("idx").get().isExists(), is(false));
assertThat(docCount(HistoryStore.INDEX_PREFIX + "*", HistoryStore.DOC_TYPE, searchSource()
.query(matchQuery("result.actions.status", "failure"))), is(1L));
}
@Test
public void testIndexAggsBucketsAsDocuments() throws Exception {
DateTime now = timeWarped() ? timeWarp().clock().now(UTC) : DateTime.now(UTC);
long bucketCount = (long) randomIntBetween(2, 5);
for (int i = 0; i < bucketCount; i++) {
index("idx", "type", jsonBuilder().startObject()
.field("timestamp", now.minusDays(i))
.endObject());
}
flush("idx");
refresh();
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(cron("0/1 * * * * ? 2020")))
.input(searchInput(new SearchRequest("idx")
.types("type")
.searchType(SearchType.COUNT)
.source(searchSource()
.aggregation(dateHistogram("trend")
.field("timestamp")
.interval(DateHistogram.Interval.DAY)))))
.addAction("index-buckets",
// this transform takes the bucket list and assigns it to `_doc`
// this means each bucket will be indexed as a separate doc,
// so we expect to have the same number of documents as the number
// of buckets.
scriptTransform("return [ '_doc' : ctx.payload.aggregations.trend.buckets]"),
indexAction("idx", "bucket").setExecutionTimeField("@timestamp")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
.get();
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
flush("idx");
refresh();
SearchResponse searchResponse = client().prepareSearch("idx").setTypes("bucket")
.addSort("key", SortOrder.DESC)
.get();
assertThat(searchResponse.getHits().getTotalHits(), is(bucketCount));
DateTime key = now.withMillisOfDay(0);
int i = 0;
for (SearchHit hit : searchResponse.getHits()) {
if (timeWarped()) {
assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now)));
} else {
assertThat(hit.getSource(), hasKey("@timestamp"));
DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp"));
assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true));
}
assertThat(hit.getSource(), hasEntry("key", (Object) key.getMillis()));
key = key.minusDays(1);
}
}
}

View File

@ -7,99 +7,179 @@ package org.elasticsearch.watcher.actions.index;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.Action.Result.Status;
import org.elasticsearch.watcher.actions.email.service.Authentication;
import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
import static org.hamcrest.Matchers.*;
/**
*/
public class IndexActionTests extends ElasticsearchIntegrationTest {
@Test
public void testIndexActionExecute() throws Exception {
@Test @Repeat(iterations = 6)
public void testIndexActionExecute_SingleDoc() throws Exception {
IndexAction action = new IndexAction("test-index", "test-type");
String timestampField = randomFrom(null, "_timestamp", "@timestamp");
boolean customTimestampField = "@timestamp".equals(timestampField);
if (timestampField == null || "_timestamp".equals(timestampField)) {
assertThat(prepareCreate("test-index")
.addMapping("test-type", "{ \"test-type\" : { \"_timestamp\" : { \"enabled\" : \"true\" }}}")
.get().isAcknowledged(), is(true));
}
IndexAction action = new IndexAction("test-index", "test-type", timestampField);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()));
final String account = "account1";
Watch watch = WatcherTestUtils.createTestWatch("test_watch",
ClientProxy.of(client()),
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
new HttpClient(ImmutableSettings.EMPTY, mock(HttpAuthRegistry.class)).start(),
new EmailService() {
@Override
public EmailService.EmailSent send(Email email, Authentication auth, Profile profile) {
return new EmailSent(account, email);
}
DateTime executionTime = DateTime.now(DateTimeZone.UTC);
Payload payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", ImmutableMap.of("foo", "bar"));
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload);
@Override
public EmailSent send(Email email, Authentication auth, Profile profile, String accountName) {
return new EmailSent(account, email);
}
},
logger);
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watch.id(), new DateTime(), new DateTime()), TimeValue.timeValueSeconds(5));
Map<String, Object> payloadMap = new HashMap<>();
payloadMap.put("test", "foo");
Action.Result result = executable.execute("_id", ctx, new Payload.Simple(payloadMap));
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.Success.class));
IndexAction.Result.Success successResult = (IndexAction.Result.Success) result;
Map<String, Object> responseData = successResult.response().data();
assertThat(responseData.get("created"), equalTo((Object)Boolean.TRUE));
assertThat(responseData.get("version"), equalTo((Object) 1L));
assertThat(responseData.get("type").toString(), equalTo("test-type"));
assertThat(responseData.get("index").toString(), equalTo("test-index"));
XContentSource response = successResult.response();
assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE));
assertThat(response.getValue("version"), equalTo((Object) 1));
assertThat(response.getValue("type").toString(), equalTo("test-type"));
assertThat(response.getValue("index").toString(), equalTo("test-index"));
refresh(); //Manually refresh to make sure data is available
SearchResponse sr = client().prepareSearch("test-index")
SearchResponse searchResponse = client().prepareSearch("test-index")
.setTypes("test-type")
.setSource(searchSource().query(matchAllQuery()).buildAsBytes()).get();
.setSource(searchSource()
.query(matchAllQuery())
.aggregation(terms("timestamps").field(customTimestampField ? timestampField : "_timestamp"))
.buildAsBytes())
.get();
assertThat(sr.getHits().totalHits(), equalTo(1L));
assertThat(searchResponse.getHits().totalHits(), equalTo(1L));
SearchHit hit = searchResponse.getHits().getAt(0);
if (customTimestampField) {
assertThat(hit.getSource().size(), is(2));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
} else {
assertThat(hit.getSource().size(), is(1));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
}
Terms terms = searchResponse.getAggregations().get("timestamps");
assertThat(terms, notNullValue());
assertThat(terms.getBuckets(), hasSize(1));
assertThat(terms.getBuckets().get(0).getKeyAsNumber().longValue(), is(executionTime.getMillis()));
assertThat(terms.getBuckets().get(0).getDocCount(), is(1L));
}
@Test @Repeat(iterations = 6)
public void testIndexActionExecute_MultiDoc() throws Exception {
String timestampField = randomFrom(null, "_timestamp", "@timestamp");
boolean customTimestampField = "@timestamp".equals(timestampField);
if (timestampField == null || "_timestamp".equals(timestampField)) {
assertThat(prepareCreate("test-index")
.addMapping("test-type", "{ \"test-type\" : { \"_timestamp\" : { \"enabled\" : \"true\" }}}")
.get().isAcknowledged(), is(true));
}
Object list = randomFrom(
new Map[] { ImmutableMap.of("foo", "bar"), ImmutableMap.of("foo", "bar1") },
ImmutableList.of(ImmutableMap.of("foo", "bar"), ImmutableMap.of("foo", "bar1")),
ImmutableSet.of(ImmutableMap.of("foo", "bar"), ImmutableMap.of("foo", "bar1"))
);
IndexAction action = new IndexAction("test-index", "test-type", timestampField);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()));
DateTime executionTime = DateTime.now(DateTimeZone.UTC);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, new Payload.Simple("_doc", list));
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.Success.class));
IndexAction.Result.Success successResult = (IndexAction.Result.Success) result;
XContentSource response = successResult.response();
assertThat(response.getValue("0.created"), equalTo((Object)Boolean.TRUE));
assertThat(response.getValue("0.version"), equalTo((Object) 1));
assertThat(response.getValue("0.type").toString(), equalTo("test-type"));
assertThat(response.getValue("0.index").toString(), equalTo("test-index"));
assertThat(response.getValue("1.created"), equalTo((Object)Boolean.TRUE));
assertThat(response.getValue("1.version"), equalTo((Object) 1));
assertThat(response.getValue("1.type").toString(), equalTo("test-type"));
assertThat(response.getValue("1.index").toString(), equalTo("test-index"));
refresh(); //Manually refresh to make sure data is available
SearchResponse searchResponse = client().prepareSearch("test-index")
.setTypes("test-type")
.addSort("foo", SortOrder.ASC)
.setSource(searchSource()
.query(matchAllQuery())
.aggregation(terms("timestamps").field(customTimestampField ? timestampField : "_timestamp"))
.buildAsBytes())
.get();
assertThat(searchResponse.getHits().totalHits(), equalTo(2L));
SearchHit hit = searchResponse.getHits().getAt(0);
if (customTimestampField) {
assertThat(hit.getSource().size(), is(2));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
} else {
assertThat(hit.getSource().size(), is(1));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
}
hit = searchResponse.getHits().getAt(1);
if (customTimestampField) {
assertThat(hit.getSource().size(), is(2));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar1"));
assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
} else {
assertThat(hit.getSource().size(), is(1));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar1"));
}
}
@Test @Repeat(iterations = 10)
public void testParser() throws Exception {
String timestampField = randomBoolean() ? "@timestamp" : null;
XContentBuilder builder = jsonBuilder();
builder.startObject()
.field(IndexAction.Field.INDEX.getPreferredName(), "test-index")
.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type")
.endObject();
builder.startObject();
builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index");
builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type");
if (timestampField != null) {
builder.field(IndexAction.Field.EXECUTION_TIME_FIELD.getPreferredName(), timestampField);
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(ImmutableSettings.EMPTY, ClientProxy.of(client()));
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
@ -109,6 +189,9 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
assertThat(executable.action().docType, equalTo("test-type"));
assertThat(executable.action().index, equalTo("test-index"));
if (timestampField != null) {
assertThat(executable.action().executionTimeField, equalTo(timestampField));
}
}
@Test @Repeat(iterations = 10)

View File

@ -25,7 +25,7 @@ import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.support.xcontent.MapPath;
import org.elasticsearch.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
@ -358,7 +358,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
private String getExecutionStatus(Map<String, Object> watchRecordMap) {
return MapPath.eval("result.actions.0.status", watchRecordMap);
return ObjectPath.eval("result.actions.0.status", watchRecordMap);
}
private ManualExecutionContext getManualExecutionContext(TimeValue throttlePeriod) {

View File

@ -93,23 +93,19 @@ public class WebhookActionTests extends ElasticsearchTestCase {
@Test @Repeat(iterations = 30)
public void testExecute() throws Exception {
ClientProxy client = mock(ClientProxy.class);
ExecuteScenario scenario = randomFrom(ExecuteScenario.Success, ExecuteScenario.ErrorCode);
HttpClient httpClient = scenario.client();
HttpMethod method = randomFrom(HttpMethod.GET, HttpMethod.POST, HttpMethod.PUT, HttpMethod.DELETE, HttpMethod.HEAD);
final String account = "account1";
HttpRequestTemplate httpRequest = getHttpRequestTemplate(method, TEST_HOST, TEST_PORT, testPath, testBody, null);
WebhookAction action = new WebhookAction(httpRequest);
ExecutableWebhookAction executable = new ExecutableWebhookAction(action, logger, httpClient, templateEngine);
Watch watch = createWatch("test_watch", client, account);
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watch.id(), new DateTime(), new DateTime()), timeValueSeconds(5));
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", new Payload.Simple("foo", "bar"));
Action.Result actionResult = executable.execute("_id", ctx, Payload.EMPTY);
scenario.assertResult(httpClient, actionResult);
}

View File

@ -21,7 +21,7 @@ import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.xcontent.MapPath;
import org.elasticsearch.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
@ -208,8 +208,8 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
.get().getRecordSource().getAsMap();
assertThat(MapPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTION_NOT_NEEDED.toString()));
assertThat(MapPath.<String>eval("result.input.payload.foo", executeWatchResult), equalTo("bar"));
assertThat(ObjectPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTION_NOT_NEEDED.toString()));
assertThat(ObjectPath.<String>eval("result.input.payload.foo", executeWatchResult), equalTo("bar"));
watchBuilder = watchBuilder()
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
@ -224,16 +224,16 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
.setId("_id").setTriggerEvent(triggerEvent).setRecordExecution(true)
.get().getRecordSource().getAsMap();
assertThat(MapPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTED.toString()));
assertThat(MapPath.<String>eval("result.input.payload.foo", executeWatchResult), equalTo("bar"));
assertThat(MapPath.<String>eval("result.actions.0.id", executeWatchResult), equalTo("log"));
assertThat(ObjectPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTED.toString()));
assertThat(ObjectPath.<String>eval("result.input.payload.foo", executeWatchResult), equalTo("bar"));
assertThat(ObjectPath.<String>eval("result.actions.0.id", executeWatchResult), equalTo("log"));
executeWatchResult = watcherClient().prepareExecuteWatch()
.setId("_id").setTriggerEvent(triggerEvent)
.get().getRecordSource().getAsMap();
assertThat(MapPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.THROTTLED.toString()));
assertThat(ObjectPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.THROTTLED.toString()));
}
@Test

View File

@ -28,8 +28,8 @@ public class MapPathTests extends ElasticsearchTestCase {
.put("key", "value")
.build();
assertThat(MapPath.eval("key", map), is((Object) "value"));
assertThat(MapPath.eval("key1", map), nullValue());
assertThat(ObjectPath.eval("key", map), is((Object) "value"));
assertThat(ObjectPath.eval("key1", map), nullValue());
}
@Test @Repeat(iterations = 5)
@ -40,7 +40,7 @@ public class MapPathTests extends ElasticsearchTestCase {
.build();
int index = randomInt(3);
assertThat(MapPath.eval("key." + index, map), is(list.get(index)));
assertThat(ObjectPath.eval("key." + index, map), is(list.get(index)));
}
@Test @Repeat(iterations = 5)
@ -51,7 +51,7 @@ public class MapPathTests extends ElasticsearchTestCase {
.build();
int index = randomInt(3);
assertThat(((Number) MapPath.eval("key." + index, map)).intValue(), is(array[index]));
assertThat(((Number) ObjectPath.eval("key." + index, map)).intValue(), is(array[index]));
}
@Test
@ -60,7 +60,7 @@ public class MapPathTests extends ElasticsearchTestCase {
.put("a", ImmutableMap.of("b", "val"))
.build();
assertThat(MapPath.eval("a.b", map), is((Object) "val"));
assertThat(ObjectPath.eval("a.b", map), is((Object) "val"));
}
@ -78,11 +78,11 @@ public class MapPathTests extends ElasticsearchTestCase {
.build())
.build();
assertThat(MapPath.eval("", map), is((Object) map));
assertThat(MapPath.eval("a.b.0.0.c", map), is((Object) "val"));
assertThat(MapPath.eval("a.b.0.0.c.d", map), nullValue());
assertThat(MapPath.eval("a.b.0.0.d", map), nullValue());
assertThat(MapPath.eval("a.b.c", map), nullValue());
assertThat(ObjectPath.eval("", map), is((Object) map));
assertThat(ObjectPath.eval("a.b.0.0.c", map), is((Object) "val"));
assertThat(ObjectPath.eval("a.b.0.0.c.d", map), nullValue());
assertThat(ObjectPath.eval("a.b.0.0.d", map), nullValue());
assertThat(ObjectPath.eval("a.b.c", map), nullValue());
}
}

View File

@ -95,14 +95,14 @@ public class TransformSearchTests extends AbstractWatcherIntegrationTests {
SearchResponse response = client().prepareSearch("output1").get();
assertNoFailures(response);
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1l));
assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).size(), equalTo(1));
assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).get("key3").toString(), equalTo("20"));
assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1));
assertThat(response.getHits().getAt(0).sourceAsMap().get("key3").toString(), equalTo("20"));
response = client().prepareSearch("output2").get();
assertNoFailures(response);
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1l));
assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).size(), equalTo(1));
assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).get("key3").toString(), equalTo("20"));
assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1));
assertThat(response.getHits().getAt(0).sourceAsMap().get("key3").toString(), equalTo("20"));
}
@Test
@ -190,14 +190,14 @@ public class TransformSearchTests extends AbstractWatcherIntegrationTests {
SearchResponse response = client().prepareSearch("output1").get();
assertNoFailures(response);
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1l));
assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).size(), equalTo(1));
assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).get("key4").toString(), equalTo("30"));
assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1));
assertThat(response.getHits().getAt(0).sourceAsMap().get("key4").toString(), equalTo("30"));
response = client().prepareSearch("output2").get();
assertNoFailures(response);
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1l));
assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).size(), equalTo(1));
assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).get("key4").toString(), equalTo("30"));
assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1));
assertThat(response.getHits().getAt(0).sourceAsMap().get("key4").toString(), equalTo("30"));
}
}

View File

@ -13,7 +13,7 @@ import org.elasticsearch.watcher.condition.always.AlwaysCondition;
import org.elasticsearch.watcher.execution.ActionExecutionMode;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.support.xcontent.MapPath;
import org.elasticsearch.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
@ -100,8 +100,8 @@ public class WatchMetadataTests extends AbstractWatcherIntegrationTests {
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_name").setTriggerEvent(triggerEvent).setActionMode("_all", ActionExecutionMode.SIMULATE).get();
Map<String, Object> result = executeWatchResponse.getRecordSource().getAsMap();;
assertThat(MapPath.<String>eval("metadata.foo", result), equalTo("bar"));
assertThat(MapPath.<String>eval("result.actions.0.id", result), equalTo("testLogger"));
assertThat(MapPath.<String>eval("result.actions.0.logging.logged_text", result), equalTo("This is a test"));
assertThat(ObjectPath.<String>eval("metadata.foo", result), equalTo("bar"));
assertThat(ObjectPath.<String>eval("result.actions.0.id", result), equalTo("testLogger"));
assertThat(ObjectPath.<String>eval("result.actions.0.logging.logged_text", result), equalTo("This is a test"));
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.actions.index.ExecutableIndexAction;
import org.elasticsearch.watcher.actions.index.IndexAction;
import org.elasticsearch.watcher.actions.index.IndexActionFactory;
import org.elasticsearch.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.watcher.actions.webhook.ExecutableWebhookAction;
import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.watcher.actions.webhook.WebhookActionFactory;
@ -76,7 +77,6 @@ import org.elasticsearch.watcher.support.secret.SecretService;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.support.template.TemplateEngine;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.TransformFactory;
import org.elasticsearch.watcher.transform.TransformRegistry;
@ -387,7 +387,7 @@ public class WatchTests extends ElasticsearchTestCase {
list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), randomThrottler(), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine)));
}
if (randomBoolean()) {
IndexAction aciton = new IndexAction("_index", "_type");
IndexAction aciton = new IndexAction("_index", "_type", null);
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(aciton, logger, client)));
}
if (randomBoolean()) {