From c491e4db167ff2288a32b83d824a4c7c80e10d04 Mon Sep 17 00:00:00 2001 From: uboness Date: Thu, 4 Jun 2015 17:40:41 +0200 Subject: [PATCH] Added support for multi doc indexing to index action This commit adds support for indexing multiple documents with the `index` action. This is done by introducing a special `_doc` field. During action execution, the `_doc` field will be looked up in the payload. If found, the value of the field will be considered as the document that needs to be indexed. If the value is an array of objects, each object in that array will be treated as a separate document and all the documents in the array will be bulk indexed. This commit also changes the result of the action to hold `XContentSource` rather than a payload (to avoid Map creation explosions). Th `XContentSource` was also extended to support lists. Original commit: elastic/x-pack-elasticsearch@86f454b0297d1c0a3da5890fa2bfd116d8a51ad3 --- .../actions/index/ExecutableIndexAction.java | 96 ++++++-- .../watcher/actions/index/IndexAction.java | 56 +++-- .../compare/ExecutableCompareCondition.java | 6 +- .../watcher/support/ArrayObjectIterator.java | 54 +++++ .../template/xmustache/XMustacheFactory.java | 29 +-- .../{MapPath.java => ObjectPath.java} | 10 +- .../xcontent/WatcherXContentUtils.java | 52 +++++ .../support/xcontent/XContentSource.java | 42 +++- .../elasticsearch/watcher/watch/Payload.java | 2 +- .../index/IndexActionIntegrationTests.java | 207 ++++++++++++++++++ .../actions/index/IndexActionTests.java | 187 +++++++++++----- .../throttler/ActionThrottleTests.java | 4 +- .../actions/webhook/WebhookActionTests.java | 8 +- .../execution/ManualExecutionTests.java | 14 +- .../support/xcontent/MapPathTests.java | 20 +- .../integration/TransformSearchTests.java | 16 +- .../test/integration/WatchMetadataTests.java | 8 +- .../watcher/watch/WatchTests.java | 4 +- 18 files changed, 648 insertions(+), 167 deletions(-) create mode 100644 src/main/java/org/elasticsearch/watcher/support/ArrayObjectIterator.java rename src/main/java/org/elasticsearch/watcher/support/xcontent/{MapPath.java => ObjectPath.java} (89%) create mode 100644 src/test/java/org/elasticsearch/watcher/actions/index/IndexActionIntegrationTests.java diff --git a/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java b/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java index 49aed2f51ca..21c39dc345f 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java +++ b/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java @@ -5,20 +5,29 @@ */ package org.elasticsearch.watcher.actions.index; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.watcher.actions.Action; import org.elasticsearch.watcher.actions.ExecutableAction; import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.support.ArrayObjectIterator; +import org.elasticsearch.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; +import org.elasticsearch.watcher.support.xcontent.XContentSource; import org.elasticsearch.watcher.watch.Payload; +import java.io.IOException; import java.util.HashMap; import java.util.Map; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + public class ExecutableIndexAction extends ExecutableAction { private final ClientProxy client; @@ -30,29 +39,86 @@ public class ExecutableIndexAction extends ExecutableAction { @Override public Action.Result execute(String actionId, WatchExecutionContext ctx, Payload payload) throws Exception { + Map data = payload.data(); + if (data.containsKey("_doc")) { + Object doc = data.get("_doc"); + if (doc instanceof Iterable) { + return indexBulk((Iterable) doc, actionId, ctx); + } + if (doc.getClass().isArray()) { + return indexBulk(new ArrayObjectIterator.Iterable(doc), actionId, ctx); + } + if (doc instanceof Map) { + data = (Map) doc; + } else { + throw new IndexActionException("could not execute action [{}] of watch [{}]. failed to index payload data. [_data] field must either hold a Map or an List/Array of Maps", actionId, ctx.watch().id()); + } + } + IndexRequest indexRequest = new IndexRequest(); indexRequest.index(action.index); indexRequest.type(action.docType); - XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint(); - resultBuilder.startObject(); - resultBuilder.field("data", payload.data()); - resultBuilder.field("timestamp", ctx.executionTime()); - resultBuilder.endObject(); - indexRequest.source(resultBuilder); + if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) { + if (!(data instanceof HashMap)) { + data = new HashMap<>(data); // ensuring mutability + } + data.put(action.executionTimeField, WatcherDateTimeUtils.formatDate(ctx.executionTime())); + } else { + indexRequest.timestamp(WatcherDateTimeUtils.formatDate(ctx.executionTime())); + } + + indexRequest.source(jsonBuilder().prettyPrint().map(data)); - Map data = new HashMap<>(); if (ctx.simulateAction(actionId)) { - return new IndexAction.Result.Simulated(action.index, action.docType, new Payload.Simple(indexRequest.sourceAsMap())); + return new IndexAction.Result.Simulated(action.index, action.docType, new XContentSource(indexRequest.source())); } IndexResponse response = client.index(indexRequest); - data.put("created", response.isCreated()); - data.put("id", response.getId()); - data.put("version", response.getVersion()); - data.put("type", response.getType()); - data.put("index", response.getIndex()); - return new IndexAction.Result.Success(new Payload.Simple(data)); + XContentBuilder jsonBuilder = jsonBuilder(); + indexResponseToXContent(jsonBuilder, response); + return new IndexAction.Result.Success(new XContentSource(jsonBuilder.bytes())); + } + + Action.Result indexBulk(Iterable list, String actionId, WatchExecutionContext ctx) throws Exception { + BulkRequest bulkRequest = new BulkRequest(); + for (Object item : list) { + if (!(item instanceof Map)) { + throw new IndexActionException("could not execute action [{}] of watch [{}]. failed to index payload data. [_data] field must either hold a Map or an List/Array of Maps", actionId, ctx.watch().id()); + } + Map doc = (Map) item; + IndexRequest indexRequest = new IndexRequest(); + indexRequest.index(action.index); + indexRequest.type(action.docType); + if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) { + if (!(doc instanceof HashMap)) { + doc = new HashMap<>(doc); // ensuring mutability + } + doc.put(action.executionTimeField, WatcherDateTimeUtils.formatDate(ctx.executionTime())); + } else { + indexRequest.timestamp(WatcherDateTimeUtils.formatDate(ctx.executionTime())); + } + indexRequest.source(jsonBuilder().prettyPrint().map(doc)); + bulkRequest.add(indexRequest); + } + BulkResponse bulkResponse = client.bulk(bulkRequest); + XContentBuilder jsonBuilder = jsonBuilder().startArray(); + for (BulkItemResponse item : bulkResponse) { + IndexResponse response = item.getResponse(); + indexResponseToXContent(jsonBuilder, response); + } + jsonBuilder.endArray(); + return new IndexAction.Result.Success(new XContentSource(jsonBuilder.bytes())); + } + + static void indexResponseToXContent(XContentBuilder builder, IndexResponse response) throws IOException { + builder.startObject() + .field("created", response.isCreated()) + .field("id", response.getId()) + .field("version", response.getVersion()) + .field("type", response.getType()) + .field("index", response.getIndex()) + .endObject(); } } diff --git a/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java b/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java index e29b47d9aa5..0e98b5ecd02 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java +++ b/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java @@ -5,11 +5,12 @@ */ package org.elasticsearch.watcher.actions.index; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.actions.Action; -import org.elasticsearch.watcher.watch.Payload; +import org.elasticsearch.watcher.support.xcontent.XContentSource; import java.io.IOException; @@ -22,10 +23,12 @@ public class IndexAction implements Action { final String index; final String docType; + final @Nullable String executionTimeField; - public IndexAction(String index, String docType) { + public IndexAction(String index, String docType, @Nullable String executionTimeField) { this.index = index; this.docType = docType; + this.executionTimeField = executionTimeField; } @Override @@ -41,35 +44,45 @@ public class IndexAction implements Action { return docType; } + public String getExecutionTimeField() { + return executionTimeField; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - IndexAction action = (IndexAction) o; + IndexAction that = (IndexAction) o; - if (!index.equals(action.index)) return false; - return docType.equals(action.docType); + if (!index.equals(that.index)) return false; + if (!docType.equals(that.docType)) return false; + return !(executionTimeField != null ? !executionTimeField.equals(that.executionTimeField) : that.executionTimeField != null); } @Override public int hashCode() { int result = index.hashCode(); result = 31 * result + docType.hashCode(); + result = 31 * result + (executionTimeField != null ? executionTimeField.hashCode() : 0); return result; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject() - .field(Field.INDEX.getPreferredName(), index) - .field(Field.DOC_TYPE.getPreferredName(), docType) - .endObject(); + builder.startObject(); + builder.field(Field.INDEX.getPreferredName(), index); + builder.field(Field.DOC_TYPE.getPreferredName(), docType); + if (executionTimeField != null) { + builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField); + } + return builder.endObject(); } public static IndexAction parse(String watchId, String actionId, XContentParser parser) throws IOException { String index = null; String docType = null; + String executionTimeField = null; String currentFieldName = null; XContentParser.Token token; @@ -81,6 +94,8 @@ public class IndexAction implements Action { index = parser.text(); } else if (Field.DOC_TYPE.match(currentFieldName)) { docType = parser.text(); + } else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName)) { + executionTimeField = parser.text(); } else { throw new IndexActionException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE, watchId, actionId, currentFieldName); } @@ -97,7 +112,7 @@ public class IndexAction implements Action { throw new IndexActionException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId, actionId, Field.DOC_TYPE.getPreferredName()); } - return new IndexAction(index, docType); + return new IndexAction(index, docType, executionTimeField); } public static Builder builder(String index, String docType) { @@ -108,14 +123,14 @@ public class IndexAction implements Action { class Success extends Action.Result implements Result { - private final Payload response; + private final XContentSource response; - public Success(Payload response) { + public Success(XContentSource response) { super(TYPE, Status.SUCCESS); this.response = response; } - public Payload response() { + public XContentSource response() { return response; } @@ -131,9 +146,9 @@ public class IndexAction implements Action { private final String index; private final String docType; - private final Payload source; + private final XContentSource source; - protected Simulated(String index, String docType, Payload source) { + protected Simulated(String index, String docType, XContentSource source) { super(TYPE, Status.SIMULATED); this.index = index; this.docType = docType; @@ -148,7 +163,7 @@ public class IndexAction implements Action { return docType; } - public Payload source() { + public XContentSource source() { return source; } @@ -169,21 +184,28 @@ public class IndexAction implements Action { final String index; final String docType; + String executionTimeField; private Builder(String index, String docType) { this.index = index; this.docType = docType; } + public Builder setExecutionTimeField(String executionTimeField) { + this.executionTimeField = executionTimeField; + return this; + } + @Override public IndexAction build() { - return new IndexAction(index, docType); + return new IndexAction(index, docType, executionTimeField); } } interface Field extends Action.Field { ParseField INDEX = new ParseField("index"); ParseField DOC_TYPE = new ParseField("doc_type"); + ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time_field"); ParseField SOURCE = new ParseField("source"); ParseField RESPONSE = new ParseField("response"); ParseField REQUEST = new ParseField("request"); diff --git a/src/main/java/org/elasticsearch/watcher/condition/compare/ExecutableCompareCondition.java b/src/main/java/org/elasticsearch/watcher/condition/compare/ExecutableCompareCondition.java index 3226ff3c3c9..74d916898ce 100644 --- a/src/main/java/org/elasticsearch/watcher/condition/compare/ExecutableCompareCondition.java +++ b/src/main/java/org/elasticsearch/watcher/condition/compare/ExecutableCompareCondition.java @@ -12,7 +12,7 @@ import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.support.Variables; import org.elasticsearch.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.watcher.support.clock.Clock; -import org.elasticsearch.watcher.support.xcontent.MapPath; +import org.elasticsearch.watcher.support.xcontent.ObjectPath; import java.io.IOException; import java.util.HashMap; @@ -59,13 +59,13 @@ public class ExecutableCompareCondition extends ExecutableCondition { + + private final Object array; + private final int length; + private int index; + + public ArrayObjectIterator(Object array) { + this.array = array; + this.length = Array.getLength(array); + this.index = 0; + } + + @Override + public boolean hasNext() { + return index < length; + } + + @Override + public Object next() { + return Array.get(array, index++); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("array iterator does not support removing elements"); + } + + public static class Iterable implements java.lang.Iterable { + + private Object array; + + public Iterable(Object array) { + this.array = array; + } + + @Override + public Iterator iterator() { + return new ArrayObjectIterator(array); + } + } +} diff --git a/src/main/java/org/elasticsearch/watcher/support/template/xmustache/XMustacheFactory.java b/src/main/java/org/elasticsearch/watcher/support/template/xmustache/XMustacheFactory.java index 49048538ae2..8d4f4ae9aa9 100644 --- a/src/main/java/org/elasticsearch/watcher/support/template/xmustache/XMustacheFactory.java +++ b/src/main/java/org/elasticsearch/watcher/support/template/xmustache/XMustacheFactory.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.mustache.DefaultMustacheFactory; import org.elasticsearch.common.mustache.MustacheException; import org.elasticsearch.common.mustache.reflect.ReflectionObjectHandler; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.watcher.support.ArrayObjectIterator; import java.io.IOException; import java.io.Writer; @@ -100,36 +101,10 @@ public class XMustacheFactory extends DefaultMustacheFactory { */ @Override public Iterator iterator() { - return new Iter(array); + return new ArrayObjectIterator(array); } - static class Iter implements Iterator { - private final Object array; - private final int length; - private int index; - - public Iter(Object array) { - this.array = array; - this.length = Array.getLength(array); - this.index = 0; - } - - @Override - public boolean hasNext() { - return index < length; - } - - @Override - public Object next() { - return Array.get(array, index++); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("array iterator does not support removing elements"); - } - } } static class CollectionMap extends AbstractMap implements Iterable { diff --git a/src/main/java/org/elasticsearch/watcher/support/xcontent/MapPath.java b/src/main/java/org/elasticsearch/watcher/support/xcontent/ObjectPath.java similarity index 89% rename from src/main/java/org/elasticsearch/watcher/support/xcontent/MapPath.java rename to src/main/java/org/elasticsearch/watcher/support/xcontent/ObjectPath.java index 8fe1c8df285..8c8ccdd13c6 100644 --- a/src/main/java/org/elasticsearch/watcher/support/xcontent/MapPath.java +++ b/src/main/java/org/elasticsearch/watcher/support/xcontent/ObjectPath.java @@ -14,16 +14,16 @@ import java.util.Map; /** * */ -public class MapPath { +public class ObjectPath { - private MapPath() { + private ObjectPath() { } - public static T eval(String path, Map map) { - return (T) eval(path, (Object) map); + public static T eval(String path, Object object) { + return (T) evalContext(path, object); } - private static Object eval(String path, Object ctx) { + private static Object evalContext(String path, Object ctx) { String[] parts = Strings.splitStringToArray(path, '.'); StringBuilder resolved = new StringBuilder(); for (String part : parts) { diff --git a/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentUtils.java b/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentUtils.java index 1791b54a4cf..87c9c43c866 100644 --- a/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentUtils.java +++ b/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentUtils.java @@ -5,7 +5,16 @@ */ package org.elasticsearch.watcher.support.xcontent; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.compress.CompressedStreamInput; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.util.ArrayList; @@ -19,6 +28,49 @@ public class WatcherXContentUtils { private WatcherXContentUtils() { } + public static Tuple convertToObject(BytesReference bytes) throws ElasticsearchParseException { + if (bytes.hasArray()) { + return convertToObject(bytes.array(), bytes.arrayOffset(), bytes.length()); + } + try { + XContentParser parser; + XContentType contentType; + Compressor compressor = CompressorFactory.compressor(bytes); + if (compressor != null) { + CompressedStreamInput compressedStreamInput = compressor.streamInput(bytes.streamInput()); + contentType = XContentFactory.xContentType(compressedStreamInput); + compressedStreamInput.resetToBufferStart(); + parser = XContentFactory.xContent(contentType).createParser(compressedStreamInput); + } else { + contentType = XContentFactory.xContentType(bytes); + parser = XContentFactory.xContent(contentType).createParser(bytes.streamInput()); + } + return Tuple.tuple(contentType, readValue(parser, parser.nextToken())); + } catch (IOException e) { + throw new ElasticsearchParseException("Failed to parse content to map", e); + } + } + + public static Tuple convertToObject(byte[] data, int offset, int length) throws ElasticsearchParseException { + try { + XContentParser parser; + XContentType contentType; + Compressor compressor = CompressorFactory.compressor(data, offset, length); + if (compressor != null) { + CompressedStreamInput compressedStreamInput = compressor.streamInput(new BytesStreamInput(data, offset, length, false)); + contentType = XContentFactory.xContentType(compressedStreamInput); + compressedStreamInput.resetToBufferStart(); + parser = XContentFactory.xContent(contentType).createParser(compressedStreamInput); + } else { + contentType = XContentFactory.xContentType(data, offset, length); + parser = XContentFactory.xContent(contentType).createParser(data, offset, length); + } + return Tuple.tuple(contentType, readValue(parser, parser.nextToken())); + } catch (IOException e) { + throw new ElasticsearchParseException("Failed to parse content to map", e); + } + } + // TODO open this up in core public static List readList(XContentParser parser, XContentParser.Token token) throws IOException { List list = new ArrayList<>(); diff --git a/src/main/java/org/elasticsearch/watcher/support/xcontent/XContentSource.java b/src/main/java/org/elasticsearch/watcher/support/xcontent/XContentSource.java index 1e099e9aef2..d81bb006804 100644 --- a/src/main/java/org/elasticsearch/watcher/support/xcontent/XContentSource.java +++ b/src/main/java/org/elasticsearch/watcher/support/xcontent/XContentSource.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.*; import java.io.IOException; +import java.util.List; import java.util.Map; /** @@ -23,7 +24,7 @@ public class XContentSource implements ToXContent { private final BytesReference bytes; private XContentType contentType; - private Map data; + private Object data; /** * Constructs a new XContentSource out of the given bytes reference. @@ -39,16 +40,32 @@ public class XContentSource implements ToXContent { return bytes; } + /** + * @return true if the top level value of the source is a map + */ + public boolean isMap() { + return data() instanceof Map; + } + /** * @return The source as a map */ public Map getAsMap() { - if (data == null) { - Tuple> tuple = XContentHelper.convertToMap(bytes, false); - this.contentType = tuple.v1(); - this.data = tuple.v2(); - } - return data; + return (Map) data(); + } + + /** + * @return true if the top level value of the source is a list + */ + public boolean isList() { + return data() instanceof List; + } + + /** + * @return The source as a list + */ + public List getAsList() { + return (List) data(); } /** @@ -58,7 +75,7 @@ public class XContentSource implements ToXContent { * @return The extracted value or {@code null} if no value is associated with the given path */ public T getValue(String path) { - return (T) MapPath.eval(path, getAsMap()); + return (T) ObjectPath.eval(path, data()); } @Override @@ -84,4 +101,13 @@ public class XContentSource implements ToXContent { return contentType; } + private Object data() { + if (data == null) { + Tuple tuple = WatcherXContentUtils.convertToObject(bytes); + this.contentType = tuple.v1(); + this.data = tuple.v2(); + } + return data; + } + } diff --git a/src/main/java/org/elasticsearch/watcher/watch/Payload.java b/src/main/java/org/elasticsearch/watcher/watch/Payload.java index 9608909f488..4e9d3e3b166 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/Payload.java +++ b/src/main/java/org/elasticsearch/watcher/watch/Payload.java @@ -77,7 +77,7 @@ public interface Payload extends ToXContent { } } - static class XContent extends Simple { + class XContent extends Simple { public XContent(XContentParser parser) { super(mapOrdered(parser)); diff --git a/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionIntegrationTests.java new file mode 100644 index 00000000000..359dc228d94 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionIntegrationTests.java @@ -0,0 +1,207 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.actions.index; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.watcher.history.HistoryStore; +import org.elasticsearch.watcher.support.WatcherDateTimeUtils; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse; +import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; +import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.junit.Test; + +import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram; +import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; +import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.input.InputBuilders.searchInput; +import static org.elasticsearch.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.watcher.transform.TransformBuilders.scriptTransform; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron; +import static org.hamcrest.Matchers.*; + +/** + * + */ +public class IndexActionIntegrationTests extends AbstractWatcherIntegrationTests { + + @Test + public void testSimple() throws Exception { + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder() + .trigger(schedule(cron("0/1 * * * * ? 2020"))) + .input(simpleInput("foo", "bar")) + .addAction("index-buckets", indexAction("idx", "type").setExecutionTimeField("@timestamp"))) + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + DateTime now = timeWarped() ? timeWarp().clock().now(UTC) : DateTime.now(UTC); + + ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id") + .setTriggerEvent(new ScheduleTriggerEvent(now, now)) + .get(); + + assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed")); + + flush("idx"); + refresh(); + + SearchResponse searchResponse = client().prepareSearch("idx").setTypes("type").get(); + assertThat(searchResponse.getHits().totalHits(), is(1L)); + SearchHit hit = searchResponse.getHits().getAt(0); + if (timeWarped()) { + assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now))); + } else { + assertThat(hit.getSource(), hasKey("@timestamp")); + DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp")); + assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true)); + } + assertThat(hit.getSource(), hasEntry("foo", (Object) "bar")); + } + + @Test + public void testSimple_WithDocField() throws Exception { + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder() + .trigger(schedule(cron("0/1 * * * * ? 2020"))) + .input(simpleInput("foo", "bar")) + .addAction("index-buckets", + scriptTransform("return [ '_doc' : ctx.payload ]"), + indexAction("idx", "type").setExecutionTimeField("@timestamp"))) + + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + DateTime now = timeWarped() ? timeWarp().clock().now(UTC) : DateTime.now(UTC); + + ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id") + .setTriggerEvent(new ScheduleTriggerEvent(now, now)) + .get(); + + assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed")); + + flush("idx"); + refresh(); + + SearchResponse searchResponse = client().prepareSearch("idx").setTypes("type").get(); + assertThat(searchResponse.getHits().totalHits(), is(1L)); + SearchHit hit = searchResponse.getHits().getAt(0); + if (timeWarped()) { + assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now))); + } else { + assertThat(hit.getSource(), hasKey("@timestamp")); + DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp")); + assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true)); + } + assertThat(hit.getSource(), hasEntry("foo", (Object) "bar")); + } + + @Test + public void testSimple_WithDocField_WrongFieldType() throws Exception { + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder() + .trigger(schedule(cron("0/1 * * * * ? 2020"))) + .input(simpleInput("foo", "bar")) + .addAction("index-buckets", + scriptTransform("return [ '_doc' : 1 ]"), + indexAction("idx", "type").setExecutionTimeField("@timestamp"))) + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + DateTime now = timeWarped() ? timeWarp().clock().now(UTC) : DateTime.now(UTC); + + ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id") + .setTriggerEvent(new ScheduleTriggerEvent(now, now)) + .setRecordExecution(true) + .get(); + + assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed")); + + flush(); + refresh(); + + assertThat(client().admin().indices().prepareExists("idx").get().isExists(), is(false)); + + assertThat(docCount(HistoryStore.INDEX_PREFIX + "*", HistoryStore.DOC_TYPE, searchSource() + .query(matchQuery("result.actions.status", "failure"))), is(1L)); + + } + + @Test + public void testIndexAggsBucketsAsDocuments() throws Exception { + DateTime now = timeWarped() ? timeWarp().clock().now(UTC) : DateTime.now(UTC); + long bucketCount = (long) randomIntBetween(2, 5); + for (int i = 0; i < bucketCount; i++) { + index("idx", "type", jsonBuilder().startObject() + .field("timestamp", now.minusDays(i)) + .endObject()); + } + + flush("idx"); + refresh(); + + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder() + .trigger(schedule(cron("0/1 * * * * ? 2020"))) + .input(searchInput(new SearchRequest("idx") + .types("type") + .searchType(SearchType.COUNT) + .source(searchSource() + .aggregation(dateHistogram("trend") + .field("timestamp") + .interval(DateHistogram.Interval.DAY))))) + .addAction("index-buckets", + + // this transform takes the bucket list and assigns it to `_doc` + // this means each bucket will be indexed as a separate doc, + // so we expect to have the same number of documents as the number + // of buckets. + scriptTransform("return [ '_doc' : ctx.payload.aggregations.trend.buckets]"), + + indexAction("idx", "bucket").setExecutionTimeField("@timestamp"))) + + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id") + .setTriggerEvent(new ScheduleTriggerEvent(now, now)) + .get(); + + assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed")); + + flush("idx"); + refresh(); + + SearchResponse searchResponse = client().prepareSearch("idx").setTypes("bucket") + .addSort("key", SortOrder.DESC) + .get(); + assertThat(searchResponse.getHits().getTotalHits(), is(bucketCount)); + DateTime key = now.withMillisOfDay(0); + int i = 0; + for (SearchHit hit : searchResponse.getHits()) { + if (timeWarped()) { + assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now))); + } else { + assertThat(hit.getSource(), hasKey("@timestamp")); + DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp")); + assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true)); + } + assertThat(hit.getSource(), hasEntry("key", (Object) key.getMillis())); + key = key.minusDays(1); + } + } +} diff --git a/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionTests.java b/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionTests.java index 13e57e18124..6745fb23e94 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionTests.java +++ b/src/test/java/org/elasticsearch/watcher/actions/index/IndexActionTests.java @@ -7,99 +7,179 @@ package org.elasticsearch.watcher.actions.index; import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.joda.time.DateTimeZone; import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.watcher.actions.Action; import org.elasticsearch.watcher.actions.Action.Result.Status; -import org.elasticsearch.watcher.actions.email.service.Authentication; -import org.elasticsearch.watcher.actions.email.service.Email; -import org.elasticsearch.watcher.actions.email.service.EmailService; -import org.elasticsearch.watcher.actions.email.service.Profile; -import org.elasticsearch.watcher.execution.TriggeredExecutionContext; import org.elasticsearch.watcher.execution.WatchExecutionContext; -import org.elasticsearch.watcher.support.http.HttpClient; -import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry; +import org.elasticsearch.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; -import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; +import org.elasticsearch.watcher.support.xcontent.XContentSource; import org.elasticsearch.watcher.test.WatcherTestUtils; -import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.watcher.watch.Payload; -import org.elasticsearch.watcher.watch.Watch; import org.junit.Test; -import java.util.HashMap; import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.mockito.Mockito.mock; +import static org.hamcrest.Matchers.*; /** */ public class IndexActionTests extends ElasticsearchIntegrationTest { - @Test - public void testIndexActionExecute() throws Exception { + @Test @Repeat(iterations = 6) + public void testIndexActionExecute_SingleDoc() throws Exception { - IndexAction action = new IndexAction("test-index", "test-type"); + String timestampField = randomFrom(null, "_timestamp", "@timestamp"); + boolean customTimestampField = "@timestamp".equals(timestampField); + + if (timestampField == null || "_timestamp".equals(timestampField)) { + assertThat(prepareCreate("test-index") + .addMapping("test-type", "{ \"test-type\" : { \"_timestamp\" : { \"enabled\" : \"true\" }}}") + .get().isAcknowledged(), is(true)); + } + + IndexAction action = new IndexAction("test-index", "test-type", timestampField); ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client())); - final String account = "account1"; - Watch watch = WatcherTestUtils.createTestWatch("test_watch", - ClientProxy.of(client()), - ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)), - new HttpClient(ImmutableSettings.EMPTY, mock(HttpAuthRegistry.class)).start(), - new EmailService() { - @Override - public EmailService.EmailSent send(Email email, Authentication auth, Profile profile) { - return new EmailSent(account, email); - } + DateTime executionTime = DateTime.now(DateTimeZone.UTC); + Payload payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", ImmutableMap.of("foo", "bar")); + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload); - @Override - public EmailSent send(Email email, Authentication auth, Profile profile, String accountName) { - return new EmailSent(account, email); - } - }, - logger); - WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watch.id(), new DateTime(), new DateTime()), TimeValue.timeValueSeconds(5)); - - Map payloadMap = new HashMap<>(); - payloadMap.put("test", "foo"); - Action.Result result = executable.execute("_id", ctx, new Payload.Simple(payloadMap)); + Action.Result result = executable.execute("_id", ctx, ctx.payload()); assertThat(result.status(), equalTo(Status.SUCCESS)); assertThat(result, instanceOf(IndexAction.Result.Success.class)); IndexAction.Result.Success successResult = (IndexAction.Result.Success) result; - Map responseData = successResult.response().data(); - assertThat(responseData.get("created"), equalTo((Object)Boolean.TRUE)); - assertThat(responseData.get("version"), equalTo((Object) 1L)); - assertThat(responseData.get("type").toString(), equalTo("test-type")); - assertThat(responseData.get("index").toString(), equalTo("test-index")); + XContentSource response = successResult.response(); + assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE)); + assertThat(response.getValue("version"), equalTo((Object) 1)); + assertThat(response.getValue("type").toString(), equalTo("test-type")); + assertThat(response.getValue("index").toString(), equalTo("test-index")); refresh(); //Manually refresh to make sure data is available - SearchResponse sr = client().prepareSearch("test-index") + SearchResponse searchResponse = client().prepareSearch("test-index") .setTypes("test-type") - .setSource(searchSource().query(matchAllQuery()).buildAsBytes()).get(); + .setSource(searchSource() + .query(matchAllQuery()) + .aggregation(terms("timestamps").field(customTimestampField ? timestampField : "_timestamp")) + .buildAsBytes()) + .get(); - assertThat(sr.getHits().totalHits(), equalTo(1L)); + assertThat(searchResponse.getHits().totalHits(), equalTo(1L)); + SearchHit hit = searchResponse.getHits().getAt(0); + + if (customTimestampField) { + assertThat(hit.getSource().size(), is(2)); + assertThat(hit.getSource(), hasEntry("foo", (Object) "bar")); + assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime))); + } else { + assertThat(hit.getSource().size(), is(1)); + assertThat(hit.getSource(), hasEntry("foo", (Object) "bar")); + } + Terms terms = searchResponse.getAggregations().get("timestamps"); + assertThat(terms, notNullValue()); + assertThat(terms.getBuckets(), hasSize(1)); + assertThat(terms.getBuckets().get(0).getKeyAsNumber().longValue(), is(executionTime.getMillis())); + assertThat(terms.getBuckets().get(0).getDocCount(), is(1L)); + } + + @Test @Repeat(iterations = 6) + public void testIndexActionExecute_MultiDoc() throws Exception { + + String timestampField = randomFrom(null, "_timestamp", "@timestamp"); + boolean customTimestampField = "@timestamp".equals(timestampField); + + if (timestampField == null || "_timestamp".equals(timestampField)) { + assertThat(prepareCreate("test-index") + .addMapping("test-type", "{ \"test-type\" : { \"_timestamp\" : { \"enabled\" : \"true\" }}}") + .get().isAcknowledged(), is(true)); + } + + Object list = randomFrom( + new Map[] { ImmutableMap.of("foo", "bar"), ImmutableMap.of("foo", "bar1") }, + ImmutableList.of(ImmutableMap.of("foo", "bar"), ImmutableMap.of("foo", "bar1")), + ImmutableSet.of(ImmutableMap.of("foo", "bar"), ImmutableMap.of("foo", "bar1")) + ); + + IndexAction action = new IndexAction("test-index", "test-type", timestampField); + ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client())); + DateTime executionTime = DateTime.now(DateTimeZone.UTC); + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, new Payload.Simple("_doc", list)); + + Action.Result result = executable.execute("_id", ctx, ctx.payload()); + + assertThat(result.status(), equalTo(Status.SUCCESS)); + assertThat(result, instanceOf(IndexAction.Result.Success.class)); + IndexAction.Result.Success successResult = (IndexAction.Result.Success) result; + XContentSource response = successResult.response(); + assertThat(response.getValue("0.created"), equalTo((Object)Boolean.TRUE)); + assertThat(response.getValue("0.version"), equalTo((Object) 1)); + assertThat(response.getValue("0.type").toString(), equalTo("test-type")); + assertThat(response.getValue("0.index").toString(), equalTo("test-index")); + assertThat(response.getValue("1.created"), equalTo((Object)Boolean.TRUE)); + assertThat(response.getValue("1.version"), equalTo((Object) 1)); + assertThat(response.getValue("1.type").toString(), equalTo("test-type")); + assertThat(response.getValue("1.index").toString(), equalTo("test-index")); + + refresh(); //Manually refresh to make sure data is available + + SearchResponse searchResponse = client().prepareSearch("test-index") + .setTypes("test-type") + .addSort("foo", SortOrder.ASC) + .setSource(searchSource() + .query(matchAllQuery()) + .aggregation(terms("timestamps").field(customTimestampField ? timestampField : "_timestamp")) + .buildAsBytes()) + .get(); + + assertThat(searchResponse.getHits().totalHits(), equalTo(2L)); + SearchHit hit = searchResponse.getHits().getAt(0); + if (customTimestampField) { + assertThat(hit.getSource().size(), is(2)); + assertThat(hit.getSource(), hasEntry("foo", (Object) "bar")); + assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime))); + } else { + assertThat(hit.getSource().size(), is(1)); + assertThat(hit.getSource(), hasEntry("foo", (Object) "bar")); + } + hit = searchResponse.getHits().getAt(1); + if (customTimestampField) { + assertThat(hit.getSource().size(), is(2)); + assertThat(hit.getSource(), hasEntry("foo", (Object) "bar1")); + assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime))); + } else { + assertThat(hit.getSource().size(), is(1)); + assertThat(hit.getSource(), hasEntry("foo", (Object) "bar1")); + } } @Test @Repeat(iterations = 10) public void testParser() throws Exception { + String timestampField = randomBoolean() ? "@timestamp" : null; XContentBuilder builder = jsonBuilder(); - builder.startObject() - .field(IndexAction.Field.INDEX.getPreferredName(), "test-index") - .field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type") - .endObject(); + builder.startObject(); + builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index"); + builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type"); + if (timestampField != null) { + builder.field(IndexAction.Field.EXECUTION_TIME_FIELD.getPreferredName(), timestampField); + } + builder.endObject(); IndexActionFactory actionParser = new IndexActionFactory(ImmutableSettings.EMPTY, ClientProxy.of(client())); XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); @@ -109,6 +189,9 @@ public class IndexActionTests extends ElasticsearchIntegrationTest { assertThat(executable.action().docType, equalTo("test-type")); assertThat(executable.action().index, equalTo("test-index")); + if (timestampField != null) { + assertThat(executable.action().executionTimeField, equalTo(timestampField)); + } } @Test @Repeat(iterations = 10) diff --git a/src/test/java/org/elasticsearch/watcher/actions/throttler/ActionThrottleTests.java b/src/test/java/org/elasticsearch/watcher/actions/throttler/ActionThrottleTests.java index b9c8a8c7f30..0e13ef310d1 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/throttler/ActionThrottleTests.java +++ b/src/test/java/org/elasticsearch/watcher/actions/throttler/ActionThrottleTests.java @@ -25,7 +25,7 @@ import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.support.clock.SystemClock; import org.elasticsearch.watcher.support.http.HttpRequestTemplate; import org.elasticsearch.watcher.support.template.Template; -import org.elasticsearch.watcher.support.xcontent.MapPath; +import org.elasticsearch.watcher.support.xcontent.ObjectPath; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; @@ -358,7 +358,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests { private String getExecutionStatus(Map watchRecordMap) { - return MapPath.eval("result.actions.0.status", watchRecordMap); + return ObjectPath.eval("result.actions.0.status", watchRecordMap); } private ManualExecutionContext getManualExecutionContext(TimeValue throttlePeriod) { diff --git a/src/test/java/org/elasticsearch/watcher/actions/webhook/WebhookActionTests.java b/src/test/java/org/elasticsearch/watcher/actions/webhook/WebhookActionTests.java index 722013bcd2f..98027232660 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/webhook/WebhookActionTests.java +++ b/src/test/java/org/elasticsearch/watcher/actions/webhook/WebhookActionTests.java @@ -93,23 +93,19 @@ public class WebhookActionTests extends ElasticsearchTestCase { @Test @Repeat(iterations = 30) public void testExecute() throws Exception { - ClientProxy client = mock(ClientProxy.class); ExecuteScenario scenario = randomFrom(ExecuteScenario.Success, ExecuteScenario.ErrorCode); HttpClient httpClient = scenario.client(); HttpMethod method = randomFrom(HttpMethod.GET, HttpMethod.POST, HttpMethod.PUT, HttpMethod.DELETE, HttpMethod.HEAD); - final String account = "account1"; - HttpRequestTemplate httpRequest = getHttpRequestTemplate(method, TEST_HOST, TEST_PORT, testPath, testBody, null); WebhookAction action = new WebhookAction(httpRequest); ExecutableWebhookAction executable = new ExecutableWebhookAction(action, logger, httpClient, templateEngine); - - Watch watch = createWatch("test_watch", client, account); - WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(), new ScheduleTriggerEvent(watch.id(), new DateTime(), new DateTime()), timeValueSeconds(5)); + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", new Payload.Simple("foo", "bar")); Action.Result actionResult = executable.execute("_id", ctx, Payload.EMPTY); + scenario.assertResult(httpClient, actionResult); } diff --git a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java index db9b7cf9080..e2ad8b40b23 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.input.simple.SimpleInput; import org.elasticsearch.watcher.support.Script; -import org.elasticsearch.watcher.support.xcontent.MapPath; +import org.elasticsearch.watcher.support.xcontent.ObjectPath; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; @@ -208,8 +208,8 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { .get().getRecordSource().getAsMap(); - assertThat(MapPath.eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTION_NOT_NEEDED.toString())); - assertThat(MapPath.eval("result.input.payload.foo", executeWatchResult), equalTo("bar")); + assertThat(ObjectPath.eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTION_NOT_NEEDED.toString())); + assertThat(ObjectPath.eval("result.input.payload.foo", executeWatchResult), equalTo("bar")); watchBuilder = watchBuilder() .trigger(schedule(cron("0 0 0 1 * ? 2099"))) @@ -224,16 +224,16 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { .setId("_id").setTriggerEvent(triggerEvent).setRecordExecution(true) .get().getRecordSource().getAsMap(); - assertThat(MapPath.eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTED.toString())); - assertThat(MapPath.eval("result.input.payload.foo", executeWatchResult), equalTo("bar")); - assertThat(MapPath.eval("result.actions.0.id", executeWatchResult), equalTo("log")); + assertThat(ObjectPath.eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTED.toString())); + assertThat(ObjectPath.eval("result.input.payload.foo", executeWatchResult), equalTo("bar")); + assertThat(ObjectPath.eval("result.actions.0.id", executeWatchResult), equalTo("log")); executeWatchResult = watcherClient().prepareExecuteWatch() .setId("_id").setTriggerEvent(triggerEvent) .get().getRecordSource().getAsMap(); - assertThat(MapPath.eval("state", executeWatchResult), equalTo(ExecutionState.THROTTLED.toString())); + assertThat(ObjectPath.eval("state", executeWatchResult), equalTo(ExecutionState.THROTTLED.toString())); } @Test diff --git a/src/test/java/org/elasticsearch/watcher/support/xcontent/MapPathTests.java b/src/test/java/org/elasticsearch/watcher/support/xcontent/MapPathTests.java index d7c959c4f43..5a175a785e1 100644 --- a/src/test/java/org/elasticsearch/watcher/support/xcontent/MapPathTests.java +++ b/src/test/java/org/elasticsearch/watcher/support/xcontent/MapPathTests.java @@ -28,8 +28,8 @@ public class MapPathTests extends ElasticsearchTestCase { .put("key", "value") .build(); - assertThat(MapPath.eval("key", map), is((Object) "value")); - assertThat(MapPath.eval("key1", map), nullValue()); + assertThat(ObjectPath.eval("key", map), is((Object) "value")); + assertThat(ObjectPath.eval("key1", map), nullValue()); } @Test @Repeat(iterations = 5) @@ -40,7 +40,7 @@ public class MapPathTests extends ElasticsearchTestCase { .build(); int index = randomInt(3); - assertThat(MapPath.eval("key." + index, map), is(list.get(index))); + assertThat(ObjectPath.eval("key." + index, map), is(list.get(index))); } @Test @Repeat(iterations = 5) @@ -51,7 +51,7 @@ public class MapPathTests extends ElasticsearchTestCase { .build(); int index = randomInt(3); - assertThat(((Number) MapPath.eval("key." + index, map)).intValue(), is(array[index])); + assertThat(((Number) ObjectPath.eval("key." + index, map)).intValue(), is(array[index])); } @Test @@ -60,7 +60,7 @@ public class MapPathTests extends ElasticsearchTestCase { .put("a", ImmutableMap.of("b", "val")) .build(); - assertThat(MapPath.eval("a.b", map), is((Object) "val")); + assertThat(ObjectPath.eval("a.b", map), is((Object) "val")); } @@ -78,11 +78,11 @@ public class MapPathTests extends ElasticsearchTestCase { .build()) .build(); - assertThat(MapPath.eval("", map), is((Object) map)); - assertThat(MapPath.eval("a.b.0.0.c", map), is((Object) "val")); - assertThat(MapPath.eval("a.b.0.0.c.d", map), nullValue()); - assertThat(MapPath.eval("a.b.0.0.d", map), nullValue()); - assertThat(MapPath.eval("a.b.c", map), nullValue()); + assertThat(ObjectPath.eval("", map), is((Object) map)); + assertThat(ObjectPath.eval("a.b.0.0.c", map), is((Object) "val")); + assertThat(ObjectPath.eval("a.b.0.0.c.d", map), nullValue()); + assertThat(ObjectPath.eval("a.b.0.0.d", map), nullValue()); + assertThat(ObjectPath.eval("a.b.c", map), nullValue()); } } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/TransformSearchTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/TransformSearchTests.java index d0fffea4c67..4df3c16eb39 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/TransformSearchTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/TransformSearchTests.java @@ -95,14 +95,14 @@ public class TransformSearchTests extends AbstractWatcherIntegrationTests { SearchResponse response = client().prepareSearch("output1").get(); assertNoFailures(response); assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1l)); - assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).size(), equalTo(1)); - assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).get("key3").toString(), equalTo("20")); + assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1)); + assertThat(response.getHits().getAt(0).sourceAsMap().get("key3").toString(), equalTo("20")); response = client().prepareSearch("output2").get(); assertNoFailures(response); assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1l)); - assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).size(), equalTo(1)); - assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).get("key3").toString(), equalTo("20")); + assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1)); + assertThat(response.getHits().getAt(0).sourceAsMap().get("key3").toString(), equalTo("20")); } @Test @@ -190,14 +190,14 @@ public class TransformSearchTests extends AbstractWatcherIntegrationTests { SearchResponse response = client().prepareSearch("output1").get(); assertNoFailures(response); assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1l)); - assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).size(), equalTo(1)); - assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).get("key4").toString(), equalTo("30")); + assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1)); + assertThat(response.getHits().getAt(0).sourceAsMap().get("key4").toString(), equalTo("30")); response = client().prepareSearch("output2").get(); assertNoFailures(response); assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1l)); - assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).size(), equalTo(1)); - assertThat(((Map) response.getHits().getAt(0).sourceAsMap().get("data")).get("key4").toString(), equalTo("30")); + assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1)); + assertThat(response.getHits().getAt(0).sourceAsMap().get("key4").toString(), equalTo("30")); } } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchMetadataTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchMetadataTests.java index 701724e1b5f..999b4374975 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/WatchMetadataTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchMetadataTests.java @@ -13,7 +13,7 @@ import org.elasticsearch.watcher.condition.always.AlwaysCondition; import org.elasticsearch.watcher.execution.ActionExecutionMode; import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.support.template.Template; -import org.elasticsearch.watcher.support.xcontent.MapPath; +import org.elasticsearch.watcher.support.xcontent.ObjectPath; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.test.WatcherTestUtils; import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse; @@ -100,8 +100,8 @@ public class WatchMetadataTests extends AbstractWatcherIntegrationTests { ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_name").setTriggerEvent(triggerEvent).setActionMode("_all", ActionExecutionMode.SIMULATE).get(); Map result = executeWatchResponse.getRecordSource().getAsMap();; - assertThat(MapPath.eval("metadata.foo", result), equalTo("bar")); - assertThat(MapPath.eval("result.actions.0.id", result), equalTo("testLogger")); - assertThat(MapPath.eval("result.actions.0.logging.logged_text", result), equalTo("This is a test")); + assertThat(ObjectPath.eval("metadata.foo", result), equalTo("bar")); + assertThat(ObjectPath.eval("result.actions.0.id", result), equalTo("testLogger")); + assertThat(ObjectPath.eval("result.actions.0.logging.logged_text", result), equalTo("This is a test")); } } diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java index 6c076e25fa5..902bfaefcbb 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.watcher.actions.email.service.Profile; import org.elasticsearch.watcher.actions.index.ExecutableIndexAction; import org.elasticsearch.watcher.actions.index.IndexAction; import org.elasticsearch.watcher.actions.index.IndexActionFactory; +import org.elasticsearch.watcher.actions.throttler.ActionThrottler; import org.elasticsearch.watcher.actions.webhook.ExecutableWebhookAction; import org.elasticsearch.watcher.actions.webhook.WebhookAction; import org.elasticsearch.watcher.actions.webhook.WebhookActionFactory; @@ -76,7 +77,6 @@ import org.elasticsearch.watcher.support.secret.SecretService; import org.elasticsearch.watcher.support.template.Template; import org.elasticsearch.watcher.support.template.TemplateEngine; import org.elasticsearch.watcher.test.WatcherTestUtils; -import org.elasticsearch.watcher.actions.throttler.ActionThrottler; import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.TransformFactory; import org.elasticsearch.watcher.transform.TransformRegistry; @@ -387,7 +387,7 @@ public class WatchTests extends ElasticsearchTestCase { list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), randomThrottler(), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine))); } if (randomBoolean()) { - IndexAction aciton = new IndexAction("_index", "_type"); + IndexAction aciton = new IndexAction("_index", "_type", null); list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(aciton, logger, client))); } if (randomBoolean()) {