Make sure we skip over watches and watch records that are malformed during the Watcher starting phase.

Original commit: elastic/x-pack-elasticsearch@8bc37cdcc3
This commit is contained in:
Martijn van Groningen 2015-05-02 01:15:30 +02:00
parent c8a986cab7
commit 31f039d1bd
4 changed files with 162 additions and 14 deletions

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
@ -283,9 +282,8 @@ public class HistoryStore extends AbstractComponent {
assert record.state() == recordState; assert record.state() == recordState;
logger.debug("loaded watch record [{}/{}/{}]", sh.index(), sh.type(), sh.id()); logger.debug("loaded watch record [{}/{}/{}]", sh.index(), sh.type(), sh.id());
records.add(record); records.add(record);
} catch (WatcherException we) { } catch (Exception e) {
logger.error("while loading records, failed to parse watch record [{}]", we, id); logger.error("couldn't load watch record [{}], ignoring it...", e, id);
throw we;
} }
} }
response = client.searchScroll(response.getScrollId(), scrollTimeout); response = client.searchScroll(response.getScrollId(), scrollTimeout);

View File

@ -10,7 +10,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.WatcherSettingsException;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
@ -78,16 +77,18 @@ public class TriggerService extends AbstractComponent {
assert token == XContentParser.Token.START_OBJECT; assert token == XContentParser.Token.START_OBJECT;
token = parser.nextToken(); token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME) { if (token != XContentParser.Token.FIELD_NAME) {
throw new WatcherSettingsException("could not parse trigger for [" + jobName + "]. expected trigger type string field, but found [" + token + "]"); throw new TriggerException("could not parse trigger for [{}]. expected trigger type string field, but found [{}]", jobName, token);
} }
String type = parser.text(); String type = parser.text();
token = parser.nextToken(); token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) { if (token != XContentParser.Token.START_OBJECT) {
throw new WatcherSettingsException("could not parse trigger [" + type + "] for [" + jobName + "]. expected trigger an object as the trigger body, but found [" + token + "]"); throw new TriggerException("could not parse trigger [{}] for [{}]. expected trigger an object as the trigger body, but found [{}]", type, jobName, token);
} }
Trigger trigger = parseTrigger(jobName, type, parser); Trigger trigger = parseTrigger(jobName, type, parser);
token = parser.nextToken(); token = parser.nextToken();
assert token == XContentParser.Token.END_OBJECT; if (token != XContentParser.Token.END_OBJECT) {
throw new TriggerException("could not parse trigger [{}] for [{}]. expected [END_OBJECT] token, but found [{}]", type, jobName, token);
}
return trigger; return trigger;
} }
@ -107,17 +108,21 @@ public class TriggerService extends AbstractComponent {
String type = parser.text(); String type = parser.text();
token = parser.nextToken(); token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) { if (token != XContentParser.Token.START_OBJECT) {
throw new WatcherSettingsException("could not parse trigger event [" + type + "] for [" + historyRecordId + "]. expected trigger an object as the trigger body, but found [" + token + "]"); throw new TriggerException("could not parse trigger event [" + type + "] for [" + historyRecordId + "]. expected trigger an object as the trigger body, but found [" + token + "]");
} }
TriggerEvent trigger = parseTriggerEvent(historyRecordId, type, parser); TriggerEvent trigger = parseTriggerEvent(historyRecordId, type, parser);
token = parser.nextToken(); token = parser.nextToken();
assert token == XContentParser.Token.END_OBJECT; if (token != XContentParser.Token.END_OBJECT) {
throw new TriggerException("could not parse trigger [{}] for [{}]. expected [END_OBJECT] token, but found [{}]", type, historyRecordId, token);
}
return trigger; return trigger;
} }
public TriggerEvent parseTriggerEvent(String context, String type, XContentParser parser) throws IOException { public TriggerEvent parseTriggerEvent(String context, String type, XContentParser parser) throws IOException {
TriggerEngine engine = engines.get(type); TriggerEngine engine = engines.get(type);
assert engine != null; if (engine == null) {
throw new TriggerException("Unknown trigger type [{}]", type);
}
return engine.parseTriggerEvent(context, parser); return engine.parseTriggerEvent(context, parser);
} }

View File

@ -81,6 +81,7 @@ public class WatchStore extends AbstractComponent {
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to load watches for watch index [{}]", e, INDEX); logger.debug("failed to load watches for watch index [{}]", e, INDEX);
watches.clear(); watches.clear();
throw e;
} }
} else { } else {
templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE); templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE);
@ -221,9 +222,8 @@ public class WatchStore extends AbstractComponent {
watch.status().version(hit.version()); watch.status().version(hit.version());
watches.put(name, watch); watches.put(name, watch);
count++; count++;
} catch (WatcherException we) { } catch (Exception e) {
logger.error("while loading watches, failed to parse [{}]", we, name); logger.error("couldn't load watch [{}], ignoring it...", e, name);
throw we;
} }
} }
response = client.searchScroll(response.getScrollId(), scrollTimeout); response = client.searchScroll(response.getScrollId(), scrollTimeout);

View File

@ -60,6 +60,151 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
return false; return false;
} }
@Test
public void testLoadMalformedWatch() throws Exception {
// valid watch
client().prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, "_id0")
.setSource(jsonBuilder().startObject()
.startObject(Watch.Parser.TRIGGER_FIELD.getPreferredName())
.startObject("schedule")
.field("interval", "1s")
.endObject()
.endObject()
.startObject(Watch.Parser.ACTIONS_FIELD.getPreferredName())
.endObject()
.endObject())
.get();
// no actions field:
client().prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, "_id1")
.setSource(jsonBuilder().startObject()
.startObject(Watch.Parser.TRIGGER_FIELD.getPreferredName())
.startObject("schedule")
.field("interval", "1s")
.endObject()
.endObject()
.endObject())
.get();
// invalid interval
client().prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, "_id2")
.setSource(jsonBuilder().startObject()
.startObject(Watch.Parser.TRIGGER_FIELD.getPreferredName())
.startObject("schedule")
.field("interval", true)
.endObject()
.endObject()
.startObject(Watch.Parser.ACTIONS_FIELD.getPreferredName())
.endObject()
.endObject())
.get();
// illegal top level field
client().prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, "_id3")
.setSource(jsonBuilder().startObject()
.startObject(Watch.Parser.TRIGGER_FIELD.getPreferredName())
.startObject("schedule")
.field("interval", "1s")
.endObject()
.startObject("illegal_field").endObject()
.endObject()
.startObject(Watch.Parser.ACTIONS_FIELD.getPreferredName()).endObject()
.endObject())
.get();
stopWatcher();
startWatcher();
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
// Only the valid watch should been loaded
assertThat(response.getWatchesCount(), equalTo(1l));
assertThat(watcherClient().prepareGetWatch("_id0").get().getId(), Matchers.equalTo("_id0"));
}
@Test
public void testLoadMalformedWatchRecord() throws Exception {
client().prepareIndex(WatchStore.INDEX, WatchStore.DOC_TYPE, "_id")
.setSource(jsonBuilder().startObject()
.startObject(Watch.Parser.TRIGGER_FIELD.getPreferredName())
.startObject("schedule")
.field("cron", "0/5 * * * * ? 2050")
.endObject()
.endObject()
.startObject(Watch.Parser.ACTIONS_FIELD.getPreferredName())
.endObject()
.endObject())
.get();
// valid watch record:
DateTime now = DateTime.now(UTC);
Wid wid = new Wid("_id", 1, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
Condition condition = new AlwaysCondition();
String index = HistoryStore.getHistoryIndexNameForTime(now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject()
.field(WatchRecord.Parser.WATCH_ID_FIELD.getPreferredName(), wid.watchId())
.startObject(WatchRecord.Parser.TRIGGER_EVENT_FIELD.getPreferredName())
.field(event.type(), event)
.endObject()
.startObject(Watch.Parser.CONDITION_FIELD.getPreferredName())
.field(condition.type(), condition)
.endObject()
.field(WatchRecord.Parser.STATE_FIELD.getPreferredName(), WatchRecord.State.AWAITS_EXECUTION)
.endObject())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setRefresh(true)
.get();
// unknown condition:
wid = new Wid("_id", 2, now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject()
.field(WatchRecord.Parser.WATCH_ID_FIELD.getPreferredName(), wid.watchId())
.startObject(WatchRecord.Parser.TRIGGER_EVENT_FIELD.getPreferredName())
.field(event.type(), event)
.endObject()
.startObject(Watch.Parser.CONDITION_FIELD.getPreferredName())
.startObject("unknown").endObject()
.endObject()
.field(WatchRecord.Parser.STATE_FIELD.getPreferredName(), WatchRecord.State.AWAITS_EXECUTION)
.endObject())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setRefresh(true)
.get();
// unknown trigger:
wid = new Wid("_id", 2, now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject()
.field(WatchRecord.Parser.WATCH_ID_FIELD.getPreferredName(), wid.watchId())
.startObject(WatchRecord.Parser.TRIGGER_EVENT_FIELD.getPreferredName())
.startObject("unknown").endObject()
.endObject()
.startObject(Watch.Parser.CONDITION_FIELD.getPreferredName())
.field(condition.type(), condition)
.endObject()
.field(WatchRecord.Parser.STATE_FIELD.getPreferredName(), WatchRecord.State.AWAITS_EXECUTION)
.endObject())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setRefresh(true)
.get();
stopWatcher();
startWatcher();
assertBusy(new Runnable() {
@Override
public void run() {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
assertThat(response.getWatchesCount(), equalTo(1l));
assertThat(response.getWatchExecutionQueueMaxSize(), equalTo(1l));
}
});
}
@Test @Test
public void testDeletedWhileQueued() throws Exception { public void testDeletedWhileQueued() throws Exception {
DateTime now = DateTime.now(UTC); DateTime now = DateTime.now(UTC);