diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java index bd7ccdc61cd..7fb6ac791d0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java @@ -5,45 +5,31 @@ */ package org.elasticsearch.xpack.watcher.actions; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.xpack.watcher.condition.CompareCondition; -import org.elasticsearch.xpack.watcher.execution.ExecutionState; import org.elasticsearch.xpack.watcher.history.HistoryStore; -import org.elasticsearch.xpack.watcher.history.WatchRecord; +import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath; import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.junit.Before; -import java.util.Arrays; -import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.stream.Collectors; -import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction; +import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; -import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput; -import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest; -import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.searchTransform; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; import static org.hamcrest.Matchers.is; @TestLogging("org.elasticsearch.xpack.watcher:DEBUG," + - "org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail:WARN," + "org.elasticsearch.xpack.watcher.WatcherLifeCycleService:DEBUG," + "org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerMock:TRACE," + "org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE") @@ -54,62 +40,30 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest return true; } - @Before - public void indexTestDocument() { - IndexResponse eventIndexResponse = client().prepareIndex("events", "event") - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .setSource("level", "error") - .get(); - assertEquals(DocWriteResponse.Result.CREATED, eventIndexResponse.getResult()); - } - public void testTimeThrottle() throws Exception { String id = randomAlphaOfLength(20); PutWatchResponse putWatchResponse = watcherClient().preparePutWatch() .setId(id) .setSource(watchBuilder() .trigger(schedule(interval("5s"))) - .input(searchInput(templateRequest(new SearchSourceBuilder(), "events"))) - .condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L)) - .transform(searchTransform(templateRequest(new SearchSourceBuilder(), "events"))) - .addAction("_id", indexAction("actions", "action")) + .input(simpleInput()) + .addAction("my-logging-action", loggingAction("foo")) .defaultThrottlePeriod(TimeValue.timeValueSeconds(30))) .get(); assertThat(putWatchResponse.isCreated(), is(true)); - timeWarp().clock().setTime(DateTime.now(DateTimeZone.UTC)); - timeWarp().trigger(id); - refresh(); - - // the first fire should work - assertHitCount(client().prepareSearch("actions").setTypes("action").get(), 1); + assertHistoryEntryExecuted(id); timeWarp().clock().fastForward(TimeValue.timeValueMillis(4000)); timeWarp().trigger(id); - refresh(); - - // the last fire should have been throttled, so number of actions shouldn't change - assertHitCount(client().prepareSearch("actions").setTypes("action").get(), 1); + assertHistoryEntryThrottled(id); timeWarp().clock().fastForwardSeconds(30); timeWarp().trigger(id); - refresh(); + assertHistoryEntryExecuted(id); - // the last fire occurred passed the throttle period, so a new action should have been added - assertHitCount(client().prepareSearch("actions").setTypes("action").get(), 2); - - SearchResponse response = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*") - .setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery() - .must(matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.THROTTLED.id())) - .must(termQuery("watch_id", id)))) - .get(); - List> hits = Arrays.stream(response.getHits().getHits()) - .map(SearchHit::getSourceAsMap) - .collect(Collectors.toList()); - - String message = String.format(Locale.ROOT, "Expected single throttled hits, but was %s", hits); - assertThat(message, response.getHits().getTotalHits(), is(1L)); + assertTotalHistoryEntries(id, 3); } public void testTimeThrottleDefaults() throws Exception { @@ -118,43 +72,59 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest .setId(id) .setSource(watchBuilder() .trigger(schedule(interval("1s"))) - .input(searchInput(templateRequest(new SearchSourceBuilder(), "events"))) - .condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L)) - .transform(searchTransform(templateRequest(new SearchSourceBuilder(), "events"))) - .addAction("_id", indexAction("actions", "action"))) + .input(simpleInput()) + .addAction("my-logging-action", indexAction("actions", "action"))) .get(); assertThat(putWatchResponse.isCreated(), is(true)); - timeWarp().clock().setTime(DateTime.now(DateTimeZone.UTC)); - timeWarp().trigger(id); - refresh(); - - // the first trigger should work - SearchResponse response = client().prepareSearch("actions").setTypes("action").get(); - assertHitCount(response, 1); + assertHistoryEntryExecuted(id); timeWarp().clock().fastForwardSeconds(2); timeWarp().trigger(id); - refresh("actions"); - - // the last fire should have been throttled, so number of actions shouldn't change - response = client().prepareSearch("actions").setTypes("action").get(); - assertHitCount(response, 1); + assertHistoryEntryThrottled(id); timeWarp().clock().fastForwardSeconds(10); timeWarp().trigger(id); - refresh(); + assertHistoryEntryExecuted(id); - // the last fire occurred passed the throttle period, so a new action should have been added - response = client().prepareSearch("actions").setTypes("action").get(); - assertHitCount(response, 2); + assertTotalHistoryEntries(id, 3); + } + + private void assertHistoryEntryExecuted(String id) { + Map map = assertLatestHistoryEntry(id); + String actionStatus = ObjectPath.eval("result.actions.0.status", map); + assertThat(actionStatus, is("success")); + } + + private void assertHistoryEntryThrottled(String id) { + Map map = assertLatestHistoryEntry(id); + String actionStatus = ObjectPath.eval("result.actions.0.status", map); + assertThat(actionStatus, is("throttled")); + } + + private Map assertLatestHistoryEntry(String id) { + refresh(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*"); SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*") + .setSize(1) .setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery() - .must(matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.THROTTLED.id())) .must(termQuery("watch_id", id)))) + .addSort(SortBuilders.fieldSort("result.execution_time").order(SortOrder.DESC)) .get(); - assertHitCount(searchResponse, 1); + + Map map = searchResponse.getHits().getHits()[0].getSourceAsMap(); + String actionId = ObjectPath.eval("result.actions.0.id", map); + assertThat(actionId, is("my-logging-action")); + return map; + } + + private void assertTotalHistoryEntries(String id, long expectedCount) { + SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*") + .setSize(0) + .setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(termQuery("watch_id", id)))) + .get(); + + assertHitCount(searchResponse, expectedCount); } }