test: add a test that verifies that we only execute watch records that have the await_exection state during the Watcher start phase
Original commit: elastic/x-pack-elasticsearch@62ffe1c984
This commit is contained in:
parent
4ef30822ea
commit
2f18dd8269
|
@ -43,7 +43,6 @@ import static org.elasticsearch.watcher.test.WatcherTestUtils.newInputSearchRequ
|
||||||
import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform;
|
import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform;
|
||||||
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
|
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
|
||||||
import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron;
|
import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron;
|
||||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.core.IsEqual.equalTo;
|
import static org.hamcrest.core.IsEqual.equalTo;
|
||||||
|
|
||||||
|
@ -53,6 +52,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean timeWarped() {
|
protected boolean timeWarped() {
|
||||||
|
// timewarping isn't necessary here, because we aren't testing triggering or throttling
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,6 +84,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
|
||||||
@Test
|
@Test
|
||||||
@TestLogging("watcher.actions:DEBUG")
|
@TestLogging("watcher.actions:DEBUG")
|
||||||
public void testWatchRecordLoading() throws Exception {
|
public void testWatchRecordLoading() throws Exception {
|
||||||
|
createIndex("output");
|
||||||
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
|
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
|
||||||
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
|
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
|
||||||
assertThat(response.getWatchesCount(), equalTo(0L));
|
assertThat(response.getWatchesCount(), equalTo(0L));
|
||||||
|
@ -99,7 +100,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
|
||||||
).get();
|
).get();
|
||||||
|
|
||||||
DateTime now = DateTime.now(UTC);
|
DateTime now = DateTime.now(UTC);
|
||||||
int numRecords = scaledRandomIntBetween(2, 128);
|
final int numRecords = scaledRandomIntBetween(2, 128);
|
||||||
for (int i = 0; i < numRecords; i++) {
|
for (int i = 0; i < numRecords; i++) {
|
||||||
now = now.plusMinutes(1);
|
now = now.plusMinutes(1);
|
||||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
|
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
|
||||||
|
@ -115,22 +116,94 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
|
||||||
stopWatcher();
|
stopWatcher();
|
||||||
startWatcher();
|
startWatcher();
|
||||||
|
|
||||||
// We need to wait until all the records are processed from the internal execution queue, only then we can assert
|
|
||||||
// that numRecords watch records have been processed as part of starting up.
|
|
||||||
assertBusy(new Runnable() {
|
assertBusy(new Runnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
// We need to wait until all the records are processed from the internal execution queue, only then we can assert
|
||||||
|
// that numRecords watch records have been processed as part of starting up.
|
||||||
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
|
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
|
||||||
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
|
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
|
||||||
assertThat(response.getWatchesCount(), equalTo(1L));
|
|
||||||
assertThat(response.getWatchExecutionQueueMaxSize(), greaterThanOrEqualTo(1l));
|
|
||||||
assertThat(response.getExecutionQueueSize(), equalTo(0l));
|
assertThat(response.getExecutionQueueSize(), equalTo(0l));
|
||||||
|
|
||||||
|
// but even then since the execution of the watch record is async it may take a little bit before
|
||||||
|
// the actual documents are in the output index
|
||||||
|
refresh();
|
||||||
|
SearchResponse searchResponse = client().prepareSearch("output").get();
|
||||||
|
assertHitCount(searchResponse, numRecords);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMixedWatchRecordLoading() throws Exception {
|
||||||
|
createIndex("output");
|
||||||
|
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
|
||||||
|
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
|
||||||
|
assertThat(response.getWatchesCount(), equalTo(0L));
|
||||||
|
|
||||||
|
String watchId = "_id";
|
||||||
|
SearchRequest searchRequest = newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
|
||||||
|
watcherClient().preparePutWatch(watchId).setSource(watchBuilder()
|
||||||
|
.trigger(schedule(cron("0/5 * * * * ? 2050")))
|
||||||
|
.input(searchInput(searchRequest))
|
||||||
|
.condition(alwaysCondition())
|
||||||
|
.addAction("_id", indexAction("output", "test"))
|
||||||
|
.throttlePeriod(TimeValue.timeValueMillis(0))
|
||||||
|
).get();
|
||||||
|
|
||||||
|
DateTime now = DateTime.now(UTC);
|
||||||
|
int numRecords = scaledRandomIntBetween(2, 128);
|
||||||
|
int awaitsExecution = 0;
|
||||||
|
for (int i = 0; i < numRecords; i++) {
|
||||||
|
now = now.plusMinutes(1);
|
||||||
|
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
|
||||||
|
Wid wid = new Wid(watchId, randomLong(), now);
|
||||||
|
WatchRecord watchRecord = new WatchRecord(wid, watchService().getWatch(watchId), event);
|
||||||
|
String index = HistoryStore.getHistoryIndexNameForTime(now);
|
||||||
|
client().prepareIndex(index, HistoryStore.DOC_TYPE, watchRecord.id().value())
|
||||||
|
.setSource(jsonBuilder().value(watchRecord))
|
||||||
|
.setConsistencyLevel(WriteConsistencyLevel.ALL)
|
||||||
|
.get();
|
||||||
|
|
||||||
|
final WatchRecord.State state;
|
||||||
|
if (i == 0) {
|
||||||
|
// at least have one record that we need to execute (otherwise the output index doesn't exist)
|
||||||
|
awaitsExecution++;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
// update to a random state:
|
||||||
|
state = randomFrom(WatchRecord.State.AWAITS_EXECUTION, WatchRecord.State.CHECKING, WatchRecord.State.EXECUTION_NOT_NEEDED, WatchRecord.State.EXECUTED);
|
||||||
|
}
|
||||||
|
client().prepareUpdate(index, HistoryStore.DOC_TYPE, watchRecord.id().value())
|
||||||
|
.setDoc(WatchRecord.Parser.STATE_FIELD.getPreferredName(), state.id())
|
||||||
|
.get();
|
||||||
|
if (state == WatchRecord.State.AWAITS_EXECUTION) {
|
||||||
|
awaitsExecution++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stopWatcher();
|
||||||
|
startWatcher();
|
||||||
|
|
||||||
|
final int finalAwaitsExecution = awaitsExecution;
|
||||||
|
assertBusy(new Runnable() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// We need to wait until all the records are processed from the internal execution queue, only then we can assert
|
||||||
|
// that numRecords watch records have been processed as part of starting up.
|
||||||
|
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
|
||||||
|
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
|
||||||
|
assertThat(response.getExecutionQueueSize(), equalTo(0l));
|
||||||
|
|
||||||
|
// but even then since the execution of the watch record is async it may take a little bit before
|
||||||
|
// the actual documents are in the output index
|
||||||
|
refresh();
|
||||||
|
SearchResponse searchResponse = client().prepareSearch("output").get();
|
||||||
|
assertHitCount(searchResponse, finalAwaitsExecution);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
refresh();
|
|
||||||
SearchResponse searchResponse = client().prepareSearch("output").get();
|
|
||||||
assertHitCount(searchResponse, numRecords);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue