Tests: Fix TimeThrottleIntegrationTests to not rely on shard actions (elastic/x-pack-elasticsearch#2234)

These tests used to fail rarely, because during a watch execution
one of the watcher shards was relocated resulting in a second execution
of watch.

In order to prevent this, the tests do not need to actually create any
shards, which causes watcher potentially to be rebalanced.

This simplifies and speeds up the test as well.

relates elastic/x-pack-elasticsearch#1608

Original commit: elastic/x-pack-elasticsearch@1cfac1145d
This commit is contained in:
Alexander Reelsen 2017-08-11 09:19:25 +02:00 committed by GitHub
parent cc7c9aeddb
commit 11334b2df3
1 changed files with 49 additions and 79 deletions

View File

@ -5,45 +5,31 @@
*/
package org.elasticsearch.xpack.watcher.actions;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.watcher.condition.CompareCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.history.WatchRecord;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Before;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
@TestLogging("org.elasticsearch.xpack.watcher:DEBUG," +
"org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail:WARN," +
"org.elasticsearch.xpack.watcher.WatcherLifeCycleService:DEBUG," +
"org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerMock:TRACE," +
"org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE")
@ -54,62 +40,30 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest
return true;
}
@Before
public void indexTestDocument() {
IndexResponse eventIndexResponse = client().prepareIndex("events", "event")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource("level", "error")
.get();
assertEquals(DocWriteResponse.Result.CREATED, eventIndexResponse.getResult());
}
public void testTimeThrottle() throws Exception {
String id = randomAlphaOfLength(20);
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch()
.setId(id)
.setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(templateRequest(new SearchSourceBuilder(), "events")))
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(templateRequest(new SearchSourceBuilder(), "events")))
.addAction("_id", indexAction("actions", "action"))
.input(simpleInput())
.addAction("my-logging-action", loggingAction("foo"))
.defaultThrottlePeriod(TimeValue.timeValueSeconds(30)))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
timeWarp().clock().setTime(DateTime.now(DateTimeZone.UTC));
timeWarp().trigger(id);
refresh();
// the first fire should work
assertHitCount(client().prepareSearch("actions").setTypes("action").get(), 1);
assertHistoryEntryExecuted(id);
timeWarp().clock().fastForward(TimeValue.timeValueMillis(4000));
timeWarp().trigger(id);
refresh();
// the last fire should have been throttled, so number of actions shouldn't change
assertHitCount(client().prepareSearch("actions").setTypes("action").get(), 1);
assertHistoryEntryThrottled(id);
timeWarp().clock().fastForwardSeconds(30);
timeWarp().trigger(id);
refresh();
assertHistoryEntryExecuted(id);
// the last fire occurred passed the throttle period, so a new action should have been added
assertHitCount(client().prepareSearch("actions").setTypes("action").get(), 2);
SearchResponse response = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*")
.setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery()
.must(matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.THROTTLED.id()))
.must(termQuery("watch_id", id))))
.get();
List<Map<String, Object>> hits = Arrays.stream(response.getHits().getHits())
.map(SearchHit::getSourceAsMap)
.collect(Collectors.toList());
String message = String.format(Locale.ROOT, "Expected single throttled hits, but was %s", hits);
assertThat(message, response.getHits().getTotalHits(), is(1L));
assertTotalHistoryEntries(id, 3);
}
public void testTimeThrottleDefaults() throws Exception {
@ -118,43 +72,59 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest
.setId(id)
.setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(searchInput(templateRequest(new SearchSourceBuilder(), "events")))
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(templateRequest(new SearchSourceBuilder(), "events")))
.addAction("_id", indexAction("actions", "action")))
.input(simpleInput())
.addAction("my-logging-action", indexAction("actions", "action")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
timeWarp().clock().setTime(DateTime.now(DateTimeZone.UTC));
timeWarp().trigger(id);
refresh();
// the first trigger should work
SearchResponse response = client().prepareSearch("actions").setTypes("action").get();
assertHitCount(response, 1);
assertHistoryEntryExecuted(id);
timeWarp().clock().fastForwardSeconds(2);
timeWarp().trigger(id);
refresh("actions");
// the last fire should have been throttled, so number of actions shouldn't change
response = client().prepareSearch("actions").setTypes("action").get();
assertHitCount(response, 1);
assertHistoryEntryThrottled(id);
timeWarp().clock().fastForwardSeconds(10);
timeWarp().trigger(id);
refresh();
assertHistoryEntryExecuted(id);
// the last fire occurred passed the throttle period, so a new action should have been added
response = client().prepareSearch("actions").setTypes("action").get();
assertHitCount(response, 2);
assertTotalHistoryEntries(id, 3);
}
private void assertHistoryEntryExecuted(String id) {
Map<String, Object> map = assertLatestHistoryEntry(id);
String actionStatus = ObjectPath.eval("result.actions.0.status", map);
assertThat(actionStatus, is("success"));
}
private void assertHistoryEntryThrottled(String id) {
Map<String, Object> map = assertLatestHistoryEntry(id);
String actionStatus = ObjectPath.eval("result.actions.0.status", map);
assertThat(actionStatus, is("throttled"));
}
private Map<String, Object> assertLatestHistoryEntry(String id) {
refresh(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*");
SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*")
.setSize(1)
.setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery()
.must(matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.THROTTLED.id()))
.must(termQuery("watch_id", id))))
.addSort(SortBuilders.fieldSort("result.execution_time").order(SortOrder.DESC))
.get();
assertHitCount(searchResponse, 1);
Map<String, Object> map = searchResponse.getHits().getHits()[0].getSourceAsMap();
String actionId = ObjectPath.eval("result.actions.0.id", map);
assertThat(actionId, is("my-logging-action"));
return map;
}
private void assertTotalHistoryEntries(String id, long expectedCount) {
SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*")
.setSize(0)
.setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(termQuery("watch_id", id))))
.get();
assertHitCount(searchResponse, expectedCount);
}
}