Add the ability to ack specific actions
- now it's possible to ack specific actions via the `Ack Watch API` - Added tests for acking specific actions - Changed the watch status structure such that the action ack state can be referred to by `status.actions.<action_id>.ack` (instead of `status.actions.<action_id>.ack_status`... removed the extra redundant "_status") - As part of this work, also added validation for watch/action ids, such that we disallow having whitespaces in them. - Updated the docs around acking & throttling of watch actions Closes elastic/elasticsearch#531 Closes elastic/elasticsearch#537 Original commit: elastic/x-pack-elasticsearch@813e601bf5
This commit is contained in:
parent
6acc3f2616
commit
fb893e774a
|
@ -3,13 +3,17 @@
|
|||
"documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-ack-watch.html",
|
||||
"methods": [ "PUT", "POST" ],
|
||||
"url": {
|
||||
"path": "/_watcher/watch/{id}/_ack",
|
||||
"paths": [ "/_watcher/watch/{id}/_ack" ],
|
||||
"path": "/_watcher/watch/{watch_id}/_ack",
|
||||
"paths": [ "/_watcher/watch/{watch_id}/_ack", "/_watcher/watch/{watch_id}/{action_id}/_ack"],
|
||||
"parts": {
|
||||
"id": {
|
||||
"watch_id": {
|
||||
"type" : "string",
|
||||
"description" : "Watch ID",
|
||||
"required" : true
|
||||
},
|
||||
"action_id": {
|
||||
"type" : "list",
|
||||
"description" : "A comma-separated list of the action ids to be acked"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
- match: { _id: "my_watch" }
|
||||
|
||||
- do:
|
||||
|
@ -40,9 +41,10 @@
|
|||
|
||||
- do:
|
||||
watcher.ack_watch:
|
||||
id: "my_watch"
|
||||
watch_id: "my_watch"
|
||||
action_id: "test_index"
|
||||
|
||||
- match: { "status.actions.test_index.ack_status.state" : "awaits_successful_execution" }
|
||||
- match: { "status.actions.test_index.ack.state" : "awaits_successful_execution" }
|
||||
|
||||
- do:
|
||||
watcher.delete_watch:
|
||||
|
|
|
@ -142,18 +142,21 @@ public class WatcherService extends AbstractComponent {
|
|||
/**
|
||||
* Acks the watch if needed
|
||||
*/
|
||||
public WatchStatus ackWatch(String id, TimeValue timeout) {
|
||||
public WatchStatus ackWatch(String id, String[] actionIds, TimeValue timeout) {
|
||||
ensureStarted();
|
||||
WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout);
|
||||
if (lock == null) {
|
||||
throw new TimeoutException("could not ack watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds()));
|
||||
}
|
||||
if (actionIds == null || actionIds.length == 0) {
|
||||
actionIds = new String[] { Watch.ALL_ACTIONS_ID };
|
||||
}
|
||||
try {
|
||||
Watch watch = watchStore.get(id);
|
||||
if (watch == null) {
|
||||
throw new WatcherException("watch [{}] does not exist", id);
|
||||
}
|
||||
if (watch.ack(clock.now(UTC), "_all")) {
|
||||
if (watch.ack(clock.now(UTC), actionIds)) {
|
||||
try {
|
||||
watchStore.updateStatus(watch);
|
||||
} catch (IOException ioe) {
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.watcher.execution.Wid;
|
||||
import org.elasticsearch.watcher.license.LicenseService;
|
||||
import org.elasticsearch.watcher.support.Validation;
|
||||
import org.elasticsearch.watcher.support.clock.Clock;
|
||||
import org.elasticsearch.watcher.transform.TransformRegistry;
|
||||
|
||||
|
@ -51,6 +52,10 @@ public class ActionRegistry {
|
|||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
id = parser.currentName();
|
||||
Validation.Error error = Validation.actionId(id);
|
||||
if (error != null) {
|
||||
throw new ActionException("could not parse action [{}] for watch [{}]. {}", id, watchId, error);
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT && id != null) {
|
||||
ActionWrapper action = ActionWrapper.parse(watchId, id, parser, this, transformRegistry, clock, licenseService);
|
||||
actions.add(action);
|
||||
|
|
|
@ -505,7 +505,7 @@ public class ActionStatus implements ToXContent {
|
|||
}
|
||||
|
||||
interface Field {
|
||||
ParseField ACK_STATUS = new ParseField("ack_status");
|
||||
ParseField ACK_STATUS = new ParseField("ack");
|
||||
ParseField ACK_STATUS_STATE = new ParseField("state");
|
||||
|
||||
ParseField LAST_EXECUTION = new ParseField("last_execution");
|
||||
|
|
|
@ -15,13 +15,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.watcher.actions.throttler.ActionThrottler;
|
||||
import org.elasticsearch.watcher.actions.throttler.Throttler;
|
||||
import org.elasticsearch.watcher.execution.WatchExecutionContext;
|
||||
import org.elasticsearch.watcher.execution.Wid;
|
||||
import org.elasticsearch.watcher.license.LicenseService;
|
||||
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
|
||||
import org.elasticsearch.watcher.support.clock.Clock;
|
||||
import org.elasticsearch.watcher.actions.throttler.ActionThrottler;
|
||||
import org.elasticsearch.watcher.actions.throttler.Throttler;
|
||||
import org.elasticsearch.watcher.transform.ExecutableTransform;
|
||||
import org.elasticsearch.watcher.transform.Transform;
|
||||
import org.elasticsearch.watcher.transform.TransformRegistry;
|
||||
|
|
|
@ -205,15 +205,6 @@ public class WatcherClient {
|
|||
return new AckWatchRequestBuilder(client, id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a request builder that acks an watch
|
||||
*
|
||||
* @return The request builder
|
||||
*/
|
||||
public AckWatchRequestBuilder prepareAckWatch() {
|
||||
return new AckWatchRequestBuilder(client);
|
||||
}
|
||||
|
||||
/**
|
||||
* Ack a watch
|
||||
*
|
||||
|
|
|
@ -17,38 +17,50 @@ public enum ActionExecutionMode {
|
|||
/**
|
||||
* The action will be simulated (not actually executed) and it will be throttled if needed.
|
||||
*/
|
||||
SIMULATE((byte) 1),
|
||||
SIMULATE((byte) 1, false, true),
|
||||
|
||||
/**
|
||||
* The action will be simulated (not actually executed) and it will <b>not</b> be throttled.
|
||||
*/
|
||||
FORCE_SIMULATE((byte) 2),
|
||||
FORCE_SIMULATE((byte) 2, true, true),
|
||||
|
||||
/**
|
||||
* The action will be executed and it will be throttled if needed.
|
||||
*/
|
||||
EXECUTE((byte) 3),
|
||||
EXECUTE((byte) 3, false, false),
|
||||
|
||||
/**
|
||||
* The action will be executed and it will <b>not</b> be throttled.
|
||||
*/
|
||||
FORCE_EXECUTE((byte) 4),
|
||||
FORCE_EXECUTE((byte) 4, true, false),
|
||||
|
||||
/**
|
||||
* The action will be skipped (it won't be executed nor simulated) - effectively it will be forcefully throttled
|
||||
*/
|
||||
SKIP((byte) 5);
|
||||
SKIP((byte) 5, false, false);
|
||||
|
||||
private final byte id;
|
||||
private final boolean force;
|
||||
private final boolean simulate;
|
||||
|
||||
ActionExecutionMode(byte id) {
|
||||
ActionExecutionMode(byte id, boolean froce, boolean simulate) {
|
||||
this.id = id;
|
||||
this.force = froce;
|
||||
this.simulate = simulate;
|
||||
}
|
||||
|
||||
public final byte id() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public final boolean simulate() {
|
||||
return simulate;
|
||||
}
|
||||
|
||||
public final boolean force() {
|
||||
return force;
|
||||
}
|
||||
|
||||
public static ActionExecutionMode resolve(byte id) {
|
||||
switch (id) {
|
||||
case 1: return SIMULATE;
|
||||
|
|
|
@ -60,21 +60,19 @@ public class ManualExecutionContext extends WatchExecutionContext {
|
|||
@Override
|
||||
public final boolean simulateAction(String actionId) {
|
||||
ActionExecutionMode mode = actionModes.get(Builder.ALL);
|
||||
if (mode == ActionExecutionMode.SIMULATE || mode == ActionExecutionMode.FORCE_SIMULATE) {
|
||||
return true;
|
||||
if (mode == null) {
|
||||
mode = actionModes.get(actionId);
|
||||
}
|
||||
mode = actionModes.get(actionId);
|
||||
return mode == ActionExecutionMode.SIMULATE || mode == ActionExecutionMode.FORCE_SIMULATE;
|
||||
return mode != null && mode.simulate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean skipThrottling(String actionId) {
|
||||
ActionExecutionMode mode = actionModes.get(Builder.ALL);
|
||||
if (mode == ActionExecutionMode.FORCE_EXECUTE || mode == ActionExecutionMode.FORCE_SIMULATE) {
|
||||
return true;
|
||||
if (mode == null) {
|
||||
mode = actionModes.get(actionId);
|
||||
}
|
||||
mode = actionModes.get(actionId);
|
||||
return mode == ActionExecutionMode.FORCE_EXECUTE || mode == ActionExecutionMode.FORCE_SIMULATE;
|
||||
return mode != null && mode.force();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,11 +28,17 @@ public class RestAckWatchAction extends WatcherRestHandler {
|
|||
super(settings, controller, client);
|
||||
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/_ack", this);
|
||||
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/_ack", this);
|
||||
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/{actions}/_ack", this);
|
||||
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/{actions}/_ack", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleRequest(RestRequest request, RestChannel restChannel, WatcherClient client) throws Exception {
|
||||
AckWatchRequest ackWatchRequest = new AckWatchRequest(request.param("id"));
|
||||
String[] actions = request.paramAsStringArray("actions", null);
|
||||
if (actions != null) {
|
||||
ackWatchRequest.setActionIds(actions);
|
||||
}
|
||||
ackWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", ackWatchRequest.masterNodeTimeout()));
|
||||
client.ackWatch(ackWatchRequest, new RestBuilderListener<AckWatchResponse>(restChannel) {
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher.support;
|
||||
|
||||
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
|
||||
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class Validation {
|
||||
|
||||
private static final Pattern NO_WS_PATTERN = Pattern.compile("\\S+");
|
||||
|
||||
public static Error watchId(String id) {
|
||||
if (!NO_WS_PATTERN.matcher(id).matches()) {
|
||||
return new Error("Invalid watch id [{}]. Watch id cannot have white spaces", id);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static Error actionId(String id) {
|
||||
if (!NO_WS_PATTERN.matcher(id).matches()) {
|
||||
return new Error("Invalid action id [{}]. Action id cannot have white spaces", id);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static class Error {
|
||||
|
||||
private final String message;
|
||||
|
||||
public Error(String message, Object... args) {
|
||||
this.message = LoggerMessageFormat.format(message, args);
|
||||
}
|
||||
|
||||
public String message() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return message;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,9 +8,11 @@ package org.elasticsearch.watcher.transport.actions.ack;
|
|||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ValidateActions;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.watcher.support.Validation;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -21,54 +23,86 @@ public class AckWatchRequest extends MasterNodeOperationRequest<AckWatchRequest>
|
|||
|
||||
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(10);
|
||||
|
||||
private String id;
|
||||
private String watchId;
|
||||
private String[] actionIds = Strings.EMPTY_ARRAY;
|
||||
|
||||
public AckWatchRequest() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
public AckWatchRequest(String id) {
|
||||
this.id = id;
|
||||
public AckWatchRequest(String watchId, String... actionIds) {
|
||||
this.watchId = watchId;
|
||||
this.actionIds = actionIds;
|
||||
masterNodeTimeout(DEFAULT_TIMEOUT);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The name of the watch to be acked
|
||||
* @return The id of the watch to be acked
|
||||
*/
|
||||
public String getId() {
|
||||
return id;
|
||||
public String getWatchId() {
|
||||
return watchId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the name of the watch to be acked
|
||||
* @param actionIds The ids of the actions to be acked
|
||||
*/
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
public void setActionIds(String... actionIds) {
|
||||
this.actionIds = actionIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The ids of the actions to be acked
|
||||
*/
|
||||
public String[] getActionIds() {
|
||||
return actionIds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (id == null){
|
||||
if (watchId == null){
|
||||
validationException = ValidateActions.addValidationError("watch id is missing", validationException);
|
||||
}
|
||||
Validation.Error error = Validation.watchId(watchId);
|
||||
if (error != null) {
|
||||
validationException = ValidateActions.addValidationError(error.message(), validationException);
|
||||
}
|
||||
for (String actionId : actionIds) {
|
||||
error = Validation.actionId(actionId);
|
||||
if (error != null) {
|
||||
validationException = ValidateActions.addValidationError(error.message(), validationException);
|
||||
}
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
id = in.readString();
|
||||
watchId = in.readString();
|
||||
actionIds = in.readStringArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(id);
|
||||
out.writeString(watchId);
|
||||
out.writeStringArray(actionIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ack [" + id + "]";
|
||||
StringBuilder sb = new StringBuilder("ack [").append(watchId).append("]");
|
||||
if (actionIds.length > 0) {
|
||||
sb.append("[");
|
||||
for (int i = 0; i < actionIds.length; i++) {
|
||||
if (i > 0) {
|
||||
sb.append(", ");
|
||||
}
|
||||
sb.append(actionIds[i]);
|
||||
}
|
||||
sb.append("]");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,11 +23,8 @@ public class AckWatchRequestBuilder extends MasterNodeOperationRequestBuilder<Ac
|
|||
super(client, new AckWatchRequest(id));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the id of the watch to be ack
|
||||
*/
|
||||
public AckWatchRequestBuilder setId(String id) {
|
||||
this.request().setId(id);
|
||||
public AckWatchRequestBuilder setActionIds(String... actionIds) {
|
||||
request.setActionIds(actionIds);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
import org.elasticsearch.watcher.WatcherService;
|
||||
import org.elasticsearch.watcher.license.LicenseService;
|
||||
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
|
||||
import org.elasticsearch.watcher.watch.WatchStatus;
|
||||
import org.elasticsearch.watcher.watch.WatchStore;
|
||||
|
||||
/**
|
||||
|
@ -53,7 +54,8 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
|
|||
@Override
|
||||
protected void masterOperation(AckWatchRequest request, ClusterState state, ActionListener<AckWatchResponse> listener) throws ElasticsearchException {
|
||||
try {
|
||||
AckWatchResponse response = new AckWatchResponse(watcherService.ackWatch(request.getId(), request.masterNodeTimeout()));
|
||||
WatchStatus watchStatus = watcherService.ackWatch(request.getWatchId(), request.getActionIds(), request.masterNodeTimeout());
|
||||
AckWatchResponse response = new AckWatchResponse(watchStatus);
|
||||
listener.onResponse(response);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.watcher.support.Validation;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -82,6 +83,10 @@ public class DeleteWatchRequest extends MasterNodeOperationRequest<DeleteWatchRe
|
|||
if (id == null){
|
||||
validationException = ValidateActions.addValidationError("watch id is missing", validationException);
|
||||
}
|
||||
Validation.Error error = Validation.watchId(id);
|
||||
if (error != null) {
|
||||
validationException = ValidateActions.addValidationError(error.message(), validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.watcher.support.Validation;
|
||||
import org.elasticsearch.watcher.trigger.TriggerEvent;
|
||||
import org.elasticsearch.watcher.execution.ActionExecutionMode;
|
||||
|
||||
|
@ -115,13 +116,6 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
|
|||
this.alternativeInput = alternativeInput;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the trigger to use
|
||||
*/
|
||||
public BytesReference getTriggerSource() {
|
||||
return triggerSource;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param triggerType the type of trigger to use
|
||||
* @param triggerSource the trigger source to use
|
||||
|
@ -132,7 +126,6 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
|
|||
}
|
||||
|
||||
/**
|
||||
<<<<<<< HEAD
|
||||
* @param triggerEvent the trigger event to use
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -147,13 +140,18 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
|
|||
*/
|
||||
public String getTriggerType() { return triggerType; }
|
||||
|
||||
/**
|
||||
* @return the trigger to use
|
||||
*/
|
||||
public BytesReference getTriggerSource() {
|
||||
return triggerSource;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the trigger data to use
|
||||
=======
|
||||
*
|
||||
* @return the execution modes for the actions. These modes determine the nature of the execution
|
||||
* of the watch actions while the watch is executing.
|
||||
>>>>>>> Action level throttling
|
||||
*/
|
||||
public Map<String, ActionExecutionMode> getActionModes() {
|
||||
return actionModes;
|
||||
|
@ -175,6 +173,16 @@ public class ExecuteWatchRequest extends MasterNodeOperationRequest<ExecuteWatch
|
|||
if (id == null){
|
||||
validationException = ValidateActions.addValidationError("watch id is missing", validationException);
|
||||
}
|
||||
Validation.Error error = Validation.watchId(id);
|
||||
if (error != null) {
|
||||
validationException = ValidateActions.addValidationError(error.message(), validationException);
|
||||
}
|
||||
for (Map.Entry<String, ActionExecutionMode> modes : actionModes.entrySet()) {
|
||||
error = Validation.actionId(modes.getKey());
|
||||
if (error != null) {
|
||||
validationException = ValidateActions.addValidationError(error.message(), validationException);
|
||||
}
|
||||
}
|
||||
if (triggerSource == null || triggerType == null) {
|
||||
validationException = ValidateActions.addValidationError("trigger event is missing", validationException);
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.watcher.support.Validation;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -46,6 +47,10 @@ public class GetWatchRequest extends MasterNodeOperationRequest<GetWatchRequest>
|
|||
if (id == null) {
|
||||
validationException = ValidateActions.addValidationError("id is missing", validationException);
|
||||
}
|
||||
Validation.Error error = Validation.watchId(id);
|
||||
if (error != null) {
|
||||
validationException = ValidateActions.addValidationError(error.message(), validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.watcher.support.Validation;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -83,6 +84,10 @@ public class PutWatchRequest extends MasterNodeOperationRequest<PutWatchRequest>
|
|||
if (id == null) {
|
||||
validationException = ValidateActions.addValidationError("watch name is missing", validationException);
|
||||
}
|
||||
Validation.Error error = Validation.watchId(id);
|
||||
if (error != null) {
|
||||
validationException = ValidateActions.addValidationError(error.message(), validationException);
|
||||
}
|
||||
if (source == null) {
|
||||
validationException = ValidateActions.addValidationError("watch source is missing", validationException);
|
||||
}
|
||||
|
|
|
@ -51,6 +51,8 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|||
|
||||
public class Watch implements TriggerEngine.Job, ToXContent {
|
||||
|
||||
public static final String ALL_ACTIONS_ID = "_all";
|
||||
|
||||
private final String id;
|
||||
private final Trigger trigger;
|
||||
private final ExecutableInput input;
|
||||
|
@ -119,6 +121,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
|
|||
public void version(long version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Acks this watch.
|
||||
*
|
||||
|
|
|
@ -159,7 +159,7 @@ public class WatchStatus implements ToXContent, Streamable {
|
|||
*/
|
||||
boolean onAck(DateTime timestamp, String... actionIds) {
|
||||
boolean changed = false;
|
||||
if (ArrayUtils.contains(actionIds, "_all")) {
|
||||
if (ArrayUtils.contains(actionIds, Watch.ALL_ACTIONS_ID)) {
|
||||
for (ActionStatus status : actions.values()) {
|
||||
changed |= status.onAck(timestamp);
|
||||
}
|
||||
|
@ -277,6 +277,4 @@ public class WatchStatus implements ToXContent, Streamable {
|
|||
ParseField ACTIONS = new ParseField("actions");
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -246,7 +246,7 @@ public class LicenseIntegrationTests extends AbstractWatcherIntegrationTests {
|
|||
@Override
|
||||
public void run() {
|
||||
XContentSource source = watcherClient().prepareGetWatch(watchName).get().getSource();
|
||||
assertThat(source.getValue("status.actions._index.ack_status.state"), is((Object) "ackable"));
|
||||
assertThat(source.getValue("status.actions._index.ack.state"), is((Object) "ackable"));
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -6,17 +6,18 @@
|
|||
package org.elasticsearch.watcher.test.integration;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Repeat;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.watcher.actions.ActionStatus;
|
||||
import org.elasticsearch.watcher.client.WatcherClient;
|
||||
import org.elasticsearch.watcher.history.HistoryStore;
|
||||
import org.elasticsearch.watcher.history.WatchRecord;
|
||||
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
|
||||
import org.elasticsearch.watcher.transport.actions.ack.AckWatchRequestBuilder;
|
||||
import org.elasticsearch.watcher.transport.actions.ack.AckWatchResponse;
|
||||
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
|
||||
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
|
||||
|
@ -28,7 +29,6 @@ import org.junit.Test;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
||||
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
|
||||
|
@ -39,79 +39,14 @@ import static org.elasticsearch.watcher.test.WatcherTestUtils.matchAllRequest;
|
|||
import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform;
|
||||
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron;
|
||||
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.hamcrest.core.IsEqual.equalTo;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
|
||||
public class WatchAckTests extends AbstractWatcherIntegrationTests {
|
||||
|
||||
@Test
|
||||
public void test_AckThrottle() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
IndexResponse eventIndexResponse = indexTestDoc();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
|
||||
.setId("_name")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(cron("0/5 * * * * ? *")))
|
||||
.input(searchInput(matchAllRequest().indices("events")))
|
||||
.condition(scriptCondition("ctx.payload.hits.total > 0"))
|
||||
.transform(searchTransform(matchAllRequest().indices("events")))
|
||||
.addAction("_id", indexAction("actions", "action"))
|
||||
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
|
||||
.get();
|
||||
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_name", 4, TimeValue.timeValueSeconds(5));
|
||||
} else {
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
AckWatchResponse ackResponse = watcherClient.prepareAckWatch("_name").get();
|
||||
assertThat(ackResponse.getStatus().actionStatus("_id").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED));
|
||||
|
||||
refresh();
|
||||
long countAfterAck = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(countAfterAck, greaterThanOrEqualTo((long) 1));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_name", 4, TimeValue.timeValueSeconds(5));
|
||||
} else {
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
refresh();
|
||||
|
||||
// There shouldn't be more actions in the index after we ack the watch, even though the watch was triggered
|
||||
long countAfterPostAckFires = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(countAfterPostAckFires, equalTo(countAfterAck));
|
||||
|
||||
// Now delete the event and the ack state should change to AWAITS_EXECUTION
|
||||
DeleteResponse response = client().prepareDelete("events", "event", eventIndexResponse.getId()).get();
|
||||
assertThat(response.isFound(), is(true));
|
||||
refresh();
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_name", 4, TimeValue.timeValueSeconds(5));
|
||||
} else {
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
|
||||
GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_name").get();
|
||||
assertThat(getWatchResponse.isFound(), is(true));
|
||||
|
||||
Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes());
|
||||
assertThat(parsedWatch.status().actionStatus("_id").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id()));
|
||||
assertThat(throttledCount, greaterThan(0L));
|
||||
}
|
||||
|
||||
|
||||
public IndexResponse indexTestDoc() {
|
||||
private IndexResponse indexTestDoc() {
|
||||
createIndex("actions", "events");
|
||||
ensureGreen("actions", "events");
|
||||
|
||||
|
@ -123,87 +58,163 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
|
|||
return eventIndexResponse;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@Repeat(iterations = 10)
|
||||
public void testTimeThrottle() throws Exception {
|
||||
public void testAck_SingleAction() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
indexTestDoc();
|
||||
IndexResponse eventIndexResponse = indexTestDoc();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
|
||||
.setId("_name")
|
||||
.setId("_id")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.trigger(schedule(cron("0/5 * * * * ? *")))
|
||||
.input(searchInput(matchAllRequest().indices("events")))
|
||||
.condition(scriptCondition("ctx.payload.hits.total > 0"))
|
||||
.transform(searchTransform(matchAllRequest().indices("events")))
|
||||
.addAction("_id", indexAction("actions", "action"))
|
||||
.defaultThrottlePeriod(TimeValue.timeValueSeconds(30)))
|
||||
.addAction("_a1", indexAction("actions", "action1"))
|
||||
.addAction("_a2", indexAction("actions", "action2"))
|
||||
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
|
||||
.get();
|
||||
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().clock().setTime(DateTime.now(UTC));
|
||||
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the first fire should work
|
||||
long actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
timeWarp().clock().fastForwardSeconds(5);
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the last fire should have been throttled, so number of actions shouldn't change
|
||||
actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
timeWarp().clock().fastForwardSeconds(30);
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the last fire occurred passed the throttle period, so a new action should have been added
|
||||
actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(2L));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id()));
|
||||
assertThat(throttledCount, is(1L));
|
||||
|
||||
timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5));
|
||||
} else {
|
||||
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
|
||||
// the first fire should work so we should have a single action in the actions index
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
refresh();
|
||||
long actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
}
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
|
||||
// we should still be within the throttling period... so the number of actions shouldn't change
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
refresh();
|
||||
long actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id()));
|
||||
assertThat(throttledCount, greaterThanOrEqualTo(1L));
|
||||
}
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
AckWatchResponse ackResponse = watcherClient.prepareAckWatch("_id").setActionIds("_a1").get();
|
||||
assertThat(ackResponse.getStatus().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED));
|
||||
assertThat(ackResponse.getStatus().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKABLE));
|
||||
|
||||
refresh();
|
||||
long a1CountAfterAck = docCount("actions", "action1", matchAllQuery());
|
||||
long a2CountAfterAck = docCount("actions", "action2", matchAllQuery());
|
||||
assertThat(a1CountAfterAck, greaterThanOrEqualTo((long) 1));
|
||||
assertThat(a2CountAfterAck, greaterThanOrEqualTo((long) 1));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5));
|
||||
} else {
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
flush();
|
||||
refresh();
|
||||
|
||||
// There shouldn't be more a1 actions in the index after we ack the watch, even though the watch was triggered
|
||||
long a1CountAfterPostAckFires = docCount("actions", "action1", matchAllQuery());
|
||||
assertThat(a1CountAfterPostAckFires, equalTo(a1CountAfterAck));
|
||||
|
||||
// There should be more a2 actions in the index after we ack the watch
|
||||
long a2CountAfterPostAckFires = docCount("actions", "action2", matchAllQuery());
|
||||
assertThat(a2CountAfterPostAckFires, greaterThan(a2CountAfterAck));
|
||||
|
||||
// Now delete the event and the ack states should change to AWAITS_EXECUTION
|
||||
DeleteResponse response = client().prepareDelete("events", "event", eventIndexResponse.getId()).get();
|
||||
assertThat(response.isFound(), is(true));
|
||||
refresh();
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5));
|
||||
} else {
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
|
||||
GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_id").get();
|
||||
assertThat(getWatchResponse.isFound(), is(true));
|
||||
|
||||
Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes());
|
||||
assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
|
||||
assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id()));
|
||||
assertThat(throttledCount, greaterThan(0L));
|
||||
}
|
||||
|
||||
@Test @Repeat(iterations = 5)
|
||||
public void testAck_AllActions() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
IndexResponse eventIndexResponse = indexTestDoc();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
|
||||
.setId("_id")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(cron("0/5 * * * * ? *")))
|
||||
.input(searchInput(matchAllRequest().indices("events")))
|
||||
.condition(scriptCondition("ctx.payload.hits.total > 0"))
|
||||
.transform(searchTransform(matchAllRequest().indices("events")))
|
||||
.addAction("_a1", indexAction("actions", "action1"))
|
||||
.addAction("_a2", indexAction("actions", "action2"))
|
||||
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
|
||||
.get();
|
||||
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5));
|
||||
} else {
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
|
||||
AckWatchRequestBuilder ackWatchRequestBuilder = watcherClient.prepareAckWatch("_id");
|
||||
if (randomBoolean()) {
|
||||
ackWatchRequestBuilder.setActionIds("_all");
|
||||
} else if (randomBoolean()) {
|
||||
ackWatchRequestBuilder.setActionIds("_all", "a1");
|
||||
}
|
||||
AckWatchResponse ackResponse = ackWatchRequestBuilder.get();
|
||||
|
||||
assertThat(ackResponse.getStatus().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED));
|
||||
assertThat(ackResponse.getStatus().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED));
|
||||
|
||||
refresh();
|
||||
long a1CountAfterAck = docCount("actions", "action1", matchAllQuery());
|
||||
long a2CountAfterAck = docCount("actions", "action2", matchAllQuery());
|
||||
assertThat(a1CountAfterAck, greaterThanOrEqualTo((long) 1));
|
||||
assertThat(a2CountAfterAck, greaterThanOrEqualTo((long) 1));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5));
|
||||
} else {
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
flush();
|
||||
refresh();
|
||||
|
||||
// There shouldn't be more a1 actions in the index after we ack the watch, even though the watch was triggered
|
||||
long a1CountAfterPostAckFires = docCount("actions", "action1", matchAllQuery());
|
||||
assertThat(a1CountAfterPostAckFires, equalTo(a1CountAfterAck));
|
||||
|
||||
// There shouldn't be more a2 actions in the index after we ack the watch, even though the watch was triggered
|
||||
long a2CountAfterPostAckFires = docCount("actions", "action2", matchAllQuery());
|
||||
assertThat(a2CountAfterPostAckFires, equalTo(a2CountAfterAck));
|
||||
|
||||
// Now delete the event and the ack states should change to AWAITS_EXECUTION
|
||||
DeleteResponse response = client().prepareDelete("events", "event", eventIndexResponse.getId()).get();
|
||||
assertThat(response.isFound(), is(true));
|
||||
refresh();
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_id", 4, TimeValue.timeValueSeconds(5));
|
||||
} else {
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
|
||||
GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_id").get();
|
||||
assertThat(getWatchResponse.isFound(), is(true));
|
||||
|
||||
Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes());
|
||||
assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
|
||||
assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id()));
|
||||
assertThat(throttledCount, greaterThan(0L));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Repeat(iterations = 2)
|
||||
public void test_ack_with_restart() throws Exception {
|
||||
public void testAck_WithRestart() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
indexTestDoc();
|
||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
|
||||
|
@ -266,53 +277,15 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
|
|||
assertThat(countAfterPostAckFires, equalTo(countAfterAck));
|
||||
}
|
||||
|
||||
|
||||
@Test @Repeat(iterations = 10)
|
||||
public void test_default_TimeThrottle() throws Exception {
|
||||
@Test(expected = ActionRequestValidationException.class)
|
||||
public void testAck_InvalidWatchId() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
indexTestDoc();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
|
||||
.setId("_name")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("1s")))
|
||||
.input(searchInput(matchAllRequest().indices("events")))
|
||||
.condition(scriptCondition("ctx.payload.hits.total > 0"))
|
||||
.transform(searchTransform(matchAllRequest().indices("events")))
|
||||
.addAction("_id", indexAction("actions", "action")))
|
||||
.get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().clock().setTime(DateTime.now(UTC));
|
||||
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the first trigger should work
|
||||
long actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
timeWarp().clock().fastForwardSeconds(2);
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the last fire should have been throttled, so number of actions shouldn't change
|
||||
actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
timeWarp().clock().fastForwardSeconds(10);
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the last fire occurred passed the throttle period, so a new action should have been added
|
||||
actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(2L));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id()));
|
||||
assertThat(throttledCount, is(1L));
|
||||
}
|
||||
watcherClient.prepareAckWatch("id with whitespaces").get();
|
||||
}
|
||||
|
||||
@Test(expected = ActionRequestValidationException.class)
|
||||
public void testAck_InvalidActionId() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
watcherClient.prepareAckWatch("_id").setActionIds("id with whitespaces").get();
|
||||
}
|
||||
}
|
|
@ -6,6 +6,8 @@
|
|||
package org.elasticsearch.watcher.test.integration;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Repeat;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.watcher.WatcherException;
|
||||
import org.elasticsearch.watcher.client.WatchSourceBuilder;
|
||||
import org.elasticsearch.watcher.support.xcontent.XContentSource;
|
||||
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
|
||||
|
@ -65,6 +67,23 @@ public class WatchCrudTests extends AbstractWatcherIntegrationTests {
|
|||
.get();
|
||||
}
|
||||
|
||||
@Test(expected = ActionRequestValidationException.class)
|
||||
public void testPut_InvalidWatchId() throws Exception {
|
||||
ensureWatcherStarted();
|
||||
watcherClient().preparePutWatch("id with whitespaces").setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5m"))))
|
||||
.get();
|
||||
}
|
||||
|
||||
@Test(expected = WatcherException.class)
|
||||
public void testPut_InvalidActionId() throws Exception {
|
||||
ensureWatcherStarted();
|
||||
watcherClient().preparePutWatch("_name").setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5m")))
|
||||
.addAction("id with whitespaces", loggingAction("{{ctx.watch_id}}")))
|
||||
.get();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGet() throws Exception {
|
||||
ensureWatcherStarted();
|
||||
|
@ -92,6 +111,11 @@ public class WatchCrudTests extends AbstractWatcherIntegrationTests {
|
|||
assertThat(source, hasKey("status"));
|
||||
}
|
||||
|
||||
@Test(expected = ActionRequestValidationException.class)
|
||||
public void testGet_InvalidWatchId() throws Exception {
|
||||
watcherClient().prepareGetWatch("id with whitespaces").get();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGet_NotFound() throws Exception {
|
||||
ensureWatcherStarted();
|
||||
|
@ -135,4 +159,8 @@ public class WatchCrudTests extends AbstractWatcherIntegrationTests {
|
|||
assertThat(response.isFound(), is(false));
|
||||
}
|
||||
|
||||
@Test(expected = ActionRequestValidationException.class)
|
||||
public void testDelete_InvalidWatchId() throws Exception {
|
||||
watcherClient().deleteWatch(new DeleteWatchRequest("id with whitespaces")).actionGet();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher.test.integration;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.joda.time.DateTimeZone;
|
||||
import org.elasticsearch.watcher.execution.ActionExecutionMode;
|
||||
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
|
||||
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class WatchExecuteTests extends AbstractWatcherIntegrationTests {
|
||||
|
||||
|
||||
@Test(expected = ActionRequestValidationException.class)
|
||||
public void testExecute_InvalidWatchId() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
watcherClient().prepareExecuteWatch("id with whitespaces")
|
||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
||||
.get();
|
||||
}
|
||||
|
||||
@Test(expected = ActionRequestValidationException.class)
|
||||
public void testExecute_InvalidActionId() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
watcherClient().prepareExecuteWatch("_id")
|
||||
.setTriggerEvent(new ScheduleTriggerEvent(now, now))
|
||||
.setActionMode("id with whitespaces", randomFrom(ActionExecutionMode.values()))
|
||||
.get();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,176 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher.test.integration;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Repeat;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.watcher.client.WatcherClient;
|
||||
import org.elasticsearch.watcher.history.HistoryStore;
|
||||
import org.elasticsearch.watcher.history.WatchRecord;
|
||||
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
|
||||
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
||||
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
|
||||
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
|
||||
import static org.elasticsearch.watcher.input.InputBuilders.searchInput;
|
||||
import static org.elasticsearch.watcher.test.WatcherTestUtils.matchAllRequest;
|
||||
import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform;
|
||||
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class WatchTimeThrottleTests extends AbstractWatcherIntegrationTests {
|
||||
|
||||
private IndexResponse indexTestDoc() {
|
||||
createIndex("actions", "events");
|
||||
ensureGreen("actions", "events");
|
||||
|
||||
IndexResponse eventIndexResponse = client().prepareIndex("events", "event")
|
||||
.setSource("level", "error")
|
||||
.get();
|
||||
assertThat(eventIndexResponse.isCreated(), is(true));
|
||||
refresh();
|
||||
return eventIndexResponse;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@Repeat(iterations = 10)
|
||||
public void testTimeThrottle() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
indexTestDoc();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
|
||||
.setId("_name")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.input(searchInput(matchAllRequest().indices("events")))
|
||||
.condition(scriptCondition("ctx.payload.hits.total > 0"))
|
||||
.transform(searchTransform(matchAllRequest().indices("events")))
|
||||
.addAction("_id", indexAction("actions", "action"))
|
||||
.defaultThrottlePeriod(TimeValue.timeValueSeconds(30)))
|
||||
.get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().clock().setTime(DateTime.now(UTC));
|
||||
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the first fire should work
|
||||
long actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
timeWarp().clock().fastForwardSeconds(5);
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the last fire should have been throttled, so number of actions shouldn't change
|
||||
actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
timeWarp().clock().fastForwardSeconds(30);
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the last fire occurred passed the throttle period, so a new action should have been added
|
||||
actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(2L));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id()));
|
||||
assertThat(throttledCount, is(1L));
|
||||
|
||||
} else {
|
||||
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
|
||||
// the first fire should work so we should have a single action in the actions index
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
refresh();
|
||||
long actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
}
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
|
||||
// we should still be within the throttling period... so the number of actions shouldn't change
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
refresh();
|
||||
long actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id()));
|
||||
assertThat(throttledCount, greaterThanOrEqualTo(1L));
|
||||
}
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@Test @Repeat(iterations = 10)
|
||||
public void testTimeThrottle_Defaults() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
indexTestDoc();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
|
||||
.setId("_name")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("1s")))
|
||||
.input(searchInput(matchAllRequest().indices("events")))
|
||||
.condition(scriptCondition("ctx.payload.hits.total > 0"))
|
||||
.transform(searchTransform(matchAllRequest().indices("events")))
|
||||
.addAction("_id", indexAction("actions", "action")))
|
||||
.get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().clock().setTime(DateTime.now(UTC));
|
||||
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the first trigger should work
|
||||
long actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
timeWarp().clock().fastForwardSeconds(2);
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the last fire should have been throttled, so number of actions shouldn't change
|
||||
actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(1L));
|
||||
|
||||
timeWarp().clock().fastForwardSeconds(10);
|
||||
timeWarp().scheduler().trigger("_name");
|
||||
refresh();
|
||||
|
||||
// the last fire occurred passed the throttle period, so a new action should have been added
|
||||
actionsCount = docCount("actions", "action", matchAllQuery());
|
||||
assertThat(actionsCount, is(2L));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), WatchRecord.State.THROTTLED.id()));
|
||||
assertThat(throttledCount, is(1L));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -7,11 +7,11 @@ package org.elasticsearch.watcher.watch;
|
|||
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.joda.time.DateTimeZone;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
|
@ -189,7 +189,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
|
|||
when(watch.status()).thenReturn(status);
|
||||
when(watchStore.get("_id")).thenReturn(watch);
|
||||
|
||||
WatchStatus result = watcherService.ackWatch("_id", timeout);
|
||||
WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout);
|
||||
assertThat(result, not(sameInstance(status)));
|
||||
|
||||
verify(watchStore, times(1)).updateStatus(watch);
|
||||
|
@ -199,7 +199,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
|
|||
public void testAckWatch_Timeout() throws Exception {
|
||||
TimeValue timeout = TimeValue.timeValueSeconds(5);
|
||||
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null);
|
||||
watcherService.ackWatch("_id", timeout);
|
||||
watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -214,7 +214,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
|
|||
when(watch.status()).thenReturn(status);
|
||||
when(watchStore.get("_id")).thenReturn(watch);
|
||||
|
||||
WatchStatus result = watcherService.ackWatch("_id", timeout);
|
||||
WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout);
|
||||
assertThat(result, not(sameInstance(status)));
|
||||
|
||||
verify(watchStore, never()).updateStatus(watch);
|
||||
|
@ -228,7 +228,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
|
|||
when(watchStore.get("_id")).thenReturn(null);
|
||||
|
||||
try {
|
||||
watcherService.ackWatch("_id", timeout);
|
||||
watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout);
|
||||
fail();
|
||||
} catch (WatcherException e) {
|
||||
// expected
|
||||
|
|
Loading…
Reference in New Issue