Watcher: Allow unmet action conditions to reset action status (elastic/x-pack-elasticsearch#1859)

The logic of resetting acknowledgements is only executed, if the watch
wide condition is not met. However, if you dont specify a condition
(which makes it always true), but create a condition in your action
(this might make sense because it allows you to execute a transform and
then execute the condition), then after acking this action, it will
never get be unacked, because the watch wide condition is always met.

relates elastic/x-pack-elasticsearch#1857

Original commit: elastic/x-pack-elasticsearch@95aa402c27
This commit is contained in:
Alexander Reelsen 2017-06-28 14:52:26 +02:00 committed by GitHub
parent 1a076e2eb9
commit edd5fa4ab4
8 changed files with 201 additions and 61 deletions

View File

@ -26,6 +26,8 @@ import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.time.Clock;
@ -43,10 +45,6 @@ public class ActionWrapper implements ToXContentObject {
private final ActionThrottler throttler;
private final ExecutableAction action;
public ActionWrapper(String id, ExecutableAction action) {
this(id, null, null, null, action);
}
public ActionWrapper(String id, ActionThrottler throttler,
@Nullable Condition condition,
@Nullable ExecutableTransform transform,
@ -112,6 +110,7 @@ public class ActionWrapper implements ToXContentObject {
try {
conditionResult = condition.execute(ctx);
if (conditionResult.met() == false) {
ctx.watch().status().actionStatus(id).resetAckStatus(DateTime.now(DateTimeZone.UTC));
return new ActionWrapper.Result(id, conditionResult, null,
new Action.Result.ConditionFailed(action.type(), "condition not met. skipping"));
}

View File

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.actions;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.condition.NeverCondition;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.watcher.actions.ActionStatus.AckStatus.State;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ActionWrapperTests extends ESTestCase {
private DateTime now = DateTime.now(DateTimeZone.UTC);
private Watch watch = mock(Watch.class);
private ExecutableAction executableAction = mock(ExecutableAction.class);
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction);
public void testThatUnmetActionConditionResetsAckStatus() throws Exception {
WatchStatus watchStatus = new WatchStatus(now, Collections.singletonMap("_action", createActionStatus(State.ACKED)));
when(watch.status()).thenReturn(watchStatus);
ActionWrapper.Result result = actionWrapper.execute(mockExecutionContent(watch));
assertThat(result.condition().met(), is(false));
assertThat(result.action().status(), is(Action.Result.Status.CONDITION_FAILED));
assertThat(watch.status().actionStatus("_action").ackStatus().state(), is(State.AWAITS_SUCCESSFUL_EXECUTION));
}
public void testOtherActionsAreNotAffectedOnActionConditionReset() throws Exception {
Map<String, ActionStatus> statusMap = new HashMap<>();
statusMap.put("_action", createActionStatus(State.ACKED));
State otherState = randomFrom(State.ACKABLE, State.AWAITS_SUCCESSFUL_EXECUTION);
statusMap.put("other", createActionStatus(otherState));
WatchStatus watchStatus = new WatchStatus(now, statusMap);
when(watch.status()).thenReturn(watchStatus);
actionWrapper.execute(mockExecutionContent(watch));
assertThat(watch.status().actionStatus("other").ackStatus().state(), is(otherState));
}
private WatchExecutionContext mockExecutionContent(Watch watch) {
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
when(watch.id()).thenReturn("watchId");
when(ctx.watch()).thenReturn(watch);
when(ctx.skipThrottling(eq("_action"))).thenReturn(true);
return ctx;
}
private ActionStatus createActionStatus(State state) {
ActionStatus.AckStatus ackStatus = new ActionStatus.AckStatus(now, state);
ActionStatus.Execution execution = ActionStatus.Execution.successful(now);
return new ActionStatus(ackStatus, execution, execution, null);
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.actions.throttler;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
@ -15,7 +16,6 @@ import org.joda.time.DateTime;
import java.time.Clock;
import static org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils.formatDate;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.EMPTY_PAYLOAD;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.is;
@ -26,7 +26,7 @@ import static org.mockito.Mockito.when;
public class AckThrottlerTests extends ESTestCase {
public void testWhenAcked() throws Exception {
DateTime timestamp = new DateTime(Clock.systemUTC().millis(), UTC);
WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_watch", Payload.EMPTY);
Watch watch = ctx.watch();
ActionStatus actionStatus = mock(ActionStatus.class);
when(actionStatus.ackStatus()).thenReturn(new ActionStatus.AckStatus(timestamp, ActionStatus.AckStatus.State.ACKED));
@ -42,7 +42,7 @@ public class AckThrottlerTests extends ESTestCase {
public void testThrottleWhenAwaitsSuccessfulExecution() throws Exception {
DateTime timestamp = new DateTime(Clock.systemUTC().millis(), UTC);
WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_watch", Payload.EMPTY);
Watch watch = ctx.watch();
ActionStatus actionStatus = mock(ActionStatus.class);
when(actionStatus.ackStatus()).thenReturn(new ActionStatus.AckStatus(timestamp,
@ -58,7 +58,7 @@ public class AckThrottlerTests extends ESTestCase {
public void testThrottleWhenAckable() throws Exception {
DateTime timestamp = new DateTime(Clock.systemUTC().millis(), UTC);
WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_watch", Payload.EMPTY);
Watch watch = ctx.watch();
ActionStatus actionStatus = mock(ActionStatus.class);
when(actionStatus.ackStatus()).thenReturn(new ActionStatus.AckStatus(timestamp, ActionStatus.AckStatus.State.ACKABLE));

View File

@ -9,13 +9,13 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.joda.time.PeriodType;
import java.time.Clock;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.EMPTY_PAYLOAD;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.is;
@ -30,7 +30,7 @@ public class PeriodThrottlerTests extends ESTestCase {
TimeValue period = TimeValue.timeValueSeconds(randomIntBetween(2, 5));
PeriodThrottler throttler = new PeriodThrottler(Clock.systemUTC(), period, periodType);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_name", Payload.EMPTY);
ActionStatus actionStatus = mock(ActionStatus.class);
DateTime now = new DateTime(Clock.systemUTC().millis());
when(actionStatus.lastSuccessfulExecution())
@ -52,7 +52,7 @@ public class PeriodThrottlerTests extends ESTestCase {
TimeValue period = TimeValue.timeValueSeconds(randomIntBetween(2, 5));
PeriodThrottler throttler = new PeriodThrottler(Clock.systemUTC(), period, periodType);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_name", Payload.EMPTY);
ActionStatus actionStatus = mock(ActionStatus.class);
DateTime now = new DateTime(Clock.systemUTC().millis());
when(actionStatus.lastSuccessfulExecution())

View File

@ -66,11 +66,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.apache.lucene.util.LuceneTestCase.createTempDir;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -81,8 +79,6 @@ import static org.junit.Assert.assertThat;
public final class WatcherTestUtils {
public static final Payload EMPTY_PAYLOAD = new Payload.Simple(emptyMap());
private WatcherTestUtils() {
}
@ -115,10 +111,6 @@ public final class WatcherTestUtils {
}
}
public static SearchRequest matchAllRequest() {
return matchAllRequest(null);
}
public static SearchRequest matchAllRequest(IndicesOptions indicesOptions) {
SearchRequest request = new SearchRequest(Strings.EMPTY_ARRAY)
.source(SearchSourceBuilder.searchSource().query(matchAllQuery()));
@ -171,63 +163,41 @@ public final class WatcherTestUtils {
public static Watch createTestWatch(String watchName, Client client, HttpClient httpClient, EmailService emailService,
WatcherSearchTemplateService searchTemplateService, Logger logger) throws AddressException {
WatcherSearchTemplateRequest transformRequest =
templateRequest(searchSource().query(matchAllQuery()), "my-payload-index");
List<ActionWrapper> actions = new ArrayList<>();
TextTemplateEngine engine = new MockTextTemplateEngine();
HttpRequestTemplate.Builder httpRequest = HttpRequestTemplate.builder("localhost", 80);
httpRequest.method(HttpMethod.POST);
httpRequest.path(new TextTemplate("/foobarbaz/{{ctx.watch_id}}"));
httpRequest.body(new TextTemplate("{{ctx.watch_id}} executed with {{ctx.payload.response.hits.total_hits}} hits"));
actions.add(new ActionWrapper("_webhook", null, null, null, new ExecutableWebhookAction(new WebhookAction(httpRequest.build()),
logger, httpClient, engine)));
TextTemplate path = new TextTemplate("/foobarbaz/{{ctx.watch_id}}");
httpRequest.path(path);
TextTemplate body = new TextTemplate("{{ctx.watch_id}} executed with {{ctx.payload.response.hits.total_hits}} hits");
httpRequest.body(body);
TextTemplateEngine engine = new MockTextTemplateEngine();
actions.add(new ActionWrapper("_webhook", new ExecutableWebhookAction(new WebhookAction(httpRequest.build()), logger, httpClient,
engine)));
String from = "from@test.com";
String to = "to@test.com";
EmailTemplate email = EmailTemplate.builder()
.from(from)
.to(to)
.build();
EmailTemplate email = EmailTemplate.builder().from("from@test.com").to("to@test.com").build();
Authentication auth = new Authentication("testname", new Secret("testpassword".toCharArray()));
EmailAction action = new EmailAction(email, "testaccount", auth, Profile.STANDARD, null, null);
ExecutableEmailAction executale = new ExecutableEmailAction(action, logger, emailService, engine,
new HtmlSanitizer(Settings.EMPTY), Collections.emptyMap());
actions.add(new ActionWrapper("_email", executale));
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("foo", "bar");
Map<String, Object> inputData = new LinkedHashMap<>();
inputData.put("bar", "foo");
actions.add(new ActionWrapper("_email", null, null, null, executale));
DateTime now = DateTime.now(UTC);
Map<String, ActionStatus> statuses = new HashMap<>();
statuses.put("_webhook", new ActionStatus(now));
statuses.put("_email", new ActionStatus(now));
WatcherSearchTemplateRequest transformRequest = templateRequest(searchSource().query(matchAllQuery()), "my-payload-index");
SearchTransform searchTransform = new SearchTransform(transformRequest, null, null);
return new Watch(
watchName,
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(Collections.singletonMap("bar", "foo"))), logger),
AlwaysCondition.INSTANCE,
new ExecutableSearchTransform(searchTransform, logger, client, searchTemplateService, TimeValue.timeValueMinutes(1)),
new TimeValue(0),
actions,
metadata,
Collections.singletonMap("foo", "bar"),
new WatchStatus(now, statuses));
}

View File

@ -62,7 +62,6 @@ import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.EMPTY_PAYLOAD;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.getRandomSupportedSearchType;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
@ -108,9 +107,9 @@ public class SearchTransformTests extends ESIntegTestCase {
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, client(),
watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_name", Payload.EMPTY);
Transform.Result result = transform.execute(ctx, EMPTY_PAYLOAD);
Transform.Result result = transform.execute(ctx, Payload.EMPTY);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
assertThat(result.status(), is(Transform.Result.Status.SUCCESS));
@ -145,9 +144,9 @@ public class SearchTransformTests extends ESIntegTestCase {
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, client(),
watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_name", Payload.EMPTY);
SearchTransform.Result result = transform.execute(ctx, EMPTY_PAYLOAD);
SearchTransform.Result result = transform.execute(ctx, Payload.EMPTY);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
assertThat(result.status(), is(Transform.Result.Status.FAILURE));

View File

@ -32,7 +32,6 @@ import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.EMPTY_PAYLOAD;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.createScriptService;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.simplePayload;
@ -64,7 +63,7 @@ public class ScriptTransformTests extends ESTestCase {
when(service.compile(script, Watcher.SCRIPT_EXECUTABLE_CONTEXT)).thenReturn(factory);
ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_name", Payload.EMPTY);
Payload payload = simplePayload("key", "value");
@ -92,7 +91,7 @@ public class ScriptTransformTests extends ESTestCase {
when(service.compile(script, Watcher.SCRIPT_EXECUTABLE_CONTEXT)).thenReturn(factory);
ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_name", Payload.EMPTY);
Payload payload = simplePayload("key", "value");
@ -118,14 +117,14 @@ public class ScriptTransformTests extends ESTestCase {
when(service.compile(script, Watcher.SCRIPT_EXECUTABLE_CONTEXT)).thenReturn(factory);
ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
WatchExecutionContext ctx = mockExecutionContext("_name", Payload.EMPTY);
Payload payload = simplePayload("key", "value");
Map<String, Object> model = Variables.createCtxModel(ctx, payload);
ExecutableScript executable = mock(ExecutableScript.class);
Object value = randomFrom("value", 1, new String[] { "value" }, Arrays.asList("value"), singleton("value"));
Object value = randomFrom("value", 1, new String[] { "value" }, Collections.singletonList("value"), singleton("value"));
when(executable.run()).thenReturn(value);
when(factory.newInstance(model)).thenReturn(executable);

View File

@ -0,0 +1,104 @@
---
setup:
- do:
cluster.health:
wait_for_status: yellow
---
teardown:
- do:
xpack.watcher.delete_watch:
id: "my_watch"
ignore: 404
---
"Ensure that ack status is reset after unmet action condition":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"simple" : { "match" : "true" }
},
"actions": {
"indexme" : {
"condition": {
"compare": {
"ctx.payload.match": {
"eq": "true"
}
}
},
"index" : {
"index" : "my-index",
"doc_type" : "my-type",
"doc_id": "my-id"
}
}
}
}
- do:
xpack.watcher.execute_watch:
id: "my_watch"
body: >
{
"record_execution" : true
}
- match: { watch_record.status.actions.indexme.ack.state: "ackable" }
- do:
xpack.watcher.ack_watch:
watch_id: "my_watch"
- match: { "status.actions.indexme.ack.state" : "acked" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { "status.actions.indexme.ack.state" : "acked" }
# having a false result will reset the ack state
- do:
xpack.watcher.execute_watch:
id: "my_watch"
body: >
{
"record_execution" : true,
"alternative_input" : {
"match" : "false"
},
"action_modes" : {
"indexme" : "force_execute"
}
}
- match: { watch_record.status.actions.indexme.ack.state: "awaits_successful_execution" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { "status.actions.indexme.ack.state" : "awaits_successful_execution" }
- do:
xpack.watcher.execute_watch:
id: "my_watch"
body: >
{
"record_execution" : true,
"action_modes" : {
"indexme" : "force_execute"
}
}
- match: { watch_record.status.actions.indexme.ack.state: "ackable" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { "status.actions.indexme.ack.state" : "ackable" }