test: improved tests that verify the loading of watches and watch records at Watcher startup time

Original commit: elastic/x-pack-elasticsearch@2ee28bdf7e
This commit is contained in:
Martijn van Groningen 2015-04-28 17:34:40 +02:00
parent 29d25d4440
commit 4ef30822ea

View File

@ -8,45 +8,41 @@ package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.actions.ExecutableActions;
import org.elasticsearch.watcher.condition.script.ExecutableScriptCondition;
import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.watcher.trigger.schedule.CronSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.watch.WatchStore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.watcher.test.WatcherTestUtils.newInputSearchRequest;
import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
@ -55,79 +51,86 @@ import static org.hamcrest.core.IsEqual.equalTo;
*/
public class BootStrapTests extends AbstractWatcherIntegrationTests {
@Override
protected boolean timeWarped() {
return false;
}
@Test
public void testBootStrapWatcher() throws Exception {
ensureWatcherStarted();
public void testLoadExistingWatchesUponStartup() throws Exception {
int numWatches = scaledRandomIntBetween(16, 128);
SearchRequest searchRequest = newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
for (int i = 0; i < numWatches; i++) {
client().prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, "_id" + i)
.setSource(watchBuilder()
.trigger(schedule(cron("0 0/5 * * * ? 2050")))
.input(searchInput(searchRequest))
.condition(scriptCondition("ctx.payload.hits.total == 1"))
.buildAsBytes(XContentType.JSON)
)
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.get();
}
SearchRequest searchRequest = WatcherTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference watchSource = createWatchSource("0 0/5 * * * ? *", searchRequest, "ctx.payload.hits.total == 1");
client().prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, "my-first-watch")
.setSource(watchSource)
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.get();
client().admin().indices().prepareRefresh(WatchStore.INDEX).get();
refresh();
stopWatcher();
startWatcher();
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
assertThat(response.getWatchesCount(), equalTo(1L));
assertThat(response.getWatchesCount(), equalTo((long) numWatches));
}
@Test
@TestLogging("watcher.actions:DEBUG")
public void testBootstrapHistory() throws Exception {
ensureWatcherStarted();
public void testWatchRecordLoading() throws Exception {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
assertThat(response.getWatchesCount(), equalTo(0L));
SearchRequest searchRequest = WatcherTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
Watch watch = new Watch(
"test-serialization",
SystemClock.INSTANCE,
licenseService(),
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set this into the future so we don't get any extra runs
new ExecutableSearchInput(new SearchInput(searchRequest, null), logger, scriptService(), ClientProxy.of(client())),
new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService()),
new ExecutableSearchTransform(new SearchTransform(searchRequest), logger, scriptService(), ClientProxy.of(client())),
new ExecutableActions(new ArrayList<ActionWrapper>()),
null, // metadata
new TimeValue(0),
new Watch.Status());
XContentBuilder builder = jsonBuilder().value(watch);
IndexResponse indexResponse = client().prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, watch.id())
.setSource(builder).get();
ensureGreen(WatchStore.INDEX);
refresh();
assertThat(indexResponse.isCreated(), is(true));
String watchId = "_id";
SearchRequest searchRequest = newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
watcherClient().preparePutWatch(watchId).setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? 2050")))
.input(searchInput(searchRequest))
.condition(alwaysCondition())
.addAction("_id", indexAction("output", "test"))
.throttlePeriod(TimeValue.timeValueMillis(0))
).get();
DateTime now = DateTime.now(UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), now, now);
Wid wid = new Wid("_record", randomLong(), DateTime.now(UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
String actionHistoryIndex = HistoryStore.getHistoryIndexNameForTime(now);
createIndex(actionHistoryIndex);
ensureGreen(actionHistoryIndex);
logger.info("Created index {}", actionHistoryIndex);
indexResponse = client().prepareIndex(actionHistoryIndex, HistoryStore.DOC_TYPE, watchRecord.id().value())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setSource(jsonBuilder().value(watchRecord))
.get();
assertThat(indexResponse.isCreated(), is(true));
int numRecords = scaledRandomIntBetween(2, 128);
for (int i = 0; i < numRecords; i++) {
now = now.plusMinutes(1);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
Wid wid = new Wid(watchId, randomLong(), now);
WatchRecord watchRecord = new WatchRecord(wid, watchService().getWatch(watchId), event);
String index = HistoryStore.getHistoryIndexNameForTime(now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, watchRecord.id().value())
.setSource(jsonBuilder().value(watchRecord))
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.get();
}
stopWatcher();
startWatcher();
response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
assertThat(response.getWatchesCount(), equalTo(1L));
assertThat(response.getWatchExecutionQueueMaxSize(), greaterThanOrEqualTo(1l));
// We need to wait until all the records are processed from the internal execution queue, only then we can assert
// that numRecords watch records have been processed as part of starting up.
assertBusy(new Runnable() {
@Override
public void run() {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
assertThat(response.getWatchesCount(), equalTo(1L));
assertThat(response.getWatchExecutionQueueMaxSize(), greaterThanOrEqualTo(1l));
assertThat(response.getExecutionQueueSize(), equalTo(0l));
}
});
refresh();
SearchResponse searchResponse = client().prepareSearch("output").get();
assertHitCount(searchResponse, numRecords);
}
@Test
@ -136,7 +139,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
DateTime now = new DateTime(UTC);
long numberOfWatchHistoryIndices = randomIntBetween(2, 8);
long numberOfWatchRecordsPerIndex = randomIntBetween(5, 10);
SearchRequest searchRequest = WatcherTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
for (int i = 0; i < numberOfWatchHistoryIndices; i++) {
DateTime historyIndexDate = now.minus((new TimeValue(i, TimeUnit.DAYS)).getMillis());
@ -146,28 +149,19 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
logger.info("Created index {}", actionHistoryIndex);
for (int j = 0; j < numberOfWatchRecordsPerIndex; j++) {
String watchId = "_id" + i + "-" + j;
WatchSourceBuilder watchSource = watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? 2050")))
.input(searchInput(searchRequest))
.condition(alwaysCondition())
.transform(searchTransform(searchRequest));
Watch watch = new Watch(
"action-test-" + i + " " + j,
SystemClock.INSTANCE,
licenseService(),
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set a cron schedule far into the future so this watch is never scheduled
new ExecutableSearchInput(new SearchInput(searchRequest, null), logger, scriptService(), ClientProxy.of(client())),
new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService()),
new ExecutableSearchTransform(new SearchTransform(searchRequest), logger, scriptService(), ClientProxy.of(client())),
new ExecutableActions(new ArrayList<ActionWrapper>()),
null, // metadata
new TimeValue(0),
new Watch.Status());
XContentBuilder jsonBuilder = jsonBuilder();
watch.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(watch.id()).setSource(jsonBuilder.bytes()).get();
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(watchId).setSource(watchSource).get();
assertThat(putWatchResponse.isCreated(), is(true));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), historyIndexDate, historyIndexDate);
Wid wid = new Wid("record_" + i, randomLong(), DateTime.now(UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, historyIndexDate, historyIndexDate);
Wid wid = new Wid(watchId, randomLong(), DateTime.now(UTC));
WatchRecord watchRecord = new WatchRecord(wid, watchService().getWatch(watchId), event);
XContentBuilder jsonBuilder2 = jsonBuilder();
watchRecord.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS);