Watcher: Refactor integration test into rest test (elastic/elasticsearch#2864)
As the specified integration tests were better suited as REST tests, I changed them into those and removed the (partially already unused) java classes. Original commit: elastic/x-pack-elasticsearch@f26d8d94e8
This commit is contained in:
parent
00f8281f37
commit
efc5de782b
|
@ -1,215 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
package org.elasticsearch.messy.tests;
|
|
||||||
|
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
|
||||||
import org.elasticsearch.action.search.SearchType;
|
|
||||||
import org.elasticsearch.plugins.Plugin;
|
|
||||||
import org.elasticsearch.script.groovy.GroovyPlugin;
|
|
||||||
import org.elasticsearch.search.SearchHit;
|
|
||||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
|
||||||
import org.elasticsearch.xpack.watcher.history.HistoryStore;
|
|
||||||
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
|
|
||||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
|
||||||
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse;
|
|
||||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
|
|
||||||
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
|
|
||||||
import org.joda.time.DateTime;
|
|
||||||
import org.joda.time.DateTimeZone;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|
||||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
|
||||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
|
|
||||||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
|
|
||||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
|
|
||||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
|
||||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
|
|
||||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
|
||||||
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.scriptTransform;
|
|
||||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
|
||||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
|
|
||||||
import static org.hamcrest.Matchers.hasEntry;
|
|
||||||
import static org.hamcrest.Matchers.hasKey;
|
|
||||||
import static org.hamcrest.Matchers.is;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class IndexActionIT extends AbstractWatcherIntegrationTestCase {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
|
||||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
|
||||||
types.add(GroovyPlugin.class);
|
|
||||||
return types;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSimple() throws Exception {
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
|
||||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
|
||||||
.input(simpleInput("foo", "bar"))
|
|
||||||
.addAction("index-buckets", indexAction("idx", "type").setExecutionTimeField("@timestamp")))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(putWatchResponse.isCreated(), is(true));
|
|
||||||
|
|
||||||
DateTime now = timeWarped() ? timeWarp().clock().now(DateTimeZone.UTC) : DateTime.now(DateTimeZone.UTC);
|
|
||||||
|
|
||||||
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
|
|
||||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
|
|
||||||
|
|
||||||
flush("idx");
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
SearchResponse searchResponse = client().prepareSearch("idx").setTypes("type").get();
|
|
||||||
assertThat(searchResponse.getHits().totalHits(), is(1L));
|
|
||||||
SearchHit hit = searchResponse.getHits().getAt(0);
|
|
||||||
if (timeWarped()) {
|
|
||||||
assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now)));
|
|
||||||
} else {
|
|
||||||
assertThat(hit.getSource(), hasKey("@timestamp"));
|
|
||||||
DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp"));
|
|
||||||
assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true));
|
|
||||||
}
|
|
||||||
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSimpleWithDocField() throws Exception {
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
|
||||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
|
||||||
.input(simpleInput("foo", "bar"))
|
|
||||||
.addAction("index-buckets",
|
|
||||||
scriptTransform("return [ '_doc' : ctx.payload ]"),
|
|
||||||
indexAction("idx", "type").setExecutionTimeField("@timestamp")))
|
|
||||||
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(putWatchResponse.isCreated(), is(true));
|
|
||||||
|
|
||||||
DateTime now = timeWarped() ? timeWarp().clock().now(DateTimeZone.UTC) : DateTime.now(DateTimeZone.UTC);
|
|
||||||
|
|
||||||
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
|
|
||||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
|
|
||||||
|
|
||||||
flush("idx");
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
SearchResponse searchResponse = client().prepareSearch("idx").setTypes("type").get();
|
|
||||||
assertThat(searchResponse.getHits().totalHits(), is(1L));
|
|
||||||
SearchHit hit = searchResponse.getHits().getAt(0);
|
|
||||||
if (timeWarped()) {
|
|
||||||
assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now)));
|
|
||||||
} else {
|
|
||||||
assertThat(hit.getSource(), hasKey("@timestamp"));
|
|
||||||
DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp"));
|
|
||||||
assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true));
|
|
||||||
}
|
|
||||||
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSimpleWithDocFieldWrongFieldType() throws Exception {
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
|
||||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
|
||||||
.input(simpleInput("foo", "bar"))
|
|
||||||
.addAction("index-buckets",
|
|
||||||
scriptTransform("return [ '_doc' : 1 ]"),
|
|
||||||
indexAction("idx", "type").setExecutionTimeField("@timestamp")))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(putWatchResponse.isCreated(), is(true));
|
|
||||||
|
|
||||||
DateTime now = timeWarped() ? timeWarp().clock().now(DateTimeZone.UTC) : DateTime.now(DateTimeZone.UTC);
|
|
||||||
|
|
||||||
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
|
|
||||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
|
||||||
.setRecordExecution(true)
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
|
|
||||||
|
|
||||||
flush();
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
assertThat(client().admin().indices().prepareExists("idx").get().isExists(), is(false));
|
|
||||||
|
|
||||||
assertThat(docCount(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*", HistoryStore.DOC_TYPE, searchSource()
|
|
||||||
.query(matchQuery("result.actions.status", "failure"))), is(1L));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testIndexAggsBucketsAsDocuments() throws Exception {
|
|
||||||
DateTime now = timeWarped() ? timeWarp().clock().now(DateTimeZone.UTC) : DateTime.now(DateTimeZone.UTC);
|
|
||||||
long bucketCount = randomIntBetween(2, 5);
|
|
||||||
for (int i = 0; i < bucketCount; i++) {
|
|
||||||
index("idx", "type", jsonBuilder().startObject()
|
|
||||||
.field("timestamp", now.minusDays(i))
|
|
||||||
.endObject());
|
|
||||||
}
|
|
||||||
|
|
||||||
flush("idx");
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
|
||||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
|
||||||
.input(searchInput(new SearchRequest("idx")
|
|
||||||
.types("type")
|
|
||||||
.searchType(SearchType.QUERY_THEN_FETCH)
|
|
||||||
.source(searchSource()
|
|
||||||
.aggregation(dateHistogram("trend")
|
|
||||||
.field("timestamp")
|
|
||||||
.dateHistogramInterval(DateHistogramInterval.DAY)))))
|
|
||||||
.addAction("index-buckets",
|
|
||||||
|
|
||||||
// this transform takes the bucket list and assigns it to `_doc`
|
|
||||||
// this means each bucket will be indexed as a separate doc,
|
|
||||||
// so we expect to have the same number of documents as the number
|
|
||||||
// of buckets.
|
|
||||||
scriptTransform("return [ '_doc' : ctx.payload.aggregations.trend.buckets]"),
|
|
||||||
|
|
||||||
indexAction("idx", "bucket").setExecutionTimeField("@timestamp")))
|
|
||||||
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(putWatchResponse.isCreated(), is(true));
|
|
||||||
|
|
||||||
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
|
|
||||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
|
|
||||||
|
|
||||||
flush("idx");
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
SearchResponse searchResponse = client().prepareSearch("idx").setTypes("bucket")
|
|
||||||
.addSort("key", SortOrder.DESC)
|
|
||||||
.get();
|
|
||||||
assertThat(searchResponse.getHits().getTotalHits(), is(bucketCount));
|
|
||||||
DateTime key = now.withMillisOfDay(0);
|
|
||||||
int i = 0;
|
|
||||||
for (SearchHit hit : searchResponse.getHits()) {
|
|
||||||
if (timeWarped()) {
|
|
||||||
assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now)));
|
|
||||||
} else {
|
|
||||||
assertThat(hit.getSource(), hasKey("@timestamp"));
|
|
||||||
DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp"));
|
|
||||||
assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true));
|
|
||||||
}
|
|
||||||
assertThat(hit.getSource(), hasEntry("key", (Object) key.getMillis()));
|
|
||||||
key = key.minusDays(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,238 @@
|
||||||
|
---
|
||||||
|
"Test simple input to index action":
|
||||||
|
- do:
|
||||||
|
xpack.watcher.put_watch:
|
||||||
|
id: my_watch
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"trigger" : { "schedule" : { "cron" : "0/1 * * * * ? 2020" } },
|
||||||
|
"input" : { "simple" : { "foo": "bar" } },
|
||||||
|
"actions" : {
|
||||||
|
"index_action" : {
|
||||||
|
"index" : {
|
||||||
|
"index" : "idx",
|
||||||
|
"doc_type" : "type",
|
||||||
|
"execution_time_field" : "@timestamp"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- match: { _id: "my_watch" }
|
||||||
|
|
||||||
|
|
||||||
|
- do:
|
||||||
|
xpack.watcher.execute_watch:
|
||||||
|
id: "my_watch"
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"trigger_data" : {
|
||||||
|
"triggered_time" : "2016-07-07T09:00:00Z",
|
||||||
|
"scheduled_time" : "2016-07-07T09:00:00Z"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- match: { "watch_record.state": "executed" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.refresh: {}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
search:
|
||||||
|
index: idx
|
||||||
|
type: type
|
||||||
|
|
||||||
|
- match: { hits.total: 1 }
|
||||||
|
- match: { hits.hits.0._source.foo: bar }
|
||||||
|
- gte: { hits.hits.0._source.@timestamp: '2016-07-08' }
|
||||||
|
|
||||||
|
---
|
||||||
|
"Test simple input with document field":
|
||||||
|
|
||||||
|
- do:
|
||||||
|
xpack.watcher.put_watch:
|
||||||
|
id: my_watch
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"trigger" : { "schedule" : { "cron" : "0/1 * * * * ? 2020" } },
|
||||||
|
"input" : { "simple" : { "foo": "bar" } },
|
||||||
|
"actions" : {
|
||||||
|
"index_action" : {
|
||||||
|
"transform" : { "script" : { "inline": "return [ '_doc' : ctx.payload ]" } },
|
||||||
|
"index" : {
|
||||||
|
"index" : "idx",
|
||||||
|
"doc_type" : "type",
|
||||||
|
"execution_time_field" : "@timestamp"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- match: { _id: "my_watch" }
|
||||||
|
|
||||||
|
|
||||||
|
- do:
|
||||||
|
xpack.watcher.execute_watch:
|
||||||
|
id: "my_watch"
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"trigger_data" : {
|
||||||
|
"triggered_time" : "2016-07-07T09:00:00Z",
|
||||||
|
"scheduled_time" : "2016-07-07T09:00:00Z"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- match: { "watch_record.state": "executed" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.refresh: {}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
search:
|
||||||
|
index: idx
|
||||||
|
type: type
|
||||||
|
|
||||||
|
- match: { hits.total: 1 }
|
||||||
|
- match: { hits.hits.0._source.foo: bar }
|
||||||
|
- gte: { hits.hits.0._source.@timestamp: '2016-07-08"' }
|
||||||
|
|
||||||
|
|
||||||
|
---
|
||||||
|
"Test simple input with wrong document results in error":
|
||||||
|
|
||||||
|
- do:
|
||||||
|
xpack.watcher.put_watch:
|
||||||
|
id: my_watch
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"trigger" : { "schedule" : { "cron" : "0/1 * * * * ? 2020" } },
|
||||||
|
"input" : { "simple" : { "foo": "bar" } },
|
||||||
|
"actions" : {
|
||||||
|
"index_action" : {
|
||||||
|
"transform" : { "script" : { "inline": "return [ '_doc' : 1 ]" } },
|
||||||
|
"index" : {
|
||||||
|
"index" : "idx",
|
||||||
|
"doc_type" : "type",
|
||||||
|
"execution_time_field" : "@timestamp"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- match: { _id: "my_watch" }
|
||||||
|
|
||||||
|
|
||||||
|
- do:
|
||||||
|
xpack.watcher.execute_watch:
|
||||||
|
id: "my_watch"
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"record_execution" : true,
|
||||||
|
"trigger_data" : {
|
||||||
|
"triggered_time" : "2016-07-07T09:00:00Z",
|
||||||
|
"scheduled_time" : "2016-07-07T09:00:00Z"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- match: { "watch_record.state": "executed" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.refresh: {}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.exists:
|
||||||
|
index: idx
|
||||||
|
|
||||||
|
- is_false: ''
|
||||||
|
|
||||||
|
- do:
|
||||||
|
search:
|
||||||
|
index: .watcher-history-*
|
||||||
|
type: watch_record
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"query" : {
|
||||||
|
"match" : {
|
||||||
|
"result.actions.status": "failure"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- match: { hits.total: 1 }
|
||||||
|
|
||||||
|
---
|
||||||
|
"Test search input to index action with aggs":
|
||||||
|
|
||||||
|
- do:
|
||||||
|
bulk:
|
||||||
|
refresh: true
|
||||||
|
body:
|
||||||
|
- '{"index": {"_index": "idx", "_type": "type", "_id": "1"}}'
|
||||||
|
- '{"@timestamp": "2016-07-07" }'
|
||||||
|
- '{"index": {"_index": "idx", "_type": "type", "_id": "2"}}'
|
||||||
|
- '{"@timestamp": "2016-07-08" }'
|
||||||
|
- '{"index": {"_index": "idx", "_type": "type", "_id": "3"}}'
|
||||||
|
- '{"@timestamp": "2016-07-09" }'
|
||||||
|
|
||||||
|
- do:
|
||||||
|
xpack.watcher.put_watch:
|
||||||
|
id: my_watch
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"trigger" : { "schedule" : { "cron" : "0/1 * * * * ? 2020" } },
|
||||||
|
"input" : {
|
||||||
|
"search" : {
|
||||||
|
"request": {
|
||||||
|
"indices" : [ "idx" ],
|
||||||
|
"types" : [ "type" ],
|
||||||
|
"body" : {
|
||||||
|
"aggs" : {
|
||||||
|
"trend" : {
|
||||||
|
"date_histogram" : {
|
||||||
|
"field" : "@timestamp",
|
||||||
|
"interval" : "day"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"actions" : {
|
||||||
|
"index_action" : {
|
||||||
|
"transform" : { "script" : { "inline": "return [ '_doc' : ctx.payload.aggregations.trend.buckets]" } },
|
||||||
|
"index" : {
|
||||||
|
"index" : "idx",
|
||||||
|
"doc_type" : "bucket",
|
||||||
|
"execution_time_field" : "@timestamp"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- match: { _id: "my_watch" }
|
||||||
|
|
||||||
|
|
||||||
|
- do:
|
||||||
|
xpack.watcher.execute_watch:
|
||||||
|
id: "my_watch"
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"trigger_data" : {
|
||||||
|
"triggered_time" : "2016-07-07T09:00:00Z",
|
||||||
|
"scheduled_time" : "2016-07-07T09:00:00Z"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- match: { "watch_record.state": "executed" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.refresh: {}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
search:
|
||||||
|
index: idx
|
||||||
|
type: bucket
|
||||||
|
|
||||||
|
- match: { hits.total: 3 }
|
||||||
|
|
|
@ -1,205 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
package org.elasticsearch.xpack.watcher.actions.index;
|
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
|
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
|
||||||
import org.elasticsearch.action.search.SearchType;
|
|
||||||
import org.elasticsearch.search.SearchHit;
|
|
||||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
|
||||||
import org.elasticsearch.xpack.watcher.history.HistoryStore;
|
|
||||||
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
|
|
||||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
|
||||||
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse;
|
|
||||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
|
|
||||||
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
|
|
||||||
import org.joda.time.DateTime;
|
|
||||||
import org.joda.time.DateTimeZone;
|
|
||||||
|
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|
||||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
|
||||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
|
|
||||||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
|
|
||||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
|
|
||||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
|
||||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
|
|
||||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
|
||||||
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.scriptTransform;
|
|
||||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
|
||||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
|
|
||||||
import static org.hamcrest.Matchers.hasEntry;
|
|
||||||
import static org.hamcrest.Matchers.hasKey;
|
|
||||||
import static org.hamcrest.Matchers.is;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/724")
|
|
||||||
public class IndexActionIntegrationTests extends AbstractWatcherIntegrationTestCase {
|
|
||||||
public void testSimple() throws Exception {
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
|
||||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
|
||||||
.input(simpleInput("foo", "bar"))
|
|
||||||
.addAction("index-buckets", indexAction("idx", "type").setExecutionTimeField("@timestamp")))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(putWatchResponse.isCreated(), is(true));
|
|
||||||
|
|
||||||
DateTime now = timeWarped() ? timeWarp().clock().now(DateTimeZone.UTC) : DateTime.now(DateTimeZone.UTC);
|
|
||||||
|
|
||||||
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
|
|
||||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
|
|
||||||
|
|
||||||
flush("idx");
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
SearchResponse searchResponse = client().prepareSearch("idx").setTypes("type").get();
|
|
||||||
assertThat(searchResponse.getHits().totalHits(), is(1L));
|
|
||||||
SearchHit hit = searchResponse.getHits().getAt(0);
|
|
||||||
if (timeWarped()) {
|
|
||||||
assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now)));
|
|
||||||
} else {
|
|
||||||
assertThat(hit.getSource(), hasKey("@timestamp"));
|
|
||||||
DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp"));
|
|
||||||
assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true));
|
|
||||||
}
|
|
||||||
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSimpleWithDocField() throws Exception {
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
|
||||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
|
||||||
.input(simpleInput("foo", "bar"))
|
|
||||||
.addAction("index-buckets",
|
|
||||||
scriptTransform("return [ '_doc' : ctx.payload ]"),
|
|
||||||
indexAction("idx", "type").setExecutionTimeField("@timestamp")))
|
|
||||||
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(putWatchResponse.isCreated(), is(true));
|
|
||||||
|
|
||||||
DateTime now = timeWarped() ? timeWarp().clock().now(DateTimeZone.UTC) : DateTime.now(DateTimeZone.UTC);
|
|
||||||
|
|
||||||
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
|
|
||||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
|
|
||||||
|
|
||||||
flush("idx");
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
SearchResponse searchResponse = client().prepareSearch("idx").setTypes("type").get();
|
|
||||||
assertThat(searchResponse.getHits().totalHits(), is(1L));
|
|
||||||
SearchHit hit = searchResponse.getHits().getAt(0);
|
|
||||||
if (timeWarped()) {
|
|
||||||
assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now)));
|
|
||||||
} else {
|
|
||||||
assertThat(hit.getSource(), hasKey("@timestamp"));
|
|
||||||
DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp"));
|
|
||||||
assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true));
|
|
||||||
}
|
|
||||||
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSimpleWithDocFieldWrongFieldType() throws Exception {
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
|
||||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
|
||||||
.input(simpleInput("foo", "bar"))
|
|
||||||
.addAction("index-buckets",
|
|
||||||
scriptTransform("return [ '_doc' : 1 ]"),
|
|
||||||
indexAction("idx", "type").setExecutionTimeField("@timestamp")))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(putWatchResponse.isCreated(), is(true));
|
|
||||||
|
|
||||||
DateTime now = timeWarped() ? timeWarp().clock().now(DateTimeZone.UTC) : DateTime.now(DateTimeZone.UTC);
|
|
||||||
|
|
||||||
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
|
|
||||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
|
||||||
.setRecordExecution(true)
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
|
|
||||||
|
|
||||||
flush();
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
assertThat(client().admin().indices().prepareExists("idx").get().isExists(), is(false));
|
|
||||||
|
|
||||||
assertThat(docCount(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*", HistoryStore.DOC_TYPE, searchSource()
|
|
||||||
.query(matchQuery("result.actions.status", "failure"))), is(1L));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testIndexAggsBucketsAsDocuments() throws Exception {
|
|
||||||
DateTime now = timeWarped() ? timeWarp().clock().now(DateTimeZone.UTC) : DateTime.now(DateTimeZone.UTC);
|
|
||||||
long bucketCount = randomIntBetween(2, 5);
|
|
||||||
for (int i = 0; i < bucketCount; i++) {
|
|
||||||
index("idx", "type", jsonBuilder().startObject()
|
|
||||||
.field("timestamp", now.minusDays(i))
|
|
||||||
.endObject());
|
|
||||||
}
|
|
||||||
|
|
||||||
flush("idx");
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
|
||||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
|
||||||
.input(searchInput(new SearchRequest("idx")
|
|
||||||
.types("type")
|
|
||||||
.searchType(SearchType.QUERY_THEN_FETCH)
|
|
||||||
.source(searchSource()
|
|
||||||
.aggregation(dateHistogram("trend")
|
|
||||||
.field("timestamp")
|
|
||||||
.dateHistogramInterval(DateHistogramInterval.DAY)))))
|
|
||||||
.addAction("index-buckets",
|
|
||||||
|
|
||||||
// this transform takes the bucket list and assigns it to `_doc`
|
|
||||||
// this means each bucket will be indexed as a separate doc,
|
|
||||||
// so we expect to have the same number of documents as the number
|
|
||||||
// of buckets.
|
|
||||||
scriptTransform("return [ '_doc' : ctx.payload.aggregations.trend.buckets]"),
|
|
||||||
|
|
||||||
indexAction("idx", "bucket").setExecutionTimeField("@timestamp")))
|
|
||||||
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(putWatchResponse.isCreated(), is(true));
|
|
||||||
|
|
||||||
ExecuteWatchResponse executeWatchResponse = watcherClient().prepareExecuteWatch("_id")
|
|
||||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
|
||||||
.get();
|
|
||||||
|
|
||||||
assertThat(executeWatchResponse.getRecordSource().getValue("state"), is((Object) "executed"));
|
|
||||||
|
|
||||||
flush("idx");
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
SearchResponse searchResponse = client().prepareSearch("idx").setTypes("bucket")
|
|
||||||
.addSort("key", SortOrder.DESC)
|
|
||||||
.get();
|
|
||||||
assertThat(searchResponse.getHits().getTotalHits(), is(bucketCount));
|
|
||||||
DateTime key = now.withMillisOfDay(0);
|
|
||||||
int i = 0;
|
|
||||||
for (SearchHit hit : searchResponse.getHits()) {
|
|
||||||
if (timeWarped()) {
|
|
||||||
assertThat(hit.getSource(), hasEntry("@timestamp", (Object) WatcherDateTimeUtils.formatDate(now)));
|
|
||||||
} else {
|
|
||||||
assertThat(hit.getSource(), hasKey("@timestamp"));
|
|
||||||
DateTime timestamp = WatcherDateTimeUtils.parseDate((String) hit.getSource().get("@timestamp"));
|
|
||||||
assertThat(timestamp.isEqual(now) || timestamp.isAfter(now), is(true));
|
|
||||||
}
|
|
||||||
assertThat(hit.getSource(), hasEntry("key", (Object) key.getMillis()));
|
|
||||||
key = key.minusDays(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue