[test] added a test to make sure a failed action is not throttled

Closes elastic/elasticsearch#253

Original commit: elastic/x-pack-elasticsearch@89f9075731
This commit is contained in:
uboness 2015-06-09 15:37:16 +02:00
parent babe40137f
commit c5fd06d624
2 changed files with 229 additions and 4 deletions

View File

@ -0,0 +1,208 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.actions;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.watch.Payload;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import static org.elasticsearch.index.query.FilterBuilders.boolFilter;
import static org.elasticsearch.index.query.FilterBuilders.termFilter;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
/**
*
*/
public class ActionErrorIntegrationTests extends AbstractWatcherIntegrationTests {
@Override
protected boolean timeWarped() {
return true; // to have control over the execution
}
@Override
protected List<String> pluginTypes() {
return ImmutableList.<String>builder()
.addAll(super.pluginTypes())
.add(ErrorActionPlugin.class.getName())
.build();
}
/**
this test makes sure that when an action encounters an error
it should not be subject to throttling. Also, the ack status
of the action in the watch should remain awaits_successful_execution
as long as the execution fails.
*/
@Test
public void testErrorInAction() throws Exception {
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(interval("10m")))
// adding an action that throws an error and is associated with a 60 minute throttle period
// with such a period, on successful execution we other executions of the watch will be
// throttled within the hour... but on failed execution there should be no throttling
.addAction("_action", TimeValue.timeValueMinutes(60), new ErrorAction.Builder()))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
timeWarp().scheduler().trigger("_id");
flush();
// there should be a single history record with a failure status for the action:
assertBusy(new Runnable() {
@Override
public void run() {
long count = watchRecordCount(filteredQuery(matchAllQuery(), boolFilter()
.must(termFilter("result.actions.id", "_action"))
.must(termFilter("result.actions.status", "failure"))));
assertThat(count, is(1L));
}
});
// now we'll trigger the watch again and make sure that it's not throttled and instead
// writes another record to the history
// within the 60 minute throttling period
timeWarp().clock().fastForward(TimeValue.timeValueMinutes(randomIntBetween(1, 50)));
timeWarp().scheduler().trigger("_id");
flush();
// there should be a single history record with a failure status for the action:
assertBusy(new Runnable() {
@Override
public void run() {
long count = watchRecordCount(filteredQuery(matchAllQuery(), boolFilter()
.must(termFilter("result.actions.id", "_action"))
.must(termFilter("result.actions.status", "failure"))));
assertThat(count, is(2L));
}
});
// now lets confirm that the ack status of the action is awaits_successful_execution
GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get();
XContentSource watch = getWatchResponse.getSource();
watch.getValue("status.actions._action.ack.awaits_successful_execution");
}
public static class ErrorActionPlugin extends AbstractPlugin {
public ErrorActionPlugin() {
}
@Override
public String name() {
return "error-action";
}
@Override
public String description() {
return name();
}
public void onModule(ActionModule module) {
module.registerAction(ErrorAction.TYPE, ErrorAction.Factory.class);
}
}
public static class ErrorAction implements Action {
static final String TYPE = "error";
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
public static class Result extends Action.Result {
public Result() {
super(TYPE, Status.FAILURE);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
}
public static class Executable extends ExecutableAction<ErrorAction> {
public Executable(ErrorAction action, ESLogger logger) {
super(action, logger);
}
@Override
public Action.Result execute(String actionId, WatchExecutionContext context, Payload payload) throws Exception {
throw new WatcherException("dummy error");
}
}
public static class Factory extends ActionFactory<ErrorAction, Executable> {
@Inject
public Factory(Settings settings) {
super(Loggers.getLogger(Executable.class, settings));
}
@Override
public String type() {
return TYPE;
}
@Override
public ErrorAction parseAction(String watchId, String actionId, XContentParser parser) throws IOException {
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
assert parser.nextToken() == XContentParser.Token.END_OBJECT;
return new ErrorAction();
}
@Override
public Executable createExecutable(ErrorAction action) {
return new Executable(action, actionLogger);
}
}
public static class Builder implements Action.Builder<ErrorAction> {
@Override
public ErrorAction build() {
return new ErrorAction();
}
}
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.io.FileSystemUtils;
@ -96,15 +97,26 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
// we do this by default in core, but for watcher this isn't needed and only adds noise.
.put("index.store.mock.check_index_on_close", false)
.put("scroll.size", randomIntBetween(1, 100))
.put("plugin.types",
(timeWarped() ? TimeWarpedWatcherPlugin.class.getName() : WatcherPlugin.class.getName()) + "," +
(shieldEnabled ? ShieldPlugin.class.getName() + "," : "") +
licensePluginClass().getName())
.put("plugin.types", Strings.collectionToCommaDelimitedString(pluginTypes()))
.put(ShieldSettings.settings(shieldEnabled))
.put("watcher.trigger.schedule.engine", scheduleImplName)
.build();
}
protected List<String> pluginTypes() {
List<String> types = new ArrayList<>();
if (timeWarped()) {
types.add(TimeWarpedWatcherPlugin.class.getName());
} else {
types.add(WatcherPlugin.class.getName());
}
if (shieldEnabled) {
types.add(ShieldPlugin.class.getName());
}
types.add(licensePluginClass().getName());
return types;
}
/**
* @return whether the test suite should run in time warp mode. By default this will be determined globally
* to all test suites based on {@code -Dtests.timewarp} system property (when missing, defaults to
@ -228,6 +240,11 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return docCount(index, type, SearchSourceBuilder.searchSource().query(query));
}
protected long watchRecordCount(QueryBuilder query) {
refresh();
return docCount(HistoryStore.INDEX_PREFIX + "*", HistoryStore.DOC_TYPE, SearchSourceBuilder.searchSource().query(query));
}
protected long docCount(String index, String type, SearchSourceBuilder source) {
SearchRequestBuilder builder = client().prepareSearch(index).setSearchType(SearchType.COUNT);
if (type != null) {