From 11334b2df3892a678c99bb0f35e3e11381bc69ad Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 11 Aug 2017 09:19:25 +0200 Subject: [PATCH] Tests: Fix TimeThrottleIntegrationTests to not rely on shard actions (elastic/x-pack-elasticsearch#2234) These tests used to fail rarely, because during a watch execution one of the watcher shards was relocated resulting in a second execution of watch. In order to prevent this, the tests do not need to actually create any shards, which causes watcher potentially to be rebalanced. This simplifies and speeds up the test as well. relates elastic/x-pack-elasticsearch#1608 Original commit: elastic/x-pack-elasticsearch@1cfac1145de4a4c375adf2441383734c0cdb4dd6 --- .../actions/TimeThrottleIntegrationTests.java | 128 +++++++----------- 1 file changed, 49 insertions(+), 79 deletions(-) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java index bd7ccdc61cd..7fb6ac791d0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java @@ -5,45 +5,31 @@ */ package org.elasticsearch.xpack.watcher.actions; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.xpack.watcher.condition.CompareCondition; -import org.elasticsearch.xpack.watcher.execution.ExecutionState; import org.elasticsearch.xpack.watcher.history.HistoryStore; -import org.elasticsearch.xpack.watcher.history.WatchRecord; +import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath; import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.junit.Before; -import java.util.Arrays; -import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.stream.Collectors; -import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction; +import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; -import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput; -import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest; -import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.searchTransform; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; import static org.hamcrest.Matchers.is; @TestLogging("org.elasticsearch.xpack.watcher:DEBUG," + - "org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail:WARN," + "org.elasticsearch.xpack.watcher.WatcherLifeCycleService:DEBUG," + "org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerMock:TRACE," + "org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE") @@ -54,62 +40,30 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest return true; } - @Before - public void indexTestDocument() { - IndexResponse eventIndexResponse = client().prepareIndex("events", "event") - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .setSource("level", "error") - .get(); - assertEquals(DocWriteResponse.Result.CREATED, eventIndexResponse.getResult()); - } - public void testTimeThrottle() throws Exception { String id = randomAlphaOfLength(20); PutWatchResponse putWatchResponse = watcherClient().preparePutWatch() .setId(id) .setSource(watchBuilder() .trigger(schedule(interval("5s"))) - .input(searchInput(templateRequest(new SearchSourceBuilder(), "events"))) - .condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L)) - .transform(searchTransform(templateRequest(new SearchSourceBuilder(), "events"))) - .addAction("_id", indexAction("actions", "action")) + .input(simpleInput()) + .addAction("my-logging-action", loggingAction("foo")) .defaultThrottlePeriod(TimeValue.timeValueSeconds(30))) .get(); assertThat(putWatchResponse.isCreated(), is(true)); - timeWarp().clock().setTime(DateTime.now(DateTimeZone.UTC)); - timeWarp().trigger(id); - refresh(); - - // the first fire should work - assertHitCount(client().prepareSearch("actions").setTypes("action").get(), 1); + assertHistoryEntryExecuted(id); timeWarp().clock().fastForward(TimeValue.timeValueMillis(4000)); timeWarp().trigger(id); - refresh(); - - // the last fire should have been throttled, so number of actions shouldn't change - assertHitCount(client().prepareSearch("actions").setTypes("action").get(), 1); + assertHistoryEntryThrottled(id); timeWarp().clock().fastForwardSeconds(30); timeWarp().trigger(id); - refresh(); + assertHistoryEntryExecuted(id); - // the last fire occurred passed the throttle period, so a new action should have been added - assertHitCount(client().prepareSearch("actions").setTypes("action").get(), 2); - - SearchResponse response = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*") - .setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery() - .must(matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.THROTTLED.id())) - .must(termQuery("watch_id", id)))) - .get(); - List> hits = Arrays.stream(response.getHits().getHits()) - .map(SearchHit::getSourceAsMap) - .collect(Collectors.toList()); - - String message = String.format(Locale.ROOT, "Expected single throttled hits, but was %s", hits); - assertThat(message, response.getHits().getTotalHits(), is(1L)); + assertTotalHistoryEntries(id, 3); } public void testTimeThrottleDefaults() throws Exception { @@ -118,43 +72,59 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest .setId(id) .setSource(watchBuilder() .trigger(schedule(interval("1s"))) - .input(searchInput(templateRequest(new SearchSourceBuilder(), "events"))) - .condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L)) - .transform(searchTransform(templateRequest(new SearchSourceBuilder(), "events"))) - .addAction("_id", indexAction("actions", "action"))) + .input(simpleInput()) + .addAction("my-logging-action", indexAction("actions", "action"))) .get(); assertThat(putWatchResponse.isCreated(), is(true)); - timeWarp().clock().setTime(DateTime.now(DateTimeZone.UTC)); - timeWarp().trigger(id); - refresh(); - - // the first trigger should work - SearchResponse response = client().prepareSearch("actions").setTypes("action").get(); - assertHitCount(response, 1); + assertHistoryEntryExecuted(id); timeWarp().clock().fastForwardSeconds(2); timeWarp().trigger(id); - refresh("actions"); - - // the last fire should have been throttled, so number of actions shouldn't change - response = client().prepareSearch("actions").setTypes("action").get(); - assertHitCount(response, 1); + assertHistoryEntryThrottled(id); timeWarp().clock().fastForwardSeconds(10); timeWarp().trigger(id); - refresh(); + assertHistoryEntryExecuted(id); - // the last fire occurred passed the throttle period, so a new action should have been added - response = client().prepareSearch("actions").setTypes("action").get(); - assertHitCount(response, 2); + assertTotalHistoryEntries(id, 3); + } + + private void assertHistoryEntryExecuted(String id) { + Map map = assertLatestHistoryEntry(id); + String actionStatus = ObjectPath.eval("result.actions.0.status", map); + assertThat(actionStatus, is("success")); + } + + private void assertHistoryEntryThrottled(String id) { + Map map = assertLatestHistoryEntry(id); + String actionStatus = ObjectPath.eval("result.actions.0.status", map); + assertThat(actionStatus, is("throttled")); + } + + private Map assertLatestHistoryEntry(String id) { + refresh(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*"); SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*") + .setSize(1) .setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery() - .must(matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.THROTTLED.id())) .must(termQuery("watch_id", id)))) + .addSort(SortBuilders.fieldSort("result.execution_time").order(SortOrder.DESC)) .get(); - assertHitCount(searchResponse, 1); + + Map map = searchResponse.getHits().getHits()[0].getSourceAsMap(); + String actionId = ObjectPath.eval("result.actions.0.id", map); + assertThat(actionId, is("my-logging-action")); + return map; + } + + private void assertTotalHistoryEntries(String id, long expectedCount) { + SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*") + .setSize(0) + .setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(termQuery("watch_id", id)))) + .get(); + + assertHitCount(searchResponse, expectedCount); } }