execution service: Ignore watch records that where deleted while awaiting for execution.

Closes elastic/elasticsearch#268

Original commit: elastic/x-pack-elasticsearch@dbb7160a67
This commit is contained in:
Martijn van Groningen 2015-04-29 12:00:47 +02:00
parent 42d49d54ef
commit e438c51555
3 changed files with 47 additions and 6 deletions

View File

@ -279,12 +279,14 @@ public class ExecutionService extends AbstractComponent {
for (WatchRecord record : records) {
Watch watch = watchStore.get(record.name());
if (watch == null) {
logger.warn("unable to find watch [{}]/[{}] in watch store. perhaps it has been deleted. skipping...", record.name(), record.id());
continue;
String message = "unable to find watch for record [" + record. name()+ "]/[" + record.id() + "], perhaps it has been deleted, ignoring...";
record.update(WatchRecord.State.DELETED_WHILE_QUEUED, message);
historyStore.update(record);
} else {
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, clock.now(UTC), record.triggerEvent());
executeAsync(ctx, record);
counter++;
}
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, clock.now(UTC), record.triggerEvent());
executeAsync(ctx, record);
counter++;
}
logger.debug("executed [{}] watches from the watch history", counter);
}

View File

@ -183,7 +183,8 @@ public class WatchRecord implements ToXContent {
EXECUTION_NOT_NEEDED,
THROTTLED,
EXECUTED,
FAILED;
FAILED,
DELETED_WHILE_QUEUED;
public String id() {
return name().toLowerCase(Locale.ROOT);

View File

@ -17,6 +17,8 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.condition.always.AlwaysCondition;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
@ -24,7 +26,9 @@ import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchStore;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
@ -56,6 +60,40 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
return false;
}
@Test
public void testDeletedWhileQueued() throws Exception {
DateTime now = DateTime.now(UTC);
Wid wid = new Wid("_id", 1, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
Condition condition = new AlwaysCondition();
String index = HistoryStore.getHistoryIndexNameForTime(now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject()
.field(WatchRecord.Parser.WATCH_ID_FIELD.getPreferredName(), wid.value())
.startObject(WatchRecord.Parser.TRIGGER_EVENT_FIELD.getPreferredName())
.field(event.type(), event)
.endObject()
.startObject(Watch.Parser.CONDITION_FIELD.getPreferredName())
.field(condition.type(), condition)
.endObject()
.field(WatchRecord.Parser.STATE_FIELD.getPreferredName(), WatchRecord.State.AWAITS_EXECUTION)
.endObject())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setRefresh(true)
.get();
stopWatcher();
startWatcher();
refresh();
SearchResponse searchResponse = client().prepareSearch(index).get();
assertHitCount(searchResponse, 1);
assertThat(searchResponse.getHits().getAt(0).sourceAsMap().get(WatchRecord.Parser.WATCH_ID_FIELD.getPreferredName()).toString(), Matchers.equalTo(wid.value()));
assertThat(searchResponse.getHits().getAt(0).sourceAsMap().get(WatchRecord.Parser.STATE_FIELD.getPreferredName()).toString(), Matchers.equalTo(WatchRecord.State.DELETED_WHILE_QUEUED.toString()));
}
@Test
public void testLoadExistingWatchesUponStartup() throws Exception {
int numWatches = scaledRandomIntBetween(16, 128);