From 2f18dd8269f9561e6a22da88394f132a25465955 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Apr 2015 19:08:09 +0200 Subject: [PATCH] test: add a test that verifies that we only execute watch records that have the await_exection state during the Watcher start phase Original commit: elastic/x-pack-elasticsearch@62ffe1c9848150741c861e68d1cc93f5f59e946e --- .../test/integration/BootStrapTests.java | 91 +++++++++++++++++-- 1 file changed, 82 insertions(+), 9 deletions(-) diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java index 3d07296aad3..361012c0254 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java @@ -43,7 +43,6 @@ import static org.elasticsearch.watcher.test.WatcherTestUtils.newInputSearchRequ import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform; import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; @@ -53,6 +52,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { @Override protected boolean timeWarped() { + // timewarping isn't necessary here, because we aren't testing triggering or throttling return false; } @@ -84,6 +84,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { @Test @TestLogging("watcher.actions:DEBUG") public void testWatchRecordLoading() throws Exception { + createIndex("output"); WatcherStatsResponse response = watcherClient().prepareWatcherStats().get(); assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED)); assertThat(response.getWatchesCount(), equalTo(0L)); @@ -99,7 +100,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { ).get(); DateTime now = DateTime.now(UTC); - int numRecords = scaledRandomIntBetween(2, 128); + final int numRecords = scaledRandomIntBetween(2, 128); for (int i = 0; i < numRecords; i++) { now = now.plusMinutes(1); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); @@ -115,22 +116,94 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { stopWatcher(); startWatcher(); - // We need to wait until all the records are processed from the internal execution queue, only then we can assert - // that numRecords watch records have been processed as part of starting up. assertBusy(new Runnable() { @Override public void run() { + // We need to wait until all the records are processed from the internal execution queue, only then we can assert + // that numRecords watch records have been processed as part of starting up. WatcherStatsResponse response = watcherClient().prepareWatcherStats().get(); assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED)); - assertThat(response.getWatchesCount(), equalTo(1L)); - assertThat(response.getWatchExecutionQueueMaxSize(), greaterThanOrEqualTo(1l)); assertThat(response.getExecutionQueueSize(), equalTo(0l)); + + // but even then since the execution of the watch record is async it may take a little bit before + // the actual documents are in the output index + refresh(); + SearchResponse searchResponse = client().prepareSearch("output").get(); + assertHitCount(searchResponse, numRecords); + } + }); + } + + @Test + public void testMixedWatchRecordLoading() throws Exception { + createIndex("output"); + WatcherStatsResponse response = watcherClient().prepareWatcherStats().get(); + assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED)); + assertThat(response.getWatchesCount(), equalTo(0L)); + + String watchId = "_id"; + SearchRequest searchRequest = newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); + watcherClient().preparePutWatch(watchId).setSource(watchBuilder() + .trigger(schedule(cron("0/5 * * * * ? 2050"))) + .input(searchInput(searchRequest)) + .condition(alwaysCondition()) + .addAction("_id", indexAction("output", "test")) + .throttlePeriod(TimeValue.timeValueMillis(0)) + ).get(); + + DateTime now = DateTime.now(UTC); + int numRecords = scaledRandomIntBetween(2, 128); + int awaitsExecution = 0; + for (int i = 0; i < numRecords; i++) { + now = now.plusMinutes(1); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); + Wid wid = new Wid(watchId, randomLong(), now); + WatchRecord watchRecord = new WatchRecord(wid, watchService().getWatch(watchId), event); + String index = HistoryStore.getHistoryIndexNameForTime(now); + client().prepareIndex(index, HistoryStore.DOC_TYPE, watchRecord.id().value()) + .setSource(jsonBuilder().value(watchRecord)) + .setConsistencyLevel(WriteConsistencyLevel.ALL) + .get(); + + final WatchRecord.State state; + if (i == 0) { + // at least have one record that we need to execute (otherwise the output index doesn't exist) + awaitsExecution++; + continue; + } else { + // update to a random state: + state = randomFrom(WatchRecord.State.AWAITS_EXECUTION, WatchRecord.State.CHECKING, WatchRecord.State.EXECUTION_NOT_NEEDED, WatchRecord.State.EXECUTED); + } + client().prepareUpdate(index, HistoryStore.DOC_TYPE, watchRecord.id().value()) + .setDoc(WatchRecord.Parser.STATE_FIELD.getPreferredName(), state.id()) + .get(); + if (state == WatchRecord.State.AWAITS_EXECUTION) { + awaitsExecution++; + } + } + + stopWatcher(); + startWatcher(); + + final int finalAwaitsExecution = awaitsExecution; + assertBusy(new Runnable() { + + @Override + public void run() { + // We need to wait until all the records are processed from the internal execution queue, only then we can assert + // that numRecords watch records have been processed as part of starting up. + WatcherStatsResponse response = watcherClient().prepareWatcherStats().get(); + assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED)); + assertThat(response.getExecutionQueueSize(), equalTo(0l)); + + // but even then since the execution of the watch record is async it may take a little bit before + // the actual documents are in the output index + refresh(); + SearchResponse searchResponse = client().prepareSearch("output").get(); + assertHitCount(searchResponse, finalAwaitsExecution); } }); - refresh(); - SearchResponse searchResponse = client().prepareSearch("output").get(); - assertHitCount(searchResponse, numRecords); } @Test