diff --git a/rest-api-spec/api/watcher.ack_watch.json b/rest-api-spec/api/watcher.ack_watch.json index da9ffc4c6f5..aab5f9dd3f8 100644 --- a/rest-api-spec/api/watcher.ack_watch.json +++ b/rest-api-spec/api/watcher.ack_watch.json @@ -3,13 +3,17 @@ "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-ack-watch.html", "methods": [ "PUT", "POST" ], "url": { - "path": "/_watcher/watch/{id}/_ack", - "paths": [ "/_watcher/watch/{id}/_ack" ], + "path": "/_watcher/watch/{watch_id}/_ack", + "paths": [ "/_watcher/watch/{watch_id}/_ack", "/_watcher/watch/{watch_id}/{action_id}/_ack"], "parts": { - "id": { + "watch_id": { "type" : "string", "description" : "Watch ID", "required" : true + }, + "action_id": { + "type" : "list", + "description" : "A comma-separated list of the action ids to be acked" } }, "params": { diff --git a/rest-api-spec/test/ack_watch/10_basic.yaml b/rest-api-spec/test/ack_watch/10_basic.yaml index 29f48924951..c66406598ae 100644 --- a/rest-api-spec/test/ack_watch/10_basic.yaml +++ b/rest-api-spec/test/ack_watch/10_basic.yaml @@ -32,6 +32,7 @@ } } } + - match: { _id: "my_watch" } - do: @@ -40,9 +41,10 @@ - do: watcher.ack_watch: - id: "my_watch" + watch_id: "my_watch" + action_id: "test_index" - - match: { "status.actions.test_index.ack_status.state" : "awaits_successful_execution" } + - match: { "status.actions.test_index.ack.state" : "awaits_successful_execution" } - do: watcher.delete_watch: diff --git a/src/main/java/org/elasticsearch/watcher/WatcherService.java b/src/main/java/org/elasticsearch/watcher/WatcherService.java index ae6e7b1192f..b2d2d5831c0 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherService.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherService.java @@ -142,18 +142,21 @@ public class WatcherService extends AbstractComponent { /** * Acks the watch if needed */ - public WatchStatus ackWatch(String id, TimeValue timeout) { + public WatchStatus ackWatch(String id, String[] actionIds, TimeValue timeout) { ensureStarted(); WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); if (lock == null) { throw new TimeoutException("could not ack watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); } + if (actionIds == null || actionIds.length == 0) { + actionIds = new String[] { Watch.ALL_ACTIONS_ID }; + } try { Watch watch = watchStore.get(id); if (watch == null) { throw new WatcherException("watch [{}] does not exist", id); } - if (watch.ack(clock.now(UTC), "_all")) { + if (watch.ack(clock.now(UTC), actionIds)) { try { watchStore.updateStatus(watch); } catch (IOException ioe) { diff --git a/src/main/java/org/elasticsearch/watcher/actions/ActionRegistry.java b/src/main/java/org/elasticsearch/watcher/actions/ActionRegistry.java index 7ce996aaaf7..fed91e2033a 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/ActionRegistry.java +++ b/src/main/java/org/elasticsearch/watcher/actions/ActionRegistry.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.execution.Wid; import org.elasticsearch.watcher.license.LicenseService; +import org.elasticsearch.watcher.support.Validation; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.transform.TransformRegistry; @@ -51,6 +52,10 @@ public class ActionRegistry { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { id = parser.currentName(); + Validation.Error error = Validation.actionId(id); + if (error != null) { + throw new ActionException("could not parse action [{}] for watch [{}]. {}", id, watchId, error); + } } else if (token == XContentParser.Token.START_OBJECT && id != null) { ActionWrapper action = ActionWrapper.parse(watchId, id, parser, this, transformRegistry, clock, licenseService); actions.add(action); diff --git a/src/main/java/org/elasticsearch/watcher/actions/ActionStatus.java b/src/main/java/org/elasticsearch/watcher/actions/ActionStatus.java index 4c71d658a2e..e3dbb91de83 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/ActionStatus.java +++ b/src/main/java/org/elasticsearch/watcher/actions/ActionStatus.java @@ -505,7 +505,7 @@ public class ActionStatus implements ToXContent { } interface Field { - ParseField ACK_STATUS = new ParseField("ack_status"); + ParseField ACK_STATUS = new ParseField("ack"); ParseField ACK_STATUS_STATE = new ParseField("state"); ParseField LAST_EXECUTION = new ParseField("last_execution"); diff --git a/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java b/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java index 291f0bd2fea..a16c6b7360e 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java +++ b/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java @@ -15,13 +15,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.watcher.actions.throttler.ActionThrottler; +import org.elasticsearch.watcher.actions.throttler.Throttler; import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.execution.Wid; import org.elasticsearch.watcher.license.LicenseService; import org.elasticsearch.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.watcher.support.clock.Clock; -import org.elasticsearch.watcher.actions.throttler.ActionThrottler; -import org.elasticsearch.watcher.actions.throttler.Throttler; import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.Transform; import org.elasticsearch.watcher.transform.TransformRegistry; diff --git a/src/main/java/org/elasticsearch/watcher/client/WatcherClient.java b/src/main/java/org/elasticsearch/watcher/client/WatcherClient.java index df3ea49052b..9a45e0a828e 100644 --- a/src/main/java/org/elasticsearch/watcher/client/WatcherClient.java +++ b/src/main/java/org/elasticsearch/watcher/client/WatcherClient.java @@ -205,15 +205,6 @@ public class WatcherClient { return new AckWatchRequestBuilder(client, id); } - /** - * Creates a request builder that acks an watch - * - * @return The request builder - */ - public AckWatchRequestBuilder prepareAckWatch() { - return new AckWatchRequestBuilder(client); - } - /** * Ack a watch * diff --git a/src/main/java/org/elasticsearch/watcher/execution/ActionExecutionMode.java b/src/main/java/org/elasticsearch/watcher/execution/ActionExecutionMode.java index 0ac2480d10e..b2b09462ba9 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ActionExecutionMode.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ActionExecutionMode.java @@ -17,38 +17,50 @@ public enum ActionExecutionMode { /** * The action will be simulated (not actually executed) and it will be throttled if needed. */ - SIMULATE((byte) 1), + SIMULATE((byte) 1, false, true), /** * The action will be simulated (not actually executed) and it will not be throttled. */ - FORCE_SIMULATE((byte) 2), + FORCE_SIMULATE((byte) 2, true, true), /** * The action will be executed and it will be throttled if needed. */ - EXECUTE((byte) 3), + EXECUTE((byte) 3, false, false), /** * The action will be executed and it will not be throttled. */ - FORCE_EXECUTE((byte) 4), + FORCE_EXECUTE((byte) 4, true, false), /** * The action will be skipped (it won't be executed nor simulated) - effectively it will be forcefully throttled */ - SKIP((byte) 5); + SKIP((byte) 5, false, false); private final byte id; + private final boolean force; + private final boolean simulate; - ActionExecutionMode(byte id) { + ActionExecutionMode(byte id, boolean froce, boolean simulate) { this.id = id; + this.force = froce; + this.simulate = simulate; } public final byte id() { return id; } + public final boolean simulate() { + return simulate; + } + + public final boolean force() { + return force; + } + public static ActionExecutionMode resolve(byte id) { switch (id) { case 1: return SIMULATE; diff --git a/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java b/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java index 54b92f6047d..821463c9627 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ManualExecutionContext.java @@ -60,21 +60,19 @@ public class ManualExecutionContext extends WatchExecutionContext { @Override public final boolean simulateAction(String actionId) { ActionExecutionMode mode = actionModes.get(Builder.ALL); - if (mode == ActionExecutionMode.SIMULATE || mode == ActionExecutionMode.FORCE_SIMULATE) { - return true; + if (mode == null) { + mode = actionModes.get(actionId); } - mode = actionModes.get(actionId); - return mode == ActionExecutionMode.SIMULATE || mode == ActionExecutionMode.FORCE_SIMULATE; + return mode != null && mode.simulate(); } @Override public boolean skipThrottling(String actionId) { ActionExecutionMode mode = actionModes.get(Builder.ALL); - if (mode == ActionExecutionMode.FORCE_EXECUTE || mode == ActionExecutionMode.FORCE_SIMULATE) { - return true; + if (mode == null) { + mode = actionModes.get(actionId); } - mode = actionModes.get(actionId); - return mode == ActionExecutionMode.FORCE_EXECUTE || mode == ActionExecutionMode.FORCE_SIMULATE; + return mode != null && mode.force(); } @Override diff --git a/src/main/java/org/elasticsearch/watcher/rest/action/RestAckWatchAction.java b/src/main/java/org/elasticsearch/watcher/rest/action/RestAckWatchAction.java index cf609b1c74a..e7a5717c094 100644 --- a/src/main/java/org/elasticsearch/watcher/rest/action/RestAckWatchAction.java +++ b/src/main/java/org/elasticsearch/watcher/rest/action/RestAckWatchAction.java @@ -28,11 +28,17 @@ public class RestAckWatchAction extends WatcherRestHandler { super(settings, controller, client); controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/_ack", this); controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/_ack", this); + controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/{actions}/_ack", this); + controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/{actions}/_ack", this); } @Override protected void handleRequest(RestRequest request, RestChannel restChannel, WatcherClient client) throws Exception { AckWatchRequest ackWatchRequest = new AckWatchRequest(request.param("id")); + String[] actions = request.paramAsStringArray("actions", null); + if (actions != null) { + ackWatchRequest.setActionIds(actions); + } ackWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", ackWatchRequest.masterNodeTimeout())); client.ackWatch(ackWatchRequest, new RestBuilderListener(restChannel) { @Override diff --git a/src/main/java/org/elasticsearch/watcher/support/Validation.java b/src/main/java/org/elasticsearch/watcher/support/Validation.java new file mode 100644 index 00000000000..d50ecd6504e --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/Validation.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import org.elasticsearch.common.logging.support.LoggerMessageFormat; + +import java.util.regex.Pattern; + +/** + * + */ +public class Validation { + + private static final Pattern NO_WS_PATTERN = Pattern.compile("\\S+"); + + public static Error watchId(String id) { + if (!NO_WS_PATTERN.matcher(id).matches()) { + return new Error("Invalid watch id [{}]. Watch id cannot have white spaces", id); + } + return null; + } + + public static Error actionId(String id) { + if (!NO_WS_PATTERN.matcher(id).matches()) { + return new Error("Invalid action id [{}]. Action id cannot have white spaces", id); + } + return null; + } + + public static class Error { + + private final String message; + + public Error(String message, Object... args) { + this.message = LoggerMessageFormat.format(message, args); + } + + public String message() { + return message; + } + + @Override + public String toString() { + return message; + } + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchRequest.java b/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchRequest.java index 32ffa1a2ebf..28fb054b9ab 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchRequest.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchRequest.java @@ -8,9 +8,11 @@ package org.elasticsearch.watcher.transport.actions.ack; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.watcher.support.Validation; import java.io.IOException; @@ -21,54 +23,86 @@ public class AckWatchRequest extends MasterNodeOperationRequest private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(10); - private String id; + private String watchId; + private String[] actionIds = Strings.EMPTY_ARRAY; public AckWatchRequest() { this(null); } - public AckWatchRequest(String id) { - this.id = id; + public AckWatchRequest(String watchId, String... actionIds) { + this.watchId = watchId; + this.actionIds = actionIds; masterNodeTimeout(DEFAULT_TIMEOUT); } /** - * @return The name of the watch to be acked + * @return The id of the watch to be acked */ - public String getId() { - return id; + public String getWatchId() { + return watchId; } /** - * Sets the name of the watch to be acked + * @param actionIds The ids of the actions to be acked */ - public void setId(String id) { - this.id = id; + public void setActionIds(String... actionIds) { + this.actionIds = actionIds; + } + + /** + * @return The ids of the actions to be acked + */ + public String[] getActionIds() { + return actionIds; } @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - if (id == null){ + if (watchId == null){ validationException = ValidateActions.addValidationError("watch id is missing", validationException); } + Validation.Error error = Validation.watchId(watchId); + if (error != null) { + validationException = ValidateActions.addValidationError(error.message(), validationException); + } + for (String actionId : actionIds) { + error = Validation.actionId(actionId); + if (error != null) { + validationException = ValidateActions.addValidationError(error.message(), validationException); + } + } return validationException; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - id = in.readString(); + watchId = in.readString(); + actionIds = in.readStringArray(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(id); + out.writeString(watchId); + out.writeStringArray(actionIds); } @Override public String toString() { - return "ack [" + id + "]"; + StringBuilder sb = new StringBuilder("ack [").append(watchId).append("]"); + if (actionIds.length > 0) { + sb.append("["); + for (int i = 0; i < actionIds.length; i++) { + if (i > 0) { + sb.append(", "); + } + sb.append(actionIds[i]); + } + sb.append("]"); + } + return sb.toString(); } } diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchRequestBuilder.java b/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchRequestBuilder.java index 0ff054669d7..b4506338f51 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchRequestBuilder.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchRequestBuilder.java @@ -23,11 +23,8 @@ public class AckWatchRequestBuilder extends MasterNodeOperationRequestBuilder listener) throws ElasticsearchException { try { - AckWatchResponse response = new AckWatchResponse(watcherService.ackWatch(request.getId(), request.masterNodeTimeout())); + WatchStatus watchStatus = watcherService.ackWatch(request.getWatchId(), request.getActionIds(), request.masterNodeTimeout()); + AckWatchResponse response = new AckWatchResponse(watchStatus); listener.onResponse(response); } catch (Exception e) { listener.onFailure(e); diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java index f9336dddc16..23b59546657 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.watcher.support.Validation; import java.io.IOException; @@ -82,6 +83,10 @@ public class DeleteWatchRequest extends MasterNodeOperationRequest>>>>>> Action level throttling */ public Map getActionModes() { return actionModes; @@ -175,6 +173,16 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest modes : actionModes.entrySet()) { + error = Validation.actionId(modes.getKey()); + if (error != null) { + validationException = ValidateActions.addValidationError(error.message(), validationException); + } + } if (triggerSource == null || triggerType == null) { validationException = ValidateActions.addValidationError("trigger event is missing", validationException); } diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/get/GetWatchRequest.java b/src/main/java/org/elasticsearch/watcher/transport/actions/get/GetWatchRequest.java index bb864540bd8..aa0ce25204d 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/get/GetWatchRequest.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/get/GetWatchRequest.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; +import org.elasticsearch.watcher.support.Validation; import java.io.IOException; @@ -46,6 +47,10 @@ public class GetWatchRequest extends MasterNodeOperationRequest if (id == null) { validationException = ValidateActions.addValidationError("id is missing", validationException); } + Validation.Error error = Validation.watchId(id); + if (error != null) { + validationException = ValidateActions.addValidationError(error.message(), validationException); + } return validationException; } diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequest.java b/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequest.java index 7f7773d696f..4ef589602b4 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequest.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequest.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.watcher.support.Validation; import java.io.IOException; @@ -83,6 +84,10 @@ public class PutWatchRequest extends MasterNodeOperationRequest if (id == null) { validationException = ValidateActions.addValidationError("watch name is missing", validationException); } + Validation.Error error = Validation.watchId(id); + if (error != null) { + validationException = ValidateActions.addValidationError(error.message(), validationException); + } if (source == null) { validationException = ValidateActions.addValidationError("watch source is missing", validationException); } diff --git a/src/main/java/org/elasticsearch/watcher/watch/Watch.java b/src/main/java/org/elasticsearch/watcher/watch/Watch.java index 2de9d556268..96000aadcbf 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/Watch.java +++ b/src/main/java/org/elasticsearch/watcher/watch/Watch.java @@ -51,6 +51,8 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; public class Watch implements TriggerEngine.Job, ToXContent { + public static final String ALL_ACTIONS_ID = "_all"; + private final String id; private final Trigger trigger; private final ExecutableInput input; @@ -119,6 +121,7 @@ public class Watch implements TriggerEngine.Job, ToXContent { public void version(long version) { this.version = version; } + /** * Acks this watch. * diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchStatus.java b/src/main/java/org/elasticsearch/watcher/watch/WatchStatus.java index de490c5d0bd..80cc09a93dd 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchStatus.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchStatus.java @@ -159,7 +159,7 @@ public class WatchStatus implements ToXContent, Streamable { */ boolean onAck(DateTime timestamp, String... actionIds) { boolean changed = false; - if (ArrayUtils.contains(actionIds, "_all")) { + if (ArrayUtils.contains(actionIds, Watch.ALL_ACTIONS_ID)) { for (ActionStatus status : actions.values()) { changed |= status.onAck(timestamp); } @@ -277,6 +277,4 @@ public class WatchStatus implements ToXContent, Streamable { ParseField ACTIONS = new ParseField("actions"); } - - } diff --git a/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java index 29b0441d97d..2eaf8e030d3 100644 --- a/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java @@ -246,7 +246,7 @@ public class LicenseIntegrationTests extends AbstractWatcherIntegrationTests { @Override public void run() { XContentSource source = watcherClient().prepareGetWatch(watchName).get().getSource(); - assertThat(source.getValue("status.actions._index.ack_status.state"), is((Object) "ackable")); + assertThat(source.getValue("status.actions._index.ack.state"), is((Object) "ackable")); } }); diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchThrottleTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchAckTests.java similarity index 56% rename from src/test/java/org/elasticsearch/watcher/test/integration/WatchThrottleTests.java rename to src/test/java/org/elasticsearch/watcher/test/integration/WatchAckTests.java index cb5cbc7ac57..8bbd5d77ec1 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/WatchThrottleTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchAckTests.java @@ -6,17 +6,18 @@ package org.elasticsearch.watcher.test.integration; import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.watcher.actions.ActionStatus; import org.elasticsearch.watcher.client.WatcherClient; import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.transport.actions.ack.AckWatchRequestBuilder; import org.elasticsearch.watcher.transport.actions.ack.AckWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse; @@ -28,7 +29,6 @@ import org.junit.Test; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction; @@ -39,79 +39,14 @@ import static org.elasticsearch.watcher.test.WatcherTestUtils.matchAllRequest; import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform; import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron; -import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; import static org.hamcrest.Matchers.*; import static org.hamcrest.core.IsEqual.equalTo; /** */ -public class WatchThrottleTests extends AbstractWatcherIntegrationTests { +public class WatchAckTests extends AbstractWatcherIntegrationTests { - @Test - public void test_AckThrottle() throws Exception { - WatcherClient watcherClient = watcherClient(); - IndexResponse eventIndexResponse = indexTestDoc(); - - PutWatchResponse putWatchResponse = watcherClient.preparePutWatch() - .setId("_name") - .setSource(watchBuilder() - .trigger(schedule(cron("0/5 * * * * ? *"))) - .input(searchInput(matchAllRequest().indices("events"))) - .condition(scriptCondition("ctx.payload.hits.total > 0")) - .transform(searchTransform(matchAllRequest().indices("events"))) - .addAction("_id", indexAction("actions", "action")) - .defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS))) - .get(); - - assertThat(putWatchResponse.isCreated(), is(true)); - - if (timeWarped()) { - timeWarp().scheduler().trigger("_name", 4, TimeValue.timeValueSeconds(5)); - } else { - Thread.sleep(20000); - } - AckWatchResponse ackResponse = watcherClient.prepareAckWatch("_name").get(); - assertThat(ackResponse.getStatus().actionStatus("_id").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED)); - - refresh(); - long countAfterAck = docCount("actions", "action", matchAllQuery()); - assertThat(countAfterAck, greaterThanOrEqualTo((long) 1)); - - if (timeWarped()) { - timeWarp().scheduler().trigger("_name", 4, TimeValue.timeValueSeconds(5)); - } else { - Thread.sleep(20000); - } - refresh(); - - // There shouldn't be more actions in the index after we ack the watch, even though the watch was triggered - long countAfterPostAckFires = docCount("actions", "action", matchAllQuery()); - assertThat(countAfterPostAckFires, equalTo(countAfterAck)); - - // Now delete the event and the ack state should change to AWAITS_EXECUTION - DeleteResponse response = client().prepareDelete("events", "event", eventIndexResponse.getId()).get(); - assertThat(response.isFound(), is(true)); - refresh(); - - if (timeWarped()) { - timeWarp().scheduler().trigger("_name", 4, TimeValue.timeValueSeconds(5)); - } else { - Thread.sleep(20000); - } - - GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_name").get(); - assertThat(getWatchResponse.isFound(), is(true)); - - Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes()); - assertThat(parsedWatch.status().actionStatus("_id").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION)); - - long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, - matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id())); - assertThat(throttledCount, greaterThan(0L)); - } - - - public IndexResponse indexTestDoc() { + private IndexResponse indexTestDoc() { createIndex("actions", "events"); ensureGreen("actions", "events"); @@ -123,87 +58,163 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests { return eventIndexResponse; } - @Test - @Repeat(iterations = 10) - public void testTimeThrottle() throws Exception { + public void testAck_SingleAction() throws Exception { WatcherClient watcherClient = watcherClient(); - indexTestDoc(); + IndexResponse eventIndexResponse = indexTestDoc(); PutWatchResponse putWatchResponse = watcherClient.preparePutWatch() - .setId("_name") + .setId("_id") .setSource(watchBuilder() - .trigger(schedule(interval("5s"))) + .trigger(schedule(cron("0/5 * * * * ? *"))) .input(searchInput(matchAllRequest().indices("events"))) .condition(scriptCondition("ctx.payload.hits.total > 0")) .transform(searchTransform(matchAllRequest().indices("events"))) - .addAction("_id", indexAction("actions", "action")) - .defaultThrottlePeriod(TimeValue.timeValueSeconds(30))) + .addAction("_a1", indexAction("actions", "action1")) + .addAction("_a2", indexAction("actions", "action2")) + .defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS))) .get(); + assertThat(putWatchResponse.isCreated(), is(true)); if (timeWarped()) { - timeWarp().clock().setTime(DateTime.now(UTC)); - - timeWarp().scheduler().trigger("_name"); - refresh(); - - // the first fire should work - long actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(1L)); - - timeWarp().clock().fastForwardSeconds(5); - timeWarp().scheduler().trigger("_name"); - refresh(); - - // the last fire should have been throttled, so number of actions shouldn't change - actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(1L)); - - timeWarp().clock().fastForwardSeconds(30); - timeWarp().scheduler().trigger("_name"); - refresh(); - - // the last fire occurred passed the throttle period, so a new action should have been added - actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(2L)); - - long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, - matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id())); - assertThat(throttledCount, is(1L)); - + timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5)); } else { - Thread.sleep(TimeUnit.SECONDS.toMillis(5)); - // the first fire should work so we should have a single action in the actions index - assertBusy(new Runnable() { - @Override - public void run() { - refresh(); - long actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(1L)); - } - }, 5, TimeUnit.SECONDS); - Thread.sleep(TimeUnit.SECONDS.toMillis(5)); - // we should still be within the throttling period... so the number of actions shouldn't change - assertBusy(new Runnable() { - @Override - public void run() { - refresh(); - long actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(1L)); - - long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, - matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id())); - assertThat(throttledCount, greaterThanOrEqualTo(1L)); - } - }, 5, TimeUnit.SECONDS); + Thread.sleep(20000); } + AckWatchResponse ackResponse = watcherClient.prepareAckWatch("_id").setActionIds("_a1").get(); + assertThat(ackResponse.getStatus().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED)); + assertThat(ackResponse.getStatus().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKABLE)); + + refresh(); + long a1CountAfterAck = docCount("actions", "action1", matchAllQuery()); + long a2CountAfterAck = docCount("actions", "action2", matchAllQuery()); + assertThat(a1CountAfterAck, greaterThanOrEqualTo((long) 1)); + assertThat(a2CountAfterAck, greaterThanOrEqualTo((long) 1)); + + if (timeWarped()) { + timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5)); + } else { + Thread.sleep(20000); + } + flush(); + refresh(); + + // There shouldn't be more a1 actions in the index after we ack the watch, even though the watch was triggered + long a1CountAfterPostAckFires = docCount("actions", "action1", matchAllQuery()); + assertThat(a1CountAfterPostAckFires, equalTo(a1CountAfterAck)); + + // There should be more a2 actions in the index after we ack the watch + long a2CountAfterPostAckFires = docCount("actions", "action2", matchAllQuery()); + assertThat(a2CountAfterPostAckFires, greaterThan(a2CountAfterAck)); + + // Now delete the event and the ack states should change to AWAITS_EXECUTION + DeleteResponse response = client().prepareDelete("events", "event", eventIndexResponse.getId()).get(); + assertThat(response.isFound(), is(true)); + refresh(); + + if (timeWarped()) { + timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5)); + } else { + Thread.sleep(20000); + } + + GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_id").get(); + assertThat(getWatchResponse.isFound(), is(true)); + + Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes()); + assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION)); + assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION)); + + long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, + matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id())); + assertThat(throttledCount, greaterThan(0L)); } + @Test @Repeat(iterations = 5) + public void testAck_AllActions() throws Exception { + WatcherClient watcherClient = watcherClient(); + IndexResponse eventIndexResponse = indexTestDoc(); + + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch() + .setId("_id") + .setSource(watchBuilder() + .trigger(schedule(cron("0/5 * * * * ? *"))) + .input(searchInput(matchAllRequest().indices("events"))) + .condition(scriptCondition("ctx.payload.hits.total > 0")) + .transform(searchTransform(matchAllRequest().indices("events"))) + .addAction("_a1", indexAction("actions", "action1")) + .addAction("_a2", indexAction("actions", "action2")) + .defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS))) + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + if (timeWarped()) { + timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5)); + } else { + Thread.sleep(20000); + } + + AckWatchRequestBuilder ackWatchRequestBuilder = watcherClient.prepareAckWatch("_id"); + if (randomBoolean()) { + ackWatchRequestBuilder.setActionIds("_all"); + } else if (randomBoolean()) { + ackWatchRequestBuilder.setActionIds("_all", "a1"); + } + AckWatchResponse ackResponse = ackWatchRequestBuilder.get(); + + assertThat(ackResponse.getStatus().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED)); + assertThat(ackResponse.getStatus().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED)); + + refresh(); + long a1CountAfterAck = docCount("actions", "action1", matchAllQuery()); + long a2CountAfterAck = docCount("actions", "action2", matchAllQuery()); + assertThat(a1CountAfterAck, greaterThanOrEqualTo((long) 1)); + assertThat(a2CountAfterAck, greaterThanOrEqualTo((long) 1)); + + if (timeWarped()) { + timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5)); + } else { + Thread.sleep(20000); + } + flush(); + refresh(); + + // There shouldn't be more a1 actions in the index after we ack the watch, even though the watch was triggered + long a1CountAfterPostAckFires = docCount("actions", "action1", matchAllQuery()); + assertThat(a1CountAfterPostAckFires, equalTo(a1CountAfterAck)); + + // There shouldn't be more a2 actions in the index after we ack the watch, even though the watch was triggered + long a2CountAfterPostAckFires = docCount("actions", "action2", matchAllQuery()); + assertThat(a2CountAfterPostAckFires, equalTo(a2CountAfterAck)); + + // Now delete the event and the ack states should change to AWAITS_EXECUTION + DeleteResponse response = client().prepareDelete("events", "event", eventIndexResponse.getId()).get(); + assertThat(response.isFound(), is(true)); + refresh(); + + if (timeWarped()) { + timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5)); + } else { + Thread.sleep(20000); + } + + GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_id").get(); + assertThat(getWatchResponse.isFound(), is(true)); + + Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes()); + assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION)); + assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION)); + + long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, + matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id())); + assertThat(throttledCount, greaterThan(0L)); + } @Test @Repeat(iterations = 2) - public void test_ack_with_restart() throws Exception { + public void testAck_WithRestart() throws Exception { WatcherClient watcherClient = watcherClient(); indexTestDoc(); PutWatchResponse putWatchResponse = watcherClient.preparePutWatch() @@ -266,53 +277,15 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests { assertThat(countAfterPostAckFires, equalTo(countAfterAck)); } - - @Test @Repeat(iterations = 10) - public void test_default_TimeThrottle() throws Exception { + @Test(expected = ActionRequestValidationException.class) + public void testAck_InvalidWatchId() throws Exception { WatcherClient watcherClient = watcherClient(); - indexTestDoc(); - - PutWatchResponse putWatchResponse = watcherClient.preparePutWatch() - .setId("_name") - .setSource(watchBuilder() - .trigger(schedule(interval("1s"))) - .input(searchInput(matchAllRequest().indices("events"))) - .condition(scriptCondition("ctx.payload.hits.total > 0")) - .transform(searchTransform(matchAllRequest().indices("events"))) - .addAction("_id", indexAction("actions", "action"))) - .get(); - assertThat(putWatchResponse.isCreated(), is(true)); - - if (timeWarped()) { - timeWarp().clock().setTime(DateTime.now(UTC)); - - timeWarp().scheduler().trigger("_name"); - refresh(); - - // the first trigger should work - long actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(1L)); - - timeWarp().clock().fastForwardSeconds(2); - timeWarp().scheduler().trigger("_name"); - refresh(); - - // the last fire should have been throttled, so number of actions shouldn't change - actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(1L)); - - timeWarp().clock().fastForwardSeconds(10); - timeWarp().scheduler().trigger("_name"); - refresh(); - - // the last fire occurred passed the throttle period, so a new action should have been added - actionsCount = docCount("actions", "action", matchAllQuery()); - assertThat(actionsCount, is(2L)); - - long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, - matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id())); - assertThat(throttledCount, is(1L)); - } + watcherClient.prepareAckWatch("id with whitespaces").get(); } + @Test(expected = ActionRequestValidationException.class) + public void testAck_InvalidActionId() throws Exception { + WatcherClient watcherClient = watcherClient(); + watcherClient.prepareAckWatch("_id").setActionIds("id with whitespaces").get(); + } } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java index d58690969d5..9ee6f51b974 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java @@ -6,6 +6,8 @@ package org.elasticsearch.watcher.test.integration; import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.support.xcontent.XContentSource; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; @@ -65,6 +67,23 @@ public class WatchCrudTests extends AbstractWatcherIntegrationTests { .get(); } + @Test(expected = ActionRequestValidationException.class) + public void testPut_InvalidWatchId() throws Exception { + ensureWatcherStarted(); + watcherClient().preparePutWatch("id with whitespaces").setSource(watchBuilder() + .trigger(schedule(interval("5m")))) + .get(); + } + + @Test(expected = WatcherException.class) + public void testPut_InvalidActionId() throws Exception { + ensureWatcherStarted(); + watcherClient().preparePutWatch("_name").setSource(watchBuilder() + .trigger(schedule(interval("5m"))) + .addAction("id with whitespaces", loggingAction("{{ctx.watch_id}}"))) + .get(); + } + @Test public void testGet() throws Exception { ensureWatcherStarted(); @@ -92,6 +111,11 @@ public class WatchCrudTests extends AbstractWatcherIntegrationTests { assertThat(source, hasKey("status")); } + @Test(expected = ActionRequestValidationException.class) + public void testGet_InvalidWatchId() throws Exception { + watcherClient().prepareGetWatch("id with whitespaces").get(); + } + @Test public void testGet_NotFound() throws Exception { ensureWatcherStarted(); @@ -135,4 +159,8 @@ public class WatchCrudTests extends AbstractWatcherIntegrationTests { assertThat(response.isFound(), is(false)); } + @Test(expected = ActionRequestValidationException.class) + public void testDelete_InvalidWatchId() throws Exception { + watcherClient().deleteWatch(new DeleteWatchRequest("id with whitespaces")).actionGet(); + } } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchExecuteTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchExecuteTests.java new file mode 100644 index 00000000000..718ba3658d3 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchExecuteTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.test.integration; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.joda.time.DateTimeZone; +import org.elasticsearch.watcher.execution.ActionExecutionMode; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.junit.Test; + +/** + * + */ +public class WatchExecuteTests extends AbstractWatcherIntegrationTests { + + + @Test(expected = ActionRequestValidationException.class) + public void testExecute_InvalidWatchId() throws Exception { + DateTime now = DateTime.now(DateTimeZone.UTC); + watcherClient().prepareExecuteWatch("id with whitespaces") + .setTriggerEvent(new ScheduleTriggerEvent(now, now)) + .get(); + } + + @Test(expected = ActionRequestValidationException.class) + public void testExecute_InvalidActionId() throws Exception { + DateTime now = DateTime.now(DateTimeZone.UTC); + watcherClient().prepareExecuteWatch("_id") + .setTriggerEvent(new ScheduleTriggerEvent(now, now)) + .setActionMode("id with whitespaces", randomFrom(ActionExecutionMode.values())) + .get(); + } +} diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchTimeThrottleTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchTimeThrottleTests.java new file mode 100644 index 00000000000..2c58f183beb --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchTimeThrottleTests.java @@ -0,0 +1,176 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.test.integration; + +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.watcher.client.WatcherClient; +import org.elasticsearch.watcher.history.HistoryStore; +import org.elasticsearch.watcher.history.WatchRecord; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition; +import static org.elasticsearch.watcher.input.InputBuilders.searchInput; +import static org.elasticsearch.watcher.test.WatcherTestUtils.matchAllRequest; +import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; + +/** + */ +public class WatchTimeThrottleTests extends AbstractWatcherIntegrationTests { + + private IndexResponse indexTestDoc() { + createIndex("actions", "events"); + ensureGreen("actions", "events"); + + IndexResponse eventIndexResponse = client().prepareIndex("events", "event") + .setSource("level", "error") + .get(); + assertThat(eventIndexResponse.isCreated(), is(true)); + refresh(); + return eventIndexResponse; + } + + + @Test + @Repeat(iterations = 10) + public void testTimeThrottle() throws Exception { + WatcherClient watcherClient = watcherClient(); + indexTestDoc(); + + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch() + .setId("_name") + .setSource(watchBuilder() + .trigger(schedule(interval("5s"))) + .input(searchInput(matchAllRequest().indices("events"))) + .condition(scriptCondition("ctx.payload.hits.total > 0")) + .transform(searchTransform(matchAllRequest().indices("events"))) + .addAction("_id", indexAction("actions", "action")) + .defaultThrottlePeriod(TimeValue.timeValueSeconds(30))) + .get(); + assertThat(putWatchResponse.isCreated(), is(true)); + + if (timeWarped()) { + timeWarp().clock().setTime(DateTime.now(UTC)); + + timeWarp().scheduler().trigger("_name"); + refresh(); + + // the first fire should work + long actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + + timeWarp().clock().fastForwardSeconds(5); + timeWarp().scheduler().trigger("_name"); + refresh(); + + // the last fire should have been throttled, so number of actions shouldn't change + actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + + timeWarp().clock().fastForwardSeconds(30); + timeWarp().scheduler().trigger("_name"); + refresh(); + + // the last fire occurred passed the throttle period, so a new action should have been added + actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(2L)); + + long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, + matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id())); + assertThat(throttledCount, is(1L)); + + } else { + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + // the first fire should work so we should have a single action in the actions index + assertBusy(new Runnable() { + @Override + public void run() { + refresh(); + long actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + } + }, 5, TimeUnit.SECONDS); + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + // we should still be within the throttling period... so the number of actions shouldn't change + assertBusy(new Runnable() { + @Override + public void run() { + refresh(); + long actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + + long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, + matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id())); + assertThat(throttledCount, greaterThanOrEqualTo(1L)); + } + }, 5, TimeUnit.SECONDS); + } + } + + @Test @Repeat(iterations = 10) + public void testTimeThrottle_Defaults() throws Exception { + WatcherClient watcherClient = watcherClient(); + indexTestDoc(); + + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch() + .setId("_name") + .setSource(watchBuilder() + .trigger(schedule(interval("1s"))) + .input(searchInput(matchAllRequest().indices("events"))) + .condition(scriptCondition("ctx.payload.hits.total > 0")) + .transform(searchTransform(matchAllRequest().indices("events"))) + .addAction("_id", indexAction("actions", "action"))) + .get(); + assertThat(putWatchResponse.isCreated(), is(true)); + + if (timeWarped()) { + timeWarp().clock().setTime(DateTime.now(UTC)); + + timeWarp().scheduler().trigger("_name"); + refresh(); + + // the first trigger should work + long actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + + timeWarp().clock().fastForwardSeconds(2); + timeWarp().scheduler().trigger("_name"); + refresh(); + + // the last fire should have been throttled, so number of actions shouldn't change + actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + + timeWarp().clock().fastForwardSeconds(10); + timeWarp().scheduler().trigger("_name"); + refresh(); + + // the last fire occurred passed the throttle period, so a new action should have been added + actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(2L)); + + long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null, + matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id())); + assertThat(throttledCount, is(1L)); + } + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java index fb8f596aa82..c4022f498a3 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java @@ -7,11 +7,11 @@ package org.elasticsearch.watcher.watch; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.joda.time.DateTime; -import org.elasticsearch.common.joda.time.DateTimeZone; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ElasticsearchTestCase; @@ -189,7 +189,7 @@ public class WatchServiceTests extends ElasticsearchTestCase { when(watch.status()).thenReturn(status); when(watchStore.get("_id")).thenReturn(watch); - WatchStatus result = watcherService.ackWatch("_id", timeout); + WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout); assertThat(result, not(sameInstance(status))); verify(watchStore, times(1)).updateStatus(watch); @@ -199,7 +199,7 @@ public class WatchServiceTests extends ElasticsearchTestCase { public void testAckWatch_Timeout() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); - watcherService.ackWatch("_id", timeout); + watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout); } @Test @@ -214,7 +214,7 @@ public class WatchServiceTests extends ElasticsearchTestCase { when(watch.status()).thenReturn(status); when(watchStore.get("_id")).thenReturn(watch); - WatchStatus result = watcherService.ackWatch("_id", timeout); + WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout); assertThat(result, not(sameInstance(status))); verify(watchStore, never()).updateStatus(watch); @@ -228,7 +228,7 @@ public class WatchServiceTests extends ElasticsearchTestCase { when(watchStore.get("_id")).thenReturn(null); try { - watcherService.ackWatch("_id", timeout); + watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout); fail(); } catch (WatcherException e) { // expected