diff --git a/src/test/java/org/elasticsearch/watcher/actions/ActionErrorIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/actions/ActionErrorIntegrationTests.java new file mode 100644 index 00000000000..ccb2936af90 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/actions/ActionErrorIntegrationTests.java @@ -0,0 +1,208 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.actions; + +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.plugins.AbstractPlugin; +import org.elasticsearch.watcher.WatcherException; +import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.support.xcontent.XContentSource; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse; +import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; +import org.elasticsearch.watcher.watch.Payload; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.index.query.FilterBuilders.boolFilter; +import static org.elasticsearch.index.query.FilterBuilders.termFilter; +import static org.elasticsearch.index.query.QueryBuilders.filteredQuery; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.is; + +/** + * + */ +public class ActionErrorIntegrationTests extends AbstractWatcherIntegrationTests { + + @Override + protected boolean timeWarped() { + return true; // to have control over the execution + } + + @Override + protected List pluginTypes() { + return ImmutableList.builder() + .addAll(super.pluginTypes()) + .add(ErrorActionPlugin.class.getName()) + .build(); + } + + /** + this test makes sure that when an action encounters an error + it should not be subject to throttling. Also, the ack status + of the action in the watch should remain awaits_successful_execution + as long as the execution fails. + */ + @Test + public void testErrorInAction() throws Exception { + + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder() + .trigger(schedule(interval("10m"))) + + // adding an action that throws an error and is associated with a 60 minute throttle period + // with such a period, on successful execution we other executions of the watch will be + // throttled within the hour... but on failed execution there should be no throttling + .addAction("_action", TimeValue.timeValueMinutes(60), new ErrorAction.Builder())) + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + timeWarp().scheduler().trigger("_id"); + + flush(); + + // there should be a single history record with a failure status for the action: + assertBusy(new Runnable() { + @Override + public void run() { + long count = watchRecordCount(filteredQuery(matchAllQuery(), boolFilter() + .must(termFilter("result.actions.id", "_action")) + .must(termFilter("result.actions.status", "failure")))); + assertThat(count, is(1L)); + } + }); + + // now we'll trigger the watch again and make sure that it's not throttled and instead + // writes another record to the history + + // within the 60 minute throttling period + timeWarp().clock().fastForward(TimeValue.timeValueMinutes(randomIntBetween(1, 50))); + timeWarp().scheduler().trigger("_id"); + + flush(); + + // there should be a single history record with a failure status for the action: + assertBusy(new Runnable() { + @Override + public void run() { + long count = watchRecordCount(filteredQuery(matchAllQuery(), boolFilter() + .must(termFilter("result.actions.id", "_action")) + .must(termFilter("result.actions.status", "failure")))); + assertThat(count, is(2L)); + } + }); + + // now lets confirm that the ack status of the action is awaits_successful_execution + GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get(); + XContentSource watch = getWatchResponse.getSource(); + watch.getValue("status.actions._action.ack.awaits_successful_execution"); + } + + + + public static class ErrorActionPlugin extends AbstractPlugin { + + public ErrorActionPlugin() { + } + + @Override + public String name() { + return "error-action"; + } + + @Override + public String description() { + return name(); + } + + public void onModule(ActionModule module) { + module.registerAction(ErrorAction.TYPE, ErrorAction.Factory.class); + } + } + + public static class ErrorAction implements Action { + + static final String TYPE = "error"; + + @Override + public String type() { + return TYPE; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().endObject(); + } + + public static class Result extends Action.Result { + public Result() { + super(TYPE, Status.FAILURE); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().endObject(); + } + } + + public static class Executable extends ExecutableAction { + + public Executable(ErrorAction action, ESLogger logger) { + super(action, logger); + } + + @Override + public Action.Result execute(String actionId, WatchExecutionContext context, Payload payload) throws Exception { + throw new WatcherException("dummy error"); + } + } + + public static class Factory extends ActionFactory { + + @Inject + public Factory(Settings settings) { + super(Loggers.getLogger(Executable.class, settings)); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public ErrorAction parseAction(String watchId, String actionId, XContentParser parser) throws IOException { + assert parser.currentToken() == XContentParser.Token.START_OBJECT; + assert parser.nextToken() == XContentParser.Token.END_OBJECT; + return new ErrorAction(); + } + + @Override + public Executable createExecutable(ErrorAction action) { + return new Executable(action, actionLogger); + } + } + + public static class Builder implements Action.Builder { + @Override + public ErrorAction build() { + return new ErrorAction(); + } + } + } +} diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index b6b4ec8f1f5..31143a06320 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.base.Charsets; import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.common.io.FileSystemUtils; @@ -96,15 +97,26 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg // we do this by default in core, but for watcher this isn't needed and only adds noise. .put("index.store.mock.check_index_on_close", false) .put("scroll.size", randomIntBetween(1, 100)) - .put("plugin.types", - (timeWarped() ? TimeWarpedWatcherPlugin.class.getName() : WatcherPlugin.class.getName()) + "," + - (shieldEnabled ? ShieldPlugin.class.getName() + "," : "") + - licensePluginClass().getName()) + .put("plugin.types", Strings.collectionToCommaDelimitedString(pluginTypes())) .put(ShieldSettings.settings(shieldEnabled)) .put("watcher.trigger.schedule.engine", scheduleImplName) .build(); } + protected List pluginTypes() { + List types = new ArrayList<>(); + if (timeWarped()) { + types.add(TimeWarpedWatcherPlugin.class.getName()); + } else { + types.add(WatcherPlugin.class.getName()); + } + if (shieldEnabled) { + types.add(ShieldPlugin.class.getName()); + } + types.add(licensePluginClass().getName()); + return types; + } + /** * @return whether the test suite should run in time warp mode. By default this will be determined globally * to all test suites based on {@code -Dtests.timewarp} system property (when missing, defaults to @@ -228,6 +240,11 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg return docCount(index, type, SearchSourceBuilder.searchSource().query(query)); } + protected long watchRecordCount(QueryBuilder query) { + refresh(); + return docCount(HistoryStore.INDEX_PREFIX + "*", HistoryStore.DOC_TYPE, SearchSourceBuilder.searchSource().query(query)); + } + protected long docCount(String index, String type, SearchSourceBuilder source) { SearchRequestBuilder builder = client().prepareSearch(index).setSearchType(SearchType.COUNT); if (type != null) {