Persist the Watch.Status if needed after execution

This change persists the Watch.Status if needed by marking the Status as `dirty` if the status mutates during watch execution.
If the status is dirty it will be persisted at the end of execution while the execution lock is held.
Change record_in_history to record_execution which also controls weither the status will be updated.

Fixes elastic/elasticsearch#222

Original commit: elastic/x-pack-elasticsearch@25869cabf0
This commit is contained in:
Brian Murphy 2015-04-15 21:14:21 -04:00
parent 44a3f600ab
commit 0a7cf71152
17 changed files with 193 additions and 73 deletions

View File

@ -68,8 +68,8 @@
"trigger_data" : {
"scheduled_time" : "now"
},
"record_in_history" : true
}
"record_execution" : true
}
- match: { "watch_id": "my_exe_watch" }
- match: { "watch_execution.condition_result.always_true": {} }
- match: { "state": "executed" }

View File

@ -115,6 +115,7 @@ public class ExecutionService extends AbstractComponent {
public WatchRecord execute(WatchExecutionContext ctx) throws IOException {
WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent());
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().name());
try {
WatchExecution execution = executeInner(ctx);
@ -122,9 +123,10 @@ public class ExecutionService extends AbstractComponent {
} finally {
lock.release();
}
if (ctx.recordInHistory()) {
if (ctx.recordExecution()) {
historyStore.put(watchRecord);
}
watchStore.updateStatus(ctx.watch());
return watchRecord;
}
@ -155,7 +157,7 @@ public class ExecutionService extends AbstractComponent {
}
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, clock.now(), event);
WatchRecord watchRecord = new WatchRecord(ctx.id(), watch, event);
if (ctx.recordInHistory()) {
if (ctx.recordExecution()) {
logger.debug("saving watch record [{}] for watch [{}]", watchRecord.id(), watch.name());
historyStore.put(watchRecord);
}
@ -198,7 +200,6 @@ public class ExecutionService extends AbstractComponent {
}
}
return ctx.finish();
}
void executeRecords(Collection<WatchRecord> records) {
@ -267,15 +268,16 @@ public class ExecutionService extends AbstractComponent {
logger.debug("checking watch [{}]", watchRecord.name());
WatchExecution execution = executeInner(ctx);
watchRecord.seal(execution);
if (ctx.recordInHistory()) {
if (ctx.recordExecution()) {
historyStore.update(watchRecord);
}
watchStore.updateStatus(ctx.watch());
} catch (Exception e) {
if (started()) {
logger.warn("failed to execute watch [{}] [{}]", e, watchRecord.name(), ctx.id());
try {
watchRecord.update(WatchRecord.State.FAILED, e.getMessage());
if (ctx.recordInHistory()) {
if (ctx.recordExecution()) {
historyStore.update(watchRecord);
}
} catch (Exception e2) {

View File

@ -24,12 +24,12 @@ import java.util.Set;
public class ManualExecutionContext extends WatchExecutionContext {
private final Predicate<String> simulateActionPredicate;
private final boolean recordInHistory;
private final boolean recordExecution;
ManualExecutionContext(Watch watch, DateTime executionTime, ManualTriggerEvent triggerEvent,
Input.Result inputResult, Condition.Result conditionResult,
Throttler.Result throttlerResult, Predicate<String> simulateActionPredicate,
boolean recordInHistory) {
boolean recordExecution) {
super(watch, executionTime, triggerEvent);
if (inputResult != null) {
onInputResult(inputResult);
@ -41,7 +41,7 @@ public class ManualExecutionContext extends WatchExecutionContext {
onThrottleResult(throttlerResult);
}
this.simulateActionPredicate = simulateActionPredicate;
this.recordInHistory = recordInHistory;
this.recordExecution = recordExecution;
}
@Override
@ -50,8 +50,8 @@ public class ManualExecutionContext extends WatchExecutionContext {
}
@Override
public final boolean recordInHistory() {
return recordInHistory;
public final boolean recordExecution() {
return recordExecution;
}
public static Builder builder(Watch watch) {
@ -63,7 +63,7 @@ public class ManualExecutionContext extends WatchExecutionContext {
private final Watch watch;
protected DateTime executionTime;
private boolean recordInHistory = false;
private boolean recordExecution = false;
private Predicate<String> simulateActionPredicate = Predicates.alwaysFalse();
private Input.Result inputResult;
private Condition.Result conditionResult;
@ -79,8 +79,8 @@ public class ManualExecutionContext extends WatchExecutionContext {
return this;
}
public Builder recordInHistory(boolean recordInHistory) {
this.recordInHistory = recordInHistory;
public Builder recordExecution(boolean recordExecution) {
this.recordExecution = recordExecution;
return this;
}
@ -114,6 +114,7 @@ public class ManualExecutionContext extends WatchExecutionContext {
return this;
}
public ManualExecutionContext build() {
if (executionTime == null) {
executionTime = DateTime.now(DateTimeZone.UTC);
@ -121,7 +122,7 @@ public class ManualExecutionContext extends WatchExecutionContext {
if (triggerEvent == null) {
triggerEvent = new ManualTriggerEvent(executionTime, new HashMap<String, Object>());
}
return new ManualExecutionContext(watch, executionTime, triggerEvent, inputResult, conditionResult, throttlerResult, simulateActionPredicate, recordInHistory);
return new ManualExecutionContext(watch, executionTime, triggerEvent, inputResult, conditionResult, throttlerResult, simulateActionPredicate, recordExecution);
}
}

View File

@ -23,8 +23,7 @@ public class TriggeredExecutionContext extends WatchExecutionContext {
}
@Override
final public boolean recordInHistory() {
final public boolean recordExecution() {
return true;
}
}

View File

@ -53,7 +53,7 @@ public abstract class WatchExecutionContext {
/**
* @return true if this execution should be recorded in the .watch_history index
*/
public abstract boolean recordInHistory();
public abstract boolean recordExecution();
public Wid id() {
return id;
@ -85,8 +85,11 @@ public abstract class WatchExecutionContext {
}
public void onConditionResult(Condition.Result conditionResult) {
watch.status().onCheck(conditionResult.met(), executionTime);
this.conditionResult = conditionResult;
if (recordExecution()) {
watch.status().onCheck(conditionResult.met(), executionTime);
}
}
public Condition.Result conditionResult() {
@ -95,10 +98,12 @@ public abstract class WatchExecutionContext {
public void onThrottleResult(Throttler.Result throttleResult) {
this.throttleResult = throttleResult;
if (throttleResult.throttle()) {
watch.status().onThrottle(executionTime, throttleResult.reason());
} else {
watch.status().onExecution(executionTime);
if (recordExecution()) {
if (throttleResult.throttle()) {
watch.status().onThrottle(executionTime, throttleResult.reason());
} else {
watch.status().onExecution(executionTime);
}
}
}

View File

@ -84,7 +84,7 @@ public class HttpInput extends Input<HttpInput.Result> {
for (String extractKey : extractKeys) {
builder.value(extractKey);
}
builder.endObject();
builder.endArray();
}
return builder.endObject();
}

View File

@ -108,7 +108,7 @@ public class SearchInput extends Input<SearchInput.Result> {
for (String extractKey : extractKeys) {
builder.value(extractKey);
}
builder.endObject();
builder.endArray();
}
builder.endObject();
return builder;

View File

@ -27,7 +27,7 @@ import java.io.IOException;
*/
public class RestExecuteWatchAction extends WatcherRestHandler {
static ParseField RECORD_IN_HISTORY_FIELD = new ParseField("record_in_history");
static ParseField RECORD_EXECUTION_FIELD = new ParseField("record_execution");
static ParseField SIMULATED_ACTIONS_FIELD = new ParseField("simulated_actions");
static ParseField ALTERNATIVE_INPUT_FIELD = new ParseField("alternative_input");
static ParseField IGNORE_CONDITION_FIELD = new ParseField("ignore_condition");
@ -78,8 +78,8 @@ public class RestExecuteWatchAction extends WatcherRestHandler {
executeWatchRequestBuilder.setIgnoreCondition(parser.booleanValue());
} else if (IGNORE_THROTTLE_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setIgnoreThrottle(parser.booleanValue());
} else if (RECORD_IN_HISTORY_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setRecordInHistory(parser.booleanValue());
} else if (RECORD_EXECUTION_FIELD.match(currentFieldName)) {
executeWatchRequestBuilder.setRecordExecution(parser.booleanValue());
} else {
throw new ParseException("invalid watch execution request, unexpected boolean value field [" + currentFieldName + "]");
}

View File

@ -25,7 +25,7 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
private String id;
private boolean ignoreCondition = true;
private boolean ignoreThrottle = false;
private boolean recordInHistory = false;
private boolean recordExecution = false;
private Map<String, Object> alternativeInput = null;
private Map<String, Object> triggerData = null;
private Set<String> simulatedActionIds = new HashSet<>();
@ -85,15 +85,15 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
/**
* @return Should this execution be recorded in the history index
*/
public boolean isRecordInHistory() {
return recordInHistory;
public boolean isRecordExecution() {
return recordExecution;
}
/**
* @param recordInHistory Sets if this execution be recorded in the history index
* @param recordExecution Sets if this execution be recorded in the history index
*/
public void setRecordInHistory(boolean recordInHistory) {
this.recordInHistory = recordInHistory;
public void setRecordExecution(boolean recordExecution) {
this.recordExecution = recordExecution;
}
/**
@ -162,7 +162,7 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
id = in.readString();
ignoreCondition = in.readBoolean();
ignoreThrottle = in.readBoolean();
recordInHistory = in.readBoolean();
recordExecution = in.readBoolean();
if (in.readBoolean()){
alternativeInput = in.readMap();
}
@ -182,7 +182,7 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
out.writeString(id);
out.writeBoolean(ignoreCondition);
out.writeBoolean(ignoreThrottle);
out.writeBoolean(recordInHistory);
out.writeBoolean(recordExecution);
out.writeBoolean(alternativeInput != null);
if (alternativeInput != null) {
out.writeMap(alternativeInput);

View File

@ -49,10 +49,10 @@ public class ExecuteWatchRequestBuilder extends MasterNodeOperationRequestBuilde
}
/**
* @param recordInHistory Sets if this execution be recorded in the history index
* @param recordExecution Sets if this execution be recorded in the history index and reflected in the watch
*/
public ExecuteWatchRequestBuilder setRecordInHistory(boolean recordInHistory) {
request.setRecordInHistory(recordInHistory);
public ExecuteWatchRequestBuilder setRecordExecution(boolean recordExecution) {
request.setRecordExecution(recordExecution);
return this;
}

View File

@ -99,7 +99,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
if (request.isIgnoreThrottle()) {
ctxBuilder.withThrottle(Throttler.Result.NO);
}
ctxBuilder.recordInHistory(request.isRecordInHistory());
ctxBuilder.recordExecution(request.isRecordExecution());
WatchRecord record = executionService.execute(ctxBuilder.build());
XContentBuilder builder = XContentFactory.jsonBuilder();

View File

@ -136,6 +136,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
return nonceCounter.getAndIncrement();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -296,6 +297,8 @@ public class Watch implements TriggerEngine.Job, ToXContent {
private DateTime lastExecuted;
private AckStatus ackStatus;
private volatile boolean dirty = false;
public Status() {
this(-1, null, null, null, null, new AckStatus());
}
@ -353,6 +356,21 @@ public class Watch implements TriggerEngine.Job, ToXContent {
return ackStatus;
}
/**
* @param dirty if true this Watch.Status has been modified since it was read, if false we just wrote the updated watch
*/
public void dirty(boolean dirty) {
this.dirty = dirty;
}
/**
* @return does this Watch.Status needs to be persisted to the index
*/
public boolean dirty() {
return dirty;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -395,9 +413,11 @@ public class Watch implements TriggerEngine.Job, ToXContent {
lastChecked = timestamp;
if (metCondition) {
lastMetCondition = timestamp;
dirty(true);
} else if (ackStatus.state == AckStatus.State.ACKED) {
// didn't meet condition now after it met it in the past - we need to reset the ack state
ackStatus = new AckStatus(AckStatus.State.AWAITS_EXECUTION, timestamp);
dirty(true);
}
}
@ -406,17 +426,22 @@ public class Watch implements TriggerEngine.Job, ToXContent {
*/
public void onThrottle(DateTime timestamp, String reason) {
lastThrottle = new Throttle(timestamp, reason);
dirty(true);
}
/**
* Notified this status that the watch was executed. If the current state is {@link Watch.Status.AckStatus.State#AWAITS_EXECUTION}, it will change to
* {@link Watch.Status.AckStatus.State#ACKABLE}.
* @return {@code true} if the state changed due to the execution {@code false} otherwise
*/
public void onExecution(DateTime timestamp) {
public boolean onExecution(DateTime timestamp) {
lastExecuted = timestamp;
if (ackStatus.state == AckStatus.State.AWAITS_EXECUTION) {
ackStatus = new AckStatus(AckStatus.State.ACKABLE, timestamp);
dirty(true);
return true;
}
return false;
}
/**
@ -429,6 +454,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
boolean onAck(DateTime timestamp) {
if (ackStatus.state == AckStatus.State.ACKABLE) {
ackStatus = new AckStatus(AckStatus.State.ACKED, timestamp);
dirty(true);
return true;
}
return false;

View File

@ -149,11 +149,13 @@ public class WatchStore extends AbstractComponent {
/**
* Updates and persists the status of the given watch
*/
void updateStatus(Watch watch) throws IOException {
public void updateStatus(Watch watch) throws IOException {
// at the moment we store the status together with the watch,
// so we just need to update the watch itself
// TODO: consider storing the status in a different documment (watch_status doc) (must smaller docs... faster for frequent updates)
update(watch);
if (watch.status().dirty()) {
update(watch);
}
}
/**
@ -165,6 +167,7 @@ public class WatchStore extends AbstractComponent {
BytesReference source = JsonXContent.contentBuilder().value(watch).bytes();
IndexResponse response = client.index(createIndexRequest(watch.name(), source));
watch.status().version(response.getVersion());
watch.status().dirty(false);
// Don't need to update the watches, since we are working on an instance from it.
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.watcher.execution;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.carrotsearch.randomizedtesting.annotations.Seed;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.actions.logging.LoggingAction;
@ -44,9 +43,8 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
public void testExecuteWatch() throws Exception {
ensureWatcherStarted();
boolean ignoreCondition = randomBoolean();
boolean persistRecord = randomBoolean();
boolean recordExecution = randomBoolean();
boolean conditionAlwaysTrue = randomBoolean();
boolean storeWatch = randomBoolean();
String actionIdToSimulate = randomFrom("_all", "log", null);
LoggingAction.Builder loggingAction = LoggingAction.builder(new Template("foobar"));
@ -56,22 +54,26 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
.condition(conditionAlwaysTrue ? alwaysTrueCondition() : alwaysFalseCondition())
.addAction("log", loggingAction);
if (storeWatch) {
PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("testrun", testWatchBuilder)).actionGet();
ManualExecutionContext.Builder ctxBuilder;
Watch parsedWatch = null;
if (recordExecution) {
PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("testwatch", testWatchBuilder)).actionGet();
assertThat(putWatchResponse.getVersion(), greaterThan(0L));
refresh();
assertThat(watcherClient().getWatch(new GetWatchRequest("testrun")).actionGet().isFound(), equalTo(true));
assertThat(watcherClient().getWatch(new GetWatchRequest("testwatch")).actionGet().isFound(), equalTo(true));
ctxBuilder = ManualExecutionContext.builder(watchService().getWatch("testwatch")); //If we are persisting the state we need to use the exact watch that is in memory
} else {
parsedWatch = watchParser().parse("testwatch", false, testWatchBuilder.buildAsBytes(XContentType.JSON));
ctxBuilder = ManualExecutionContext.builder(parsedWatch);
}
Watch testWatch = watchParser().parse("testwatch", false, testWatchBuilder.buildAsBytes(XContentType.JSON));
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(testWatch);
if (ignoreCondition) {
ctxBuilder.withCondition(AlwaysTrueCondition.RESULT);
}
ctxBuilder.recordInHistory(persistRecord);
ctxBuilder.recordExecution(recordExecution);
if (actionIdToSimulate != null) {
if ("_all".equals(actionIdToSimulate)) {
@ -89,7 +91,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
refresh();
long newRecordCount = client().count(new CountRequest(HistoryStore.INDEX_PREFIX + "*")).actionGet().getCount();
long expectedCount = oldRecordCount + (persistRecord ? 1 : 0);
long expectedCount = oldRecordCount + (recordExecution ? 1 : 0);
assertThat("the expected count of history records should be [" + expectedCount + "]", newRecordCount, equalTo(expectedCount));
if (ignoreCondition) {
@ -106,5 +108,20 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
if ((ignoreCondition || conditionAlwaysTrue) && actionIdToSimulate != null ) {
assertThat("The action should have run simulated", watchRecord.execution().actionsResults().get("log").action(), instanceOf(LoggingAction.Result.Simulated.class));
}
Watch testWatch = watchService().getWatch("testwatch");
if (recordExecution) {
refresh();
Watch persistedWatch = watchParser().parse("testwatch", true, watcherClient().getWatch(new GetWatchRequest("testwatch")).actionGet().getSource());
if (ignoreCondition || conditionAlwaysTrue) {
assertThat(testWatch.status().ackStatus().state(), equalTo(Watch.Status.AckStatus.State.ACKABLE));
assertThat(persistedWatch.status().ackStatus().state(), equalTo(Watch.Status.AckStatus.State.ACKABLE));
} else {
assertThat(testWatch.status().ackStatus().state(), equalTo(Watch.Status.AckStatus.State.AWAITS_EXECUTION));
}
} else {
assertThat(parsedWatch.status().ackStatus().state(), equalTo(Watch.Status.AckStatus.State.AWAITS_EXECUTION));
}
}
}

View File

@ -50,7 +50,6 @@ import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.watcher.trigger.ScheduleTriggerEngineMock;
import org.elasticsearch.watcher.trigger.TriggerService;
@ -281,6 +280,10 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return getInstanceFromMaster(ExecutionService.class);
}
protected WatchService watchService() {
return getInstanceFromMaster(WatchService.class);
}
protected TriggerService triggerService() {
return getInstanceFromMaster(TriggerService.class);
}

View File

@ -178,8 +178,4 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
ensureWatcherStarted(false);
}
private WatchService watchService() {
return getInstanceFromMaster(WatchService.class);
}
}

View File

@ -7,6 +7,8 @@ package org.elasticsearch.watcher.test.integration;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
@ -16,9 +18,12 @@ import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.ack.AckWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchStore;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
@ -44,14 +49,7 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
@Test
public void testAckThrottle() throws Exception {
WatcherClient watcherClient = watcherClient();
createIndex("actions", "events");
ensureGreen("actions", "events");
IndexResponse eventIndexResponse = client().prepareIndex("events", "event")
.setSource("level", "error")
.get();
assertThat(eventIndexResponse.isCreated(), is(true));
refresh();
IndexResponse eventIndexResponse = indexTestDoc();
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
.setId("_name")
@ -111,18 +109,23 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
assertThat(throttledCount, greaterThan(0L));
}
@Test @Repeat(iterations = 10)
public void testTimeThrottle() throws Exception {
WatcherClient watcherClient = watcherClient();
public IndexResponse indexTestDoc() {
createIndex("actions", "events");
ensureGreen("actions", "events");
IndexResponse eventIndexResponse = client().prepareIndex("events", "event")
.setSource("level", "error")
.get();
assertTrue(eventIndexResponse.isCreated());
assertThat(eventIndexResponse.isCreated(), is(true));
refresh();
return eventIndexResponse;
}
@Test @Repeat(iterations = 10)
public void testTimeThrottle() throws Exception {
WatcherClient watcherClient = watcherClient();
indexTestDoc();
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
.setId("_name")
@ -187,4 +190,69 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
}
}
@Test
@Repeat(iterations = 2)
public void test_ack_with_restart() throws Exception {
WatcherClient watcherClient = watcherClient();
indexTestDoc();
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
.setId("_name")
.setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? *")))
.input(searchInput(matchAllRequest().indices("events")))
.condition(scriptCondition("ctx.payload.hits.total > 0"))
.transform(searchTransform(matchAllRequest().indices("events")))
.addAction("_id", indexAction("actions", "action")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
if (timeWarped()) {
timeWarp().scheduler().trigger("_name", 4, TimeValue.timeValueSeconds(5));
} else {
Thread.sleep(20000);
}
if (randomBoolean()) {
stopWatcher();
ensureWatcherStopped();
startWatcher();
ensureWatcherStarted();
}
AckWatchResponse ackResponse = watcherClient.prepareAckWatch("_name").get();
assertThat(ackResponse.getStatus().ackStatus().state(), is(Watch.Status.AckStatus.State.ACKED));
refresh();
long countAfterAck = docCount("actions", "action", matchAllQuery());
assertThat(countAfterAck, greaterThanOrEqualTo((long) 1));
if (randomBoolean()) {
stopWatcher();
ensureWatcherStopped();
startWatcher();
ensureWatcherStarted();
}
GetWatchResponse watchResponse = watcherClient.getWatch(new GetWatchRequest("_name")).actionGet();
Watch watch = watchParser().parse("_name", true, watchResponse.getSource());
assertThat(watch.status().ackStatus().state(), Matchers.equalTo(Watch.Status.AckStatus.State.ACKED));
refresh();
GetResponse getResponse = client().get(new GetRequest(WatchStore.INDEX, WatchStore.DOC_TYPE, "_name")).actionGet();
Watch indexedWatch = watchParser().parse("_name", true, getResponse.getSourceAsBytesRef());
assertThat(watch.status().ackStatus().state(), Matchers.equalTo(indexedWatch.status().ackStatus().state()));
if (timeWarped()) {
timeWarp().scheduler().trigger("_name", 4, TimeValue.timeValueSeconds(5));
} else {
Thread.sleep(20000);
}
refresh();
// There shouldn't be more actions in the index after we ack the watch, even though the watch was triggered
long countAfterPostAckFires = docCount("actions", "action", matchAllQuery());
assertThat(countAfterPostAckFires, equalTo(countAfterAck));
}
}