diff --git a/watcher/docs/how-watcher-works.asciidoc b/watcher/docs/how-watcher-works.asciidoc index 9f355f7eeaa..43a86a34707 100644 --- a/watcher/docs/how-watcher-works.asciidoc +++ b/watcher/docs/how-watcher-works.asciidoc @@ -168,6 +168,30 @@ acknowledgment is cleared and Watcher resumes executing the watch's actions norm For more information, see <>. +[[watch-active-state]] +=== Watch Active State + +By default, when you add a watch it is immediately set to the active state. An active watch is +registered with the relevant trigger engine and executed according to its configured trigger. +For example, if a watch has a <> trigger, it is executed according +to its schedule. + +A watch can also be set to the inactive state. An inactive watch is still registered with Watcher, +but it is not registered with a trigger engine and can never be triggered. When you add a watch, +you can use the <> parameter to set its initial state +to inactive. You can deactivate an existing watch with the <>. +To reactivate a watch, you use the <>. + +NOTE: You can still use the <> to force the execution + of an inactive watch. + +Deactivating watches is useful in a variety of situations. For example, if you have a watch that +monitors an external system and need to take the system down for maintenance, you can deactivate +the watch so doesn't unnecessarily report a bunch of execution failures during the maintenance +window. You might also want to deactivate a watch rather than deleting it entirely in case you +might want to use it at some point in the future. + + [[scripts-templates]] === Scripts and Templates diff --git a/watcher/docs/reference/condition/array-compare.asciidoc b/watcher/docs/reference/condition/array-compare.asciidoc index dc885a83402..5086579941c 100644 --- a/watcher/docs/reference/condition/array-compare.asciidoc +++ b/watcher/docs/reference/condition/array-compare.asciidoc @@ -1,4 +1,4 @@ -[[array-condition-compare]] +[[condition-array-compare]] ==== Array Compare Condition A watch <> that compares an array of values in the <> @@ -51,7 +51,8 @@ The array path is a "dot-notation" expression that can reference the following v This array path must resolve to an array. -The comparison operator can be any one of the operators from [[condition-compare]]. +The comparison operator can be any one of the operators supported by the +<>. The quantifier operator can be any one of the following: @@ -65,4 +66,4 @@ The quantifier operator can be any one of the following: NOTE: If the array is empty, `all` causes the comparison operator to evaluate to `true` and `some` causes the comparison operator to evaluate to `false`. -NOTE: It is also possible to use date math expressions and values in the context model as in [[condition-compare]]. +NOTE: It is also possible to use date math expressions and values in the context model as in the <>. diff --git a/watcher/docs/reference/condition/compare.asciidoc b/watcher/docs/reference/condition/compare.asciidoc index ff625d1f6e0..5e5ec9b506d 100644 --- a/watcher/docs/reference/condition/compare.asciidoc +++ b/watcher/docs/reference/condition/compare.asciidoc @@ -47,6 +47,7 @@ The path is a "dot-notation" expression that can reference the following variabl TIP: You can reference entries in arrays using their zero-based array indices. For example, to access the third element of the `ctx.payload.hits.hits` array, use `ctx.payload.hits.hits.2`. +[[condition-compare-operators]] The comparison operator can be any one of the following: [options="header"] diff --git a/watcher/docs/reference/java.asciidoc b/watcher/docs/reference/java.asciidoc index 677e67c9619..c632ebb9101 100644 --- a/watcher/docs/reference/java.asciidoc +++ b/watcher/docs/reference/java.asciidoc @@ -93,6 +93,10 @@ include::java/execute-watch.asciidoc[] include::java/ack-watch.asciidoc[] +include::java/activate-watch.asciidoc[] + +include::java/deactivate-watch.asciidoc[] + include::java/stats.asciidoc[] include::java/service.asciidoc[] diff --git a/watcher/docs/reference/java/activate-watch.asciidoc b/watcher/docs/reference/java/activate-watch.asciidoc new file mode 100644 index 00000000000..e4992d51253 --- /dev/null +++ b/watcher/docs/reference/java/activate-watch.asciidoc @@ -0,0 +1,26 @@ +[[api-java-activate-watch]] +==== Activate Watch API + +A watch can be either <>. This API enables +you to activate a currently inactive watch. + +The status of an inactive watch is returned with the watch definition +when you call the <>: + +[source,java] +-------------------------------------------------- +GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("my-watch").get(); +boolean active = getWatchResponse.getStatus().state().isActive(); +assert !active; +-------------------------------------------------- + +You can activate the watch by executing the following API call: + +[source,java] +-------------------------------------------------- +ActivateWatchResponse activateResponse = watcherClient.prepareActivateWatch("my-watch", true).get(); +boolean active = activateResponse.getStatus().state().isActive(); +assert active; +-------------------------------------------------- + +The new state of the watch is returned as part of its overall status. \ No newline at end of file diff --git a/watcher/docs/reference/java/deactivate-watch.asciidoc b/watcher/docs/reference/java/deactivate-watch.asciidoc new file mode 100644 index 00000000000..9f28ff4644b --- /dev/null +++ b/watcher/docs/reference/java/deactivate-watch.asciidoc @@ -0,0 +1,26 @@ +[[api-java-deactivate-watch]] +==== Deactivate Watch API + +A watch can be either <>. This API enables +you to deactivate a currently active watch. + +The status of an active watch is returned with the watch definition +when you call the <>: + +[source,java] +-------------------------------------------------- +GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("my-watch").get(); +boolean active = getWatchResponse.getStatus().state().isActive(); +assert active; +-------------------------------------------------- + +You can deactivate the watch by executing the following API call: + +[source,java] +-------------------------------------------------- +ActivateWatchResponse activateResponse = watcherClient.prepareActivateWatch("my-watch", false).get(); +boolean active = activateResponse.getStatus().state().isActive(); +assert !active; +-------------------------------------------------- + +The new state of the watch is returned as part of its overall status. \ No newline at end of file diff --git a/watcher/docs/reference/rest.asciidoc b/watcher/docs/reference/rest.asciidoc index affa019a11b..d945c05b072 100644 --- a/watcher/docs/reference/rest.asciidoc +++ b/watcher/docs/reference/rest.asciidoc @@ -11,6 +11,10 @@ include::rest/execute-watch.asciidoc[] include::rest/ack-watch.asciidoc[] +include::rest/activate-watch.asciidoc[] + +include::rest/deactivate-watch.asciidoc[] + include::rest/info.asciidoc[] include::rest/stats.asciidoc[] diff --git a/watcher/docs/reference/rest/activate-watch.asciidoc b/watcher/docs/reference/rest/activate-watch.asciidoc new file mode 100644 index 00000000000..c08ee906a88 --- /dev/null +++ b/watcher/docs/reference/rest/activate-watch.asciidoc @@ -0,0 +1,52 @@ +[[api-rest-activate-watch]] +==== Activate Watch API + +A watch can be either <>. This API enables +you to activate a currently inactive watch. + +The status of an inactive watch is returned with the watch definition +when you call the <>: + +[source,json] +-------------------------------------------------- +GET _watcher/watch/ +-------------------------------------------------- +// AUTOSENSE + +[source,js] +-------------------------------------------------- +"_status": { + "state" : { + "active" : false, + "timestamp" : "2015-08-20T12:21:32.734Z" + } + "actions": { + ... + } + } +} +-------------------------------------------------- + +You can activate the watch by executing the following API call: + +[source,json] +-------------------------------------------------- +PUT _watcher/watch//_activate +-------------------------------------------------- +// AUTOSENSE + +The new state of the watch is returned as part of its overall status. + +[source,js] +-------------------------------------------------- +"_status": { + "state" : { + "active" : true, + "timestamp" : "2015-09-04T08:39:46.816Z" + } + "actions": { + ... + } + } +} +-------------------------------------------------- \ No newline at end of file diff --git a/watcher/docs/reference/rest/deactivate-watch.asciidoc b/watcher/docs/reference/rest/deactivate-watch.asciidoc new file mode 100644 index 00000000000..694651162e5 --- /dev/null +++ b/watcher/docs/reference/rest/deactivate-watch.asciidoc @@ -0,0 +1,52 @@ +[[api-rest-deactivate-watch]] +==== Deactivate Watch API + +A watch can be either <>. This API enables +you to deactivate a currently active watch. + +The status of an active watch is returned with the watch definition +when you call the <>: + +[source,json] +-------------------------------------------------- +GET _watcher/watch/ +-------------------------------------------------- +// AUTOSENSE + +[source,js] +-------------------------------------------------- +"_status": { + "state" : { + "active" : true, + "timestamp" : "2015-08-20T12:21:32.734Z" + } + "actions": { + ... + } + } +} +-------------------------------------------------- + +You can deactivate the watch by executing the following API call: + +[source,json] +-------------------------------------------------- +PUT _watcher/watch//_deactivate +-------------------------------------------------- +// AUTOSENSE + +The new state of the watch is returned as part of its overall status. + +[source,js] +-------------------------------------------------- +"_status": { + "state" : { + "active" : false, + "timestamp" : "2015-09-04T08:39:46.816Z" + } + "actions": { + ... + } + } +} +-------------------------------------------------- \ No newline at end of file diff --git a/watcher/docs/reference/rest/put-watch.asciidoc b/watcher/docs/reference/rest/put-watch.asciidoc index 828d10f8a39..cf8360757ad 100644 --- a/watcher/docs/reference/rest/put-watch.asciidoc +++ b/watcher/docs/reference/rest/put-watch.asciidoc @@ -92,4 +92,16 @@ The following snippet shows how to change the default timeout of the put action -------------------------------------------------- PUT _watcher/watch/my-watch?master_timeout=30s -------------------------------------------------- -// AUTOSENSE \ No newline at end of file + +[[api-rest-put-watch-active-state]] +===== Controlling Default Active State + +When adding a watch you can also define its initial <>. You do that by +setting the `active` parameter. The following command add a watch and sets it to be inactive by default: + +[source,js] +-------------------------------------------------- +PUT _watcher/watch/my-watch?active=false +-------------------------------------------------- + +NOTE: If you omit the `active` parameter, the watch is set to the active state by default. \ No newline at end of file diff --git a/watcher/rest-api-spec/api/watcher.activate_watch.json b/watcher/rest-api-spec/api/watcher.activate_watch.json new file mode 100644 index 00000000000..19685687932 --- /dev/null +++ b/watcher/rest-api-spec/api/watcher.activate_watch.json @@ -0,0 +1,24 @@ +{ + "watcher.activate_watch": { + "documentation": "https://www.elastic.co/guide/en/watcher/current/api-rest.html#api-rest-activate-watch", + "methods": [ "PUT", "POST" ], + "url": { + "path": "/_watcher/watch/{watch_id}/_activate", + "paths": [ "/_watcher/watch/{watch_id}/_activate" ], + "parts": { + "watch_id": { + "type" : "string", + "description" : "Watch ID", + "required" : true + } + }, + "params": { + "master_timeout": { + "type": "duration", + "description": "Specify timeout for watch write operation" + } + } + }, + "body": null + } +} diff --git a/watcher/rest-api-spec/api/watcher.deactivate_watch.json b/watcher/rest-api-spec/api/watcher.deactivate_watch.json new file mode 100644 index 00000000000..df56e1ceca3 --- /dev/null +++ b/watcher/rest-api-spec/api/watcher.deactivate_watch.json @@ -0,0 +1,24 @@ +{ + "watcher.deactivate_watch": { + "documentation": "https://www.elastic.co/guide/en/watcher/current/api-rest.html#api-rest-deactivate-watch", + "methods": [ "PUT", "POST" ], + "url": { + "path": "/_watcher/watch/{watch_id}/_deactivate", + "paths": [ "/_watcher/watch/{watch_id}/_deactivate" ], + "parts": { + "watch_id": { + "type" : "string", + "description" : "Watch ID", + "required" : true + } + }, + "params": { + "master_timeout": { + "type": "duration", + "description": "Specify timeout for watch write operation" + } + } + }, + "body": null + } +} diff --git a/watcher/rest-api-spec/api/watcher.put_watch.json b/watcher/rest-api-spec/api/watcher.put_watch.json index c634e6737a1..411a9641dcd 100644 --- a/watcher/rest-api-spec/api/watcher.put_watch.json +++ b/watcher/rest-api-spec/api/watcher.put_watch.json @@ -16,6 +16,10 @@ "master_timeout": { "type": "duration", "description": "Specify timeout for watch write operation" + }, + "active": { + "type": "boolean", + "description": "Specify whether the watch is in/active by default" } } }, diff --git a/watcher/rest-api-spec/test/activate_watch/10_basic.yaml b/watcher/rest-api-spec/test/activate_watch/10_basic.yaml new file mode 100644 index 00000000000..100942d08b5 --- /dev/null +++ b/watcher/rest-api-spec/test/activate_watch/10_basic.yaml @@ -0,0 +1,81 @@ +--- +"Test activate watch api": + - do: + cluster.health: + wait_for_status: yellow + + - do: + watcher.put_watch: + id: "my_watch" + master_timeout: "40s" + body: > + { + "trigger" : { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "simple": { + "payload": { + "send": "yes" + } + } + }, + "condition": { + "always": {} + }, + "actions": { + "test_index": { + "index": { + "index": "test", + "doc_type": "test2" + } + } + } + } + + - match: { _id: "my_watch" } + + - do: + cluster.health: + wait_for_status: yellow + + - do: + watcher.get_watch: + id: "my_watch" + + - match: { found : true} + - match: { _id: "my_watch" } + - match: { _status.state.active: true } + + - do: + watcher.deactivate_watch: + watch_id: "my_watch" + + - match: { "_status.state.active" : false } + + - do: + watcher.get_watch: + id: "my_watch" + - match: { found : true} + - match: { _id: "my_watch" } + - match: { _status.state.active: false } + + - do: + watcher.activate_watch: + watch_id: "my_watch" + + - match: { "_status.state.active" : true } + + - do: + watcher.get_watch: + id: "my_watch" + + - match: { found : true} + - match: { _id: "my_watch" } + - match: { _status.state.active: true } + + - do: + watcher.delete_watch: + id: "my_watch" + + - match: { found: true } diff --git a/watcher/rest-api-spec/test/put_watch/40_put_watch_as_inactive.yaml b/watcher/rest-api-spec/test/put_watch/40_put_watch_as_inactive.yaml new file mode 100644 index 00000000000..72fa737df70 --- /dev/null +++ b/watcher/rest-api-spec/test/put_watch/40_put_watch_as_inactive.yaml @@ -0,0 +1,49 @@ +--- +"Test put inactive watch": + - do: + cluster.health: + wait_for_status: green + + - do: + watcher.put_watch: + id: "my_watch" + master_timeout: "40s" + active: false + body: > + { + "trigger": { + "schedule": { + "hourly": { + "minute": [ 0, 5 ] + } + } + }, + "input": { + "simple": { + "payload": { + "send": "yes" + } + } + }, + "condition": { + "always": {} + }, + "actions": { + "test_index": { + "index": { + "index": "test", + "doc_type": "test2" + } + } + } + } + - match: { _id: "my_watch" } + + - do: + watcher.get_watch: + id: "my_watch" + + - match: { found : true } + - match: { _id: "my_watch" } + - match: { _status.version: 1 } + - match: { _status.state.active: false } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java b/watcher/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java index 2a9ed30c84f..f208a8a82b0 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java @@ -46,6 +46,8 @@ import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation; import org.elasticsearch.watcher.transform.TransformModule; import org.elasticsearch.watcher.transport.actions.ack.AckWatchAction; import org.elasticsearch.watcher.transport.actions.ack.TransportAckWatchAction; +import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchAction; +import org.elasticsearch.watcher.transport.actions.activate.TransportActivateWatchAction; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchAction; import org.elasticsearch.watcher.transport.actions.delete.TransportDeleteWatchAction; import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchAction; @@ -62,7 +64,6 @@ import org.elasticsearch.watcher.trigger.TriggerModule; import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.watcher.watch.WatchModule; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -98,10 +99,10 @@ public class WatcherPlugin extends Plugin { @Override public Collection nodeModules() { - if (enabled == false) { + if (!enabled || transportClient) { return Collections.emptyList(); - } else if (transportClient == false){ - return Arrays.asList( + } + return Arrays.asList( new WatcherModule(settings), new InitializingModule(), new LicenseModule(), @@ -120,14 +121,12 @@ public class WatcherPlugin extends Plugin { new ExecutionModule(), new WatcherShieldModule(settings), new SecretModule(settings)); - } - return Collections.emptyList(); } @Override public Collection> nodeServices() { if (!enabled || transportClient) { - return Collections.EMPTY_SET; + return Collections.emptyList(); } return Arrays.>asList( // the initialization service must be first in the list @@ -156,7 +155,7 @@ public class WatcherPlugin extends Plugin { public void onModule(ScriptModule module) { module.registerScriptContext(ScriptServiceProxy.INSTANCE); - if (enabled && transportClient == false) { + if (enabled && !transportClient) { module.addScriptEngine(XMustacheScriptEngineService.class); } } @@ -168,7 +167,7 @@ public class WatcherPlugin extends Plugin { } public void onModule(RestModule module) { - if (enabled && transportClient == false) { + if (enabled && !transportClient) { module.addRestAction(RestPutWatchAction.class); module.addRestAction(RestDeleteWatchAction.class); module.addRestAction(RestWatcherStatsAction.class); @@ -176,6 +175,7 @@ public class WatcherPlugin extends Plugin { module.addRestAction(RestGetWatchAction.class); module.addRestAction(RestWatchServiceAction.class); module.addRestAction(RestAckWatchAction.class); + module.addRestAction(RestActivateWatchAction.class); module.addRestAction(RestExecuteWatchAction.class); module.addRestAction(RestHijackOperationAction.class); } @@ -188,6 +188,7 @@ public class WatcherPlugin extends Plugin { module.registerAction(GetWatchAction.INSTANCE, TransportGetWatchAction.class); module.registerAction(WatcherStatsAction.INSTANCE, TransportWatcherStatsAction.class); module.registerAction(AckWatchAction.INSTANCE, TransportAckWatchAction.class); + module.registerAction(ActivateWatchAction.INSTANCE, TransportActivateWatchAction.class); module.registerAction(WatcherServiceAction.INSTANCE, TransportWatcherServiceAction.class); module.registerAction(ExecuteWatchAction.INSTANCE, TransportExecuteWatchAction.class); } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/WatcherService.java b/watcher/src/main/java/org/elasticsearch/watcher/WatcherService.java index a2f02276cad..8188e32448b 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/WatcherService.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/WatcherService.java @@ -23,6 +23,7 @@ import org.elasticsearch.watcher.watch.Watch; import org.elasticsearch.watcher.watch.WatchLockService; import org.elasticsearch.watcher.watch.WatchStatus; import org.elasticsearch.watcher.watch.WatchStore; +import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.PeriodType; @@ -66,7 +67,8 @@ public class WatcherService extends AbstractComponent { // Try to load watch store before the execution service, b/c action depends on watch store watchStore.start(clusterState); executionService.start(clusterState); - triggerService.start(watchStore.watches().values()); + + triggerService.start(watchStore.activeWatches()); state.set(WatcherState.STARTED); logger.info("watch service has started"); } catch (Exception e) { @@ -122,17 +124,40 @@ public class WatcherService extends AbstractComponent { } } - public IndexResponse putWatch(String id, BytesReference watchSource, TimeValue timeout) throws IOException { + public IndexResponse putWatch(String id, BytesReference watchSource, TimeValue timeout, boolean active) throws IOException { ensureStarted(); WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); if (lock == null) { throw new ElasticsearchTimeoutException("could not put watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); } try { - Watch watch = watchParser.parseWithSecrets(id, false, watchSource); + DateTime now = clock.nowUTC(); + Watch watch = watchParser.parseWithSecrets(id, false, watchSource, now); + watch.setState(active, now); WatchStore.WatchPut result = watchStore.put(watch); - if (result.previous() == null || !result.previous().trigger().equals(result.current().trigger())) { - triggerService.add(result.current()); + + if (result.previous() == null) { + // this is a newly created watch, so we only need to schedule it if it's active + if (result.current().status().state().isActive()) { + triggerService.add(result.current()); + } + + } else if (result.current().status().state().isActive()) { + + if (!result.previous().status().state().isActive()) { + // the replaced watch was inactive, which means it wasn't scheduled. The new watch is active + // so we need to schedule it + triggerService.add(result.current()); + + } else if (!result.previous().trigger().equals(result.current().trigger())) { + // the previous watch was active and its schedule is different than the schedule of the + // new watch, so we need to + triggerService.add(result.current()); + } + } else { + // if the current is inactive, we'll just remove it from the trigger service + // just to be safe + triggerService.remove(result.current().id()); } return result.indexResponse(); } finally { @@ -184,6 +209,60 @@ public class WatcherService extends AbstractComponent { } } + public WatchStatus activateWatch(String id, TimeValue timeout) throws IOException { + return setWatchState(id, true, timeout); + } + + public WatchStatus deactivateWatch(String id, TimeValue timeout) throws IOException { + return setWatchState(id, false, timeout); + } + + WatchStatus setWatchState(String id, boolean active, TimeValue timeout) throws IOException { + ensureStarted(); + WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); + if (lock == null) { + throw new ElasticsearchTimeoutException("could not ack watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); + } + + // for now, when a watch is deactivated we don't remove its runtime representation + // that is, the store will still keep the watch in memory. We only mark the watch + // as inactive (both in runtime and also update the watch in the watches index) + // and remove the watch from the trigger service, such that it will not be triggered + // nor its trigger be evaluated. + // + // later on we can consider removing the watch runtime representation from memory + // as well. This will mean that the in-memory loaded watches will no longer be a + // complete representation of the watches in the index. This requires careful thought + // to make sure, such incompleteness doesn't hurt any other part of watcher (we need + // to run this exercise anyway... and make sure that nothing in watcher relies on the + // fact that the watch store holds all watches in memory. + + try { + Watch watch = watchStore.get(id); + if (watch == null) { + throw illegalArgument("watch [{}] does not exist", id); + } + if (watch.setState(active, clock.nowUTC())) { + try { + watchStore.updateStatus(watch); + if (active) { + triggerService.add(watch); + } else { + triggerService.remove(watch.id()); + } + } catch (IOException ioe) { + throw ioException("failed to update the watch [{}] on ack", ioe, watch.id()); + } catch (VersionConflictEngineException vcee) { + throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id()); + } + } + // we need to create a safe copy of the status + return new WatchStatus(watch.status()); + } finally { + lock.release(); + } + } + public long watchesCount() { return watchStore.watches().size(); } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/actions/email/EmailAction.java b/watcher/src/main/java/org/elasticsearch/watcher/actions/email/EmailAction.java index f112e6b8653..8c603945b39 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/actions/email/EmailAction.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/actions/email/EmailAction.java @@ -17,7 +17,7 @@ import org.elasticsearch.watcher.actions.email.service.Email; import org.elasticsearch.watcher.actions.email.service.EmailTemplate; import org.elasticsearch.watcher.actions.email.service.Profile; import org.elasticsearch.watcher.support.secret.Secret; -import org.elasticsearch.watcher.support.secret.SensitiveXContentParser; +import org.elasticsearch.watcher.support.xcontent.WatcherXContentParser; import org.elasticsearch.watcher.support.xcontent.WatcherParams; import java.io.IOException; @@ -141,7 +141,7 @@ public class EmailAction implements Action { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.USER)) { user = parser.text(); } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PASSWORD)) { - password = SensitiveXContentParser.secretOrNull(parser); + password = WatcherXContentParser.secretOrNull(parser); } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PROFILE)) { try { profile = Profile.resolve(parser.text()); diff --git a/watcher/src/main/java/org/elasticsearch/watcher/actions/hipchat/service/HipChatMessage.java b/watcher/src/main/java/org/elasticsearch/watcher/actions/hipchat/service/HipChatMessage.java index 37fa8ae1113..b1301a46922 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/actions/hipchat/service/HipChatMessage.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/actions/hipchat/service/HipChatMessage.java @@ -319,7 +319,7 @@ public class HipChatMessage implements ToXContent { throw new ElasticsearchParseException("failed to parse hipchat message. failed to parse [{}] field", ilae, Field.FORMAT.getPreferredName()); } } else { - throw new ElasticsearchParseException("failed to parse hipchat message. unexpected token [{}]", token); + throw new ElasticsearchParseException("failed to parse hipchat message. unexpected field [{}]", currentFieldName); } } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/client/WatcherClient.java b/watcher/src/main/java/org/elasticsearch/watcher/client/WatcherClient.java index 6d0638cdf82..b5304798ef4 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/client/WatcherClient.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/client/WatcherClient.java @@ -14,6 +14,10 @@ import org.elasticsearch.watcher.transport.actions.ack.AckWatchAction; import org.elasticsearch.watcher.transport.actions.ack.AckWatchRequest; import org.elasticsearch.watcher.transport.actions.ack.AckWatchRequestBuilder; import org.elasticsearch.watcher.transport.actions.ack.AckWatchResponse; +import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchAction; +import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchRequest; +import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchRequestBuilder; +import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchResponse; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchAction; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequest; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequestBuilder; @@ -217,7 +221,7 @@ public class WatcherClient { } /** - * Acks an watch + * Acks a watch * * @param request The ack request with the watch id to be acked * @return The AckWatchResponse @@ -226,6 +230,37 @@ public class WatcherClient { return client.execute(AckWatchAction.INSTANCE, request); } + /** + * Creates a request builder to activate a watch by id + * + * @param id the id of the watch + * @param activate indicates whether to activate or deactivate the watch + * @return The request builder + */ + public ActivateWatchRequestBuilder prepareActivateWatch(String id, boolean activate) { + return new ActivateWatchRequestBuilder(client, id, activate); + } + + /** + * Activate a watch + * + * @param request The activate request with the watch id + * @param listener The listener for the activate watch response + */ + public void activateWatch(ActivateWatchRequest request, ActionListener listener) { + client.execute(ActivateWatchAction.INSTANCE, request, listener); + } + + /** + * Activates a watch + * + * @param request The de/activate request with the watch id. + */ + public ActionFuture activateWatch(ActivateWatchRequest request) { + return client.execute(ActivateWatchAction.INSTANCE, request); + } + + /** * Prepare a watch service request. */ diff --git a/watcher/src/main/java/org/elasticsearch/watcher/rest/action/RestActivateWatchAction.java b/watcher/src/main/java/org/elasticsearch/watcher/rest/action/RestActivateWatchAction.java new file mode 100644 index 00000000000..ea6f8c3ea85 --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/rest/action/RestActivateWatchAction.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.rest.action; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.watcher.client.WatcherClient; +import org.elasticsearch.watcher.rest.WatcherRestHandler; +import org.elasticsearch.watcher.support.xcontent.WatcherParams; +import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchRequest; +import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchResponse; +import org.elasticsearch.watcher.watch.Watch; + +/** + * The rest action to de/activate a watch + */ +public class RestActivateWatchAction extends WatcherRestHandler { + + @Inject + protected RestActivateWatchAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/_activate", this); + controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/_activate", this); + DeactivateRestHandler deactivateRestHandler = new DeactivateRestHandler(settings, controller, client); + controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/_deactivate", deactivateRestHandler); + controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/_deactivate", deactivateRestHandler); + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, WatcherClient client) throws Exception { + String watchId = request.param("id"); + client.activateWatch(new ActivateWatchRequest(watchId, true), new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(ActivateWatchResponse response, XContentBuilder builder) throws Exception { + return new BytesRestResponse(RestStatus.OK, builder.startObject() + .field(Watch.Field.STATUS.getPreferredName(), response.getStatus(), WatcherParams.HIDE_SECRETS) + .endObject()); + } + }); + } + + static class DeactivateRestHandler extends WatcherRestHandler { + + public DeactivateRestHandler(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, WatcherClient client) throws Exception { + String watchId = request.param("id"); + client.activateWatch(new ActivateWatchRequest(watchId, false), new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(ActivateWatchResponse response, XContentBuilder builder) throws Exception { + return new BytesRestResponse(RestStatus.OK, builder.startObject() + .field(Watch.Field.STATUS.getPreferredName(), response.getStatus(), WatcherParams.HIDE_SECRETS) + .endObject()); + } + }); + } + } + +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/rest/action/RestPutWatchAction.java b/watcher/src/main/java/org/elasticsearch/watcher/rest/action/RestPutWatchAction.java index 56d8b550154..8a03254a74d 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/rest/action/RestPutWatchAction.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/rest/action/RestPutWatchAction.java @@ -36,6 +36,7 @@ public class RestPutWatchAction extends WatcherRestHandler { protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception { PutWatchRequest putWatchRequest = new PutWatchRequest(request.param("id"), request.content()); putWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWatchRequest.masterNodeTimeout())); + putWatchRequest.setActive(request.paramAsBoolean("active", putWatchRequest.isActive())); client.putWatch(putWatchRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(PutWatchResponse response, XContentBuilder builder) throws Exception { diff --git a/watcher/src/main/java/org/elasticsearch/watcher/support/WatcherDateTimeUtils.java b/watcher/src/main/java/org/elasticsearch/watcher/support/WatcherDateTimeUtils.java index 56edc1a0992..c0f03c017a5 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/support/WatcherDateTimeUtils.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/support/WatcherDateTimeUtils.java @@ -107,6 +107,14 @@ public class WatcherDateTimeUtils { return builder.field(fieldName, formatDate(date)); } + public static void writeDate(StreamOutput out, DateTime date) throws IOException { + out.writeLong(date.getMillis()); + } + + public static DateTime readDate(StreamInput in, DateTimeZone timeZone) throws IOException { + return new DateTime(in.readLong(), timeZone); + } + public static void writeOptionalDate(StreamOutput out, DateTime date) throws IOException { if (date == null) { out.writeBoolean(false); diff --git a/watcher/src/main/java/org/elasticsearch/watcher/support/clock/HaltedClock.java b/watcher/src/main/java/org/elasticsearch/watcher/support/clock/HaltedClock.java new file mode 100644 index 00000000000..cd52818c38e --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/support/clock/HaltedClock.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.clock; + +import org.elasticsearch.common.unit.TimeValue; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +/** + * + */ +public class HaltedClock implements Clock { + + private final DateTime now; + + public HaltedClock(DateTime now) { + this.now = now.toDateTime(DateTimeZone.UTC); + } + + @Override + public long millis() { + return now.getMillis(); + } + + @Override + public long nanos() { + return millis() * 1000000; + } + + @Override + public DateTime nowUTC() { + return now; + } + + @Override + public DateTime now(DateTimeZone timeZone) { + return now.toDateTime(timeZone); + } + + @Override + public TimeValue timeElapsedSince(DateTime time) { + return TimeValue.timeValueMillis(millis() - time.getMillis()); + } +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/support/http/auth/basic/BasicAuth.java b/watcher/src/main/java/org/elasticsearch/watcher/support/http/auth/basic/BasicAuth.java index ed7f66ab0ec..043e09878a8 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/support/http/auth/basic/BasicAuth.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/support/http/auth/basic/BasicAuth.java @@ -11,7 +11,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.support.http.auth.HttpAuth; import org.elasticsearch.watcher.support.secret.Secret; -import org.elasticsearch.watcher.support.secret.SensitiveXContentParser; +import org.elasticsearch.watcher.support.xcontent.WatcherXContentParser; import org.elasticsearch.watcher.support.xcontent.WatcherParams; import java.io.IOException; @@ -85,7 +85,7 @@ public class BasicAuth implements HttpAuth { if (Field.USERNAME.getPreferredName().equals(fieldName)) { username = parser.text(); } else if (Field.PASSWORD.getPreferredName().equals(fieldName)) { - password = SensitiveXContentParser.secret(parser); + password = WatcherXContentParser.secret(parser); } else { throw new ElasticsearchParseException("unsupported field [" + fieldName + "]"); } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/support/secret/SensitiveXContentParser.java b/watcher/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentParser.java similarity index 73% rename from watcher/src/main/java/org/elasticsearch/watcher/support/secret/SensitiveXContentParser.java rename to watcher/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentParser.java index cc7e0bb36f3..b2ea9e8a281 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/support/secret/SensitiveXContentParser.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentParser.java @@ -3,28 +3,41 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.watcher.support.secret; +package org.elasticsearch.watcher.support.xcontent; import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.xcontent.XContentLocation; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.support.clock.SystemClock; +import org.elasticsearch.watcher.support.secret.Secret; +import org.elasticsearch.watcher.support.secret.SecretService; +import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; /** - * + * A xcontent parser that is used by watcher. This is a special parser that is + * aware of watcher services. In particular, it's aware of the used {@link Clock} + * and the {@link SecretService}. The former (clock) may be used when the current time + * is required during the parse phase of construct. The latter (secret service) is used + * to convert secret values (e.g. passwords, security tokens, etc..) to {@link Secret}s. + * {@link Secret}s are encrypted values that are stored in memory and are decrypted + * on demand when needed. */ -public class SensitiveXContentParser implements XContentParser { +public class WatcherXContentParser implements XContentParser { public static Secret secret(XContentParser parser) throws IOException { char[] chars = parser.text().toCharArray(); - if (parser instanceof SensitiveXContentParser) { - chars = ((SensitiveXContentParser) parser).secretService.encrypt(chars); - return new Secret(chars); + if (parser instanceof WatcherXContentParser) { + WatcherXContentParser watcherParser = (WatcherXContentParser) parser; + if (watcherParser.secretService != null) { + chars = watcherParser.secretService.encrypt(chars); + } } return new Secret(chars); } @@ -35,17 +48,29 @@ public class SensitiveXContentParser implements XContentParser { return null; } char[] chars = parser.text().toCharArray(); - if (parser instanceof SensitiveXContentParser) { - chars = ((SensitiveXContentParser) parser).secretService.encrypt(text.toCharArray()); + if (parser instanceof WatcherXContentParser) { + WatcherXContentParser watcherParser = (WatcherXContentParser) parser; + if (watcherParser.secretService != null) { + chars = watcherParser.secretService.encrypt(text.toCharArray()); + } return new Secret(chars); } return new Secret(chars); } - private final XContentParser parser; - private final SecretService secretService; + public static Clock clock(XContentParser parser) { + if (parser instanceof WatcherXContentParser) { + return ((WatcherXContentParser) parser).clock; + } + return SystemClock.INSTANCE; + } - public SensitiveXContentParser(XContentParser parser, SecretService secretService) { + private final Clock clock; + private final XContentParser parser; + private final @Nullable SecretService secretService; + + public WatcherXContentParser(XContentParser parser, Clock clock, @Nullable SecretService secretService) { + this.clock = clock; this.parser = parser; this.secretService = secretService; } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchResponse.java b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchResponse.java index 4ed41f76396..8660ff433e0 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchResponse.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchResponse.java @@ -14,7 +14,8 @@ import org.elasticsearch.watcher.watch.WatchStatus; import java.io.IOException; /** - * This class contains the ackState of the watch, if the watch was successfully acked this will be ACK + * This class contains the status of the watch. If the watch was successfully acked + * this will be reflected in the watch status. */ public class AckWatchResponse extends ActionResponse { @@ -28,7 +29,7 @@ public class AckWatchResponse extends ActionResponse { } /** - * @return The ack state for the watch + * @return The watch status */ public WatchStatus getStatus() { return status; diff --git a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/ack/TransportAckWatchAction.java b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/ack/TransportAckWatchAction.java index a8657b3b27e..be75a453d94 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/ack/TransportAckWatchAction.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/ack/TransportAckWatchAction.java @@ -24,7 +24,7 @@ import org.elasticsearch.watcher.watch.WatchStatus; import org.elasticsearch.watcher.watch.WatchStore; /** - * Performs the delete operation. + * Performs the ack operation. */ public class TransportAckWatchAction extends WatcherTransportAction { diff --git a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchAction.java b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchAction.java new file mode 100644 index 00000000000..5ea0c3833ee --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchAction.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transport.actions.activate; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * This action acks a watch in memory, and the index + */ +public class ActivateWatchAction extends Action { + + public static final ActivateWatchAction INSTANCE = new ActivateWatchAction(); + public static final String NAME = "cluster:admin/watcher/watch/activate"; + + private ActivateWatchAction() { + super(NAME); + } + + @Override + public ActivateWatchResponse newResponse() { + return new ActivateWatchResponse(); + } + + @Override + public ActivateWatchRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new ActivateWatchRequestBuilder(client); + } +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchRequest.java b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchRequest.java new file mode 100644 index 00000000000..78977479c2d --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchRequest.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transport.actions.activate; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.watcher.support.validation.Validation; + +import java.io.IOException; + +/** + * A ack watch request to ack a watch by name (id) + */ +public class ActivateWatchRequest extends MasterNodeRequest { + + private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(10); + + private String watchId; + private boolean activate; + + public ActivateWatchRequest() { + this(null, true); + } + + public ActivateWatchRequest(String watchId, boolean activate) { + this.watchId = watchId; + this.activate = activate; + masterNodeTimeout(DEFAULT_TIMEOUT); + } + + /** + * @return The id of the watch to be acked + */ + public String getWatchId() { + return watchId; + } + + /** + * @return {@code true} if the request is for activating the watch, {@code false} if its + * for deactivating it. + */ + public boolean isActivate() { + return activate; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (watchId == null){ + validationException = ValidateActions.addValidationError("watch id is missing", validationException); + } + Validation.Error error = Validation.watchId(watchId); + if (error != null) { + validationException = ValidateActions.addValidationError(error.message(), validationException); + } + return validationException; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + watchId = in.readString(); + activate = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(watchId); + out.writeBoolean(activate); + } + + @Override + public String toString() { + return activate ? + "activate [" + watchId + "]" : + "deactivate [" + watchId + "]"; + } +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchRequestBuilder.java b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchRequestBuilder.java new file mode 100644 index 00000000000..1cda3d3e1da --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchRequestBuilder.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transport.actions.activate; + +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * A activate watch action request builder. + */ +public class ActivateWatchRequestBuilder extends MasterNodeOperationRequestBuilder { + + public ActivateWatchRequestBuilder(ElasticsearchClient client) { + super(client, ActivateWatchAction.INSTANCE, new ActivateWatchRequest()); + } + + public ActivateWatchRequestBuilder(ElasticsearchClient client, String id, boolean activate) { + super(client, ActivateWatchAction.INSTANCE, new ActivateWatchRequest(id, activate)); + } + +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchResponse.java b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchResponse.java new file mode 100644 index 00000000000..0377bf10143 --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/ActivateWatchResponse.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transport.actions.activate; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.watcher.watch.WatchStatus; + +import java.io.IOException; + +/** + * This class contains the status of the watch. If the watch was successfully de/activates + * this will reflected the new state of the watch. + */ +public class ActivateWatchResponse extends ActionResponse { + + private WatchStatus status; + + public ActivateWatchResponse() { + } + + public ActivateWatchResponse(@Nullable WatchStatus status) { + this.status = status; + } + + /** + * @return The watch status + */ + public WatchStatus getStatus() { + return status; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + status = in.readBoolean() ? WatchStatus.read(in) : null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(status != null); + if (status != null) { + status.writeTo(out); + } + } +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/TransportActivateWatchAction.java b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/TransportActivateWatchAction.java new file mode 100644 index 00000000000..a04684e0a3f --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/activate/TransportActivateWatchAction.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transport.actions.activate; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.watcher.WatcherService; +import org.elasticsearch.watcher.license.LicenseService; +import org.elasticsearch.watcher.transport.actions.WatcherTransportAction; +import org.elasticsearch.watcher.watch.WatchStatus; +import org.elasticsearch.watcher.watch.WatchStore; + +/** + * Performs the watch de/activation operation. + */ +public class TransportActivateWatchAction extends WatcherTransportAction { + + private final WatcherService watcherService; + + @Inject + public TransportActivateWatchAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + WatcherService watcherService, LicenseService licenseService) { + super(settings, ActivateWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, ActivateWatchRequest.class); + this.watcherService = watcherService; + } + + @Override + protected String executor() { + return ThreadPool.Names.MANAGEMENT; + } + + @Override + protected ActivateWatchResponse newResponse() { + return new ActivateWatchResponse(); + } + + @Override + protected void masterOperation(ActivateWatchRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { + try { + WatchStatus watchStatus = request.isActivate() ? + watcherService.activateWatch(request.getWatchId(), request.masterNodeTimeout()) : + watcherService.deactivateWatch(request.getWatchId(), request.masterNodeTimeout()); + ActivateWatchResponse response = new ActivateWatchResponse(watchStatus); + listener.onResponse(response); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + protected ClusterBlockException checkBlock(ActivateWatchRequest request, ClusterState state) { + return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, WatchStore.INDEX); + } + + +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequest.java b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequest.java index 74ddd08f7c0..1854b40aee6 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequest.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequest.java @@ -8,14 +8,13 @@ package org.elasticsearch.watcher.transport.actions.put; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ValidateActions; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.support.validation.Validation; import java.io.IOException; @@ -30,6 +29,7 @@ public class PutWatchRequest extends MasterNodeRequest { private String id; private BytesReference source; + private boolean active = true; PutWatchRequest() { } @@ -79,6 +79,20 @@ public class PutWatchRequest extends MasterNodeRequest { this.source = source; } + /** + * @return The initial active state of the watch (defaults to {@code true}, e.g. "active") + */ + public boolean isActive() { + return active; + } + + /** + * @return Sets the initial active state of the watch + */ + public void setActive(boolean active) { + this.active = active; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; diff --git a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequestBuilder.java b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequestBuilder.java index 6cf607a5b3e..8a87aa4b913 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequestBuilder.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchRequestBuilder.java @@ -5,12 +5,10 @@ */ package org.elasticsearch.watcher.transport.actions.put; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.watcher.client.WatchSourceBuilder; -import org.elasticsearch.watcher.client.WatcherClient; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.watcher.client.WatchSourceBuilder; public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder { @@ -46,4 +44,12 @@ public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder listener) throws ElasticsearchException { try { - IndexResponse indexResponse = watcherService.putWatch(request.getId(), request.getSource(), request.masterNodeTimeout()); + IndexResponse indexResponse = watcherService.putWatch(request.getId(), request.getSource(), request.masterNodeTimeout(), request.isActive()); listener.onResponse(new PutWatchResponse(indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated())); } catch (Exception e) { listener.onFailure(e); diff --git a/watcher/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java b/watcher/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java index b5ebd30a616..0250344d4ae 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/trigger/TriggerService.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.watcher.support.Exceptions; import java.io.IOException; import java.util.Collection; @@ -56,6 +55,12 @@ public class TriggerService extends AbstractComponent { } } + /** + * Adds the given job to the trigger service. If there is already a registered job in this service with the + * same job ID, the newly added job will replace the old job (the old job will not be triggered anymore) + * + * @param job The new job + */ public void add(TriggerEngine.Job job) { engines.get(job.trigger().type()).add(job); } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/watch/Watch.java b/watcher/src/main/java/org/elasticsearch/watcher/watch/Watch.java index b70895a5fe4..5b48f869870 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/watch/Watch.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/watch/Watch.java @@ -32,9 +32,10 @@ import org.elasticsearch.watcher.input.InputRegistry; import org.elasticsearch.watcher.input.none.ExecutableNoneInput; import org.elasticsearch.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.watcher.support.clock.Clock; +import org.elasticsearch.watcher.support.clock.HaltedClock; import org.elasticsearch.watcher.support.secret.SecretService; -import org.elasticsearch.watcher.support.secret.SensitiveXContentParser; import org.elasticsearch.watcher.support.xcontent.WatcherParams; +import org.elasticsearch.watcher.support.xcontent.WatcherXContentParser; import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.TransformRegistry; import org.elasticsearch.watcher.trigger.Trigger; @@ -126,6 +127,15 @@ public class Watch implements TriggerEngine.Job, ToXContent { this.version = version; } + /** + * Sets the state of this watch to in/active + * + * @return {@code true} if the status of this watch changed, {@code false} otherwise. + */ + public boolean setState(boolean active, DateTime now) { + return status.setActive(active, now); + } + /** * Acks this watch. * @@ -223,7 +233,18 @@ public class Watch implements TriggerEngine.Job, ToXContent { } public Watch parse(String name, boolean includeStatus, BytesReference source) throws IOException { - return parse(name, includeStatus, false, source); + return parse(name, includeStatus, false, source, clock.nowUTC()); + } + + public Watch parse(String name, boolean includeStatus, BytesReference source, DateTime now) throws IOException { + return parse(name, includeStatus, false, source, now); + } + + /** + * @see #parseWithSecrets(String, boolean, BytesReference, DateTime) + */ + public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source) throws IOException { + return parse(id, includeStatus, true, source, clock.nowUTC()); } /** @@ -238,22 +259,19 @@ public class Watch implements TriggerEngine.Job, ToXContent { * This method is only called once - when the user adds a new watch. From that moment on, all representations * of the watch in the system will be use secrets for sensitive data. * - * @see org.elasticsearch.watcher.WatcherService#putWatch(String, BytesReference, TimeValue) + * @see org.elasticsearch.watcher.WatcherService#putWatch(String, BytesReference, TimeValue, boolean) */ - public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source) throws IOException { - return parse(id, includeStatus, true, source); + public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source, DateTime now) throws IOException { + return parse(id, includeStatus, true, source, now); } - private Watch parse(String id, boolean includeStatus, boolean withSecrets, BytesReference source) throws IOException { + private Watch parse(String id, boolean includeStatus, boolean withSecrets, BytesReference source, DateTime now) throws IOException { if (logger.isTraceEnabled()) { logger.trace("parsing watch [{}] ", source.toUtf8()); } XContentParser parser = null; try { - parser = XContentHelper.createParser(source); - if (withSecrets) { - parser = new SensitiveXContentParser(parser, secretService); - } + parser = new WatcherXContentParser(XContentHelper.createParser(source), new HaltedClock(now), withSecrets ? secretService : null); parser.nextToken(); return parse(id, includeStatus, parser); } catch (IOException ioe) { @@ -326,11 +344,11 @@ public class Watch implements TriggerEngine.Job, ToXContent { } else { // we need to create the initial statuses for the actions ImmutableMap.Builder actionsStatuses = ImmutableMap.builder(); - DateTime now = clock.now(DateTimeZone.UTC); + DateTime now = WatcherXContentParser.clock(parser).nowUTC(); for (ActionWrapper action : actions) { actionsStatuses.put(action.id(), new ActionStatus(now)); } - status = new WatchStatus(actionsStatuses.build()); + status = new WatchStatus(WatcherXContentParser.clock(parser).nowUTC(), actionsStatuses.build()); } return new Watch(id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status); diff --git a/watcher/src/main/java/org/elasticsearch/watcher/watch/WatchStatus.java b/watcher/src/main/java/org/elasticsearch/watcher/watch/WatchStatus.java index 46d2faf193e..070f6e5004d 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/watch/WatchStatus.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/watch/WatchStatus.java @@ -7,6 +7,7 @@ package org.elasticsearch.watcher.watch; import com.google.common.collect.ImmutableMap; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseFieldMatcher; @@ -19,6 +20,8 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.actions.Action; import org.elasticsearch.watcher.actions.ActionStatus; import org.elasticsearch.watcher.actions.throttler.AckThrottler; +import org.elasticsearch.watcher.support.clock.SystemClock; +import org.elasticsearch.watcher.support.xcontent.WatcherXContentParser; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -36,6 +39,8 @@ public class WatchStatus implements ToXContent, Streamable { private transient long version; + private State state; + private @Nullable DateTime lastChecked; private @Nullable DateTime lastMetCondition; private ImmutableMap actions; @@ -46,19 +51,24 @@ public class WatchStatus implements ToXContent, Streamable { private WatchStatus() { } - public WatchStatus(ImmutableMap actions) { - this(-1, null, null, actions); + public WatchStatus(DateTime now, ImmutableMap actions) { + this(-1, new State(true, now), null, null, actions); } public WatchStatus(WatchStatus other) { - this(other.version, other.lastChecked, other.lastMetCondition, other.actions); + this(other.version, other.state, other.lastChecked, other.lastMetCondition, other.actions); } - private WatchStatus(long version, DateTime lastChecked, DateTime lastMetCondition, ImmutableMap actions) { + private WatchStatus(long version, State state, DateTime lastChecked, DateTime lastMetCondition, ImmutableMap actions) { this.version = version; this.lastChecked = lastChecked; this.lastMetCondition = lastMetCondition; this.actions = actions; + this.state = state; + } + + public State state() { + return state; } public long version() { @@ -186,6 +196,15 @@ public class WatchStatus implements ToXContent, Streamable { return changed; } + boolean setActive(boolean active, DateTime now) { + boolean change = this.state.active != active; + if (change) { + this.dirty = true; + this.state = new State(active, now); + } + return change; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeLong(version); @@ -196,6 +215,10 @@ public class WatchStatus implements ToXContent, Streamable { out.writeString(entry.getKey()); ActionStatus.writeTo(entry.getValue(), out); } + if (out.getVersion().onOrAfter(Version.V_2_0_0)) { + out.writeBoolean(state.active); + writeDate(out, state.timestamp); + } } @Override @@ -209,6 +232,11 @@ public class WatchStatus implements ToXContent, Streamable { builder.put(in.readString(), ActionStatus.readFrom(in)); } actions = builder.build(); + if (in.getVersion().onOrAfter(Version.V_2_0_0)) { + state = new State(in.readBoolean(), readDate(in, DateTimeZone.UTC)); + } else { + state = new State(true, new DateTime(SystemClock.INSTANCE.millis(), DateTimeZone.UTC)); + } } public static WatchStatus read(StreamInput in) throws IOException { @@ -223,6 +251,7 @@ public class WatchStatus implements ToXContent, Streamable { if (params.paramAsBoolean(INCLUDE_VERSION_KEY, false)) { builder.field(Field.VERSION.getPreferredName(), version); } + builder.field(Field.STATE.getPreferredName(), state, params); if (lastChecked != null) { builder.field(Field.LAST_CHECKED.getPreferredName(), lastChecked); } @@ -241,6 +270,7 @@ public class WatchStatus implements ToXContent, Streamable { public static WatchStatus parse(String watchId, XContentParser parser) throws IOException { + State state = null; DateTime lastChecked = null; DateTime lastMetCondition = null; ImmutableMap.Builder actions = null; @@ -250,6 +280,12 @@ public class WatchStatus implements ToXContent, Streamable { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); + } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.STATE)) { + try { + state = State.parse(parser); + } catch (ElasticsearchParseException e) { + throw new ElasticsearchParseException("could not parse watch status for [{}]. failed to parse field [{}]", e, watchId, currentFieldName); + } } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.LAST_CHECKED)) { if (token.isValue()) { lastChecked = parseDate(currentFieldName, parser, DateTimeZone.UTC); @@ -279,15 +315,70 @@ public class WatchStatus implements ToXContent, Streamable { } } - return new WatchStatus(-1, lastChecked, lastMetCondition, actions.build()); + // if the watch status doesn't have a state, we assume active + // this is to support old watches that weren't upgraded yet to + // contain the state + if (state == null) { + state = new State(true, WatcherXContentParser.clock(parser).nowUTC()); + } + + return new WatchStatus(-1, state, lastChecked, lastMetCondition, actions.build()); } + public static class State implements ToXContent { + + final boolean active; + final DateTime timestamp; + + public State(boolean active, DateTime timestamp) { + this.active = active; + this.timestamp = timestamp; + } + + public boolean isActive() { + return active; + } + + public DateTime getTimestamp() { + return timestamp; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Field.ACTIVE.getPreferredName(), active); + writeDate(Field.TIMESTAMP.getPreferredName(), builder, timestamp); + return builder.endObject(); + } + + public static State parse(XContentParser parser) throws IOException { + if (parser.currentToken() != XContentParser.Token.START_OBJECT) { + throw new ElasticsearchParseException("expected an object but found [{}] instead", parser.currentToken()); + } + boolean active = true; + DateTime timestamp = SystemClock.INSTANCE.nowUTC(); + String currentFieldName = null; + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.ACTIVE)) { + active = parser.booleanValue(); + } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMESTAMP)) { + timestamp = parseDate(currentFieldName, parser, DateTimeZone.UTC); + } + } + return new State(active, timestamp); + } + } interface Field { ParseField VERSION = new ParseField("version"); + ParseField STATE = new ParseField("state"); + ParseField ACTIVE = new ParseField("active"); + ParseField TIMESTAMP = new ParseField("timestamp"); ParseField LAST_CHECKED = new ParseField("last_checked"); ParseField LAST_MET_CONDITION = new ParseField("last_met_condition"); ParseField ACTIONS = new ParseField("actions"); } - } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java b/watcher/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java index ef30cd0e072..969814e786f 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java @@ -34,6 +34,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -182,10 +185,24 @@ public class WatchStore extends AbstractComponent { return new WatchDelete(response); } - public ConcurrentMap watches() { + public Collection watches() { + return watches.values(); + } + + public Collection activeWatches() { + Set watches = new HashSet<>(); + for (Watch watch : watches()) { + if (watch.status().state().isActive()) { + watches.add(watch); + } + } return watches; } + public int watchCount() { + return watches.size(); + } + IndexRequest createIndexRequest(String id, BytesReference source, long version) { IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, id); // TODO (2.0 upgrade): move back to BytesReference instead of dealing with the array directly diff --git a/watcher/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java b/watcher/src/test/java/org/elasticsearch/watcher/WatcherServiceTests.java similarity index 54% rename from watcher/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java rename to watcher/src/test/java/org/elasticsearch/watcher/WatcherServiceTests.java index 7ef5b91f938..599010a1f21 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/WatcherServiceTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.watcher.watch; +package org.elasticsearch.watcher; import com.google.common.collect.ImmutableMap; import org.elasticsearch.ElasticsearchTimeoutException; @@ -15,8 +15,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.watcher.WatcherService; -import org.elasticsearch.watcher.WatcherState; import org.elasticsearch.watcher.actions.ActionStatus; import org.elasticsearch.watcher.execution.ExecutionService; import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry; @@ -25,6 +23,10 @@ import org.elasticsearch.watcher.support.clock.SystemClock; import org.elasticsearch.watcher.trigger.Trigger; import org.elasticsearch.watcher.trigger.TriggerEngine; import org.elasticsearch.watcher.trigger.TriggerService; +import org.elasticsearch.watcher.watch.Watch; +import org.elasticsearch.watcher.watch.WatchLockService; +import org.elasticsearch.watcher.watch.WatchStatus; +import org.elasticsearch.watcher.watch.WatchStore; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Before; @@ -39,8 +41,9 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; /** + * */ -public class WatchServiceTests extends ESTestCase { +public class WatcherServiceTests extends ESTestCase { private TriggerService triggerService; private WatchStore watchStore; @@ -67,53 +70,86 @@ public class WatchServiceTests extends ESTestCase { @Test public void testPutWatch() throws Exception { + + boolean activeByDefault = randomBoolean(); + IndexResponse indexResponse = mock(IndexResponse.class); - Watch watch = mock(Watch.class); + Watch newWatch = mock(Watch.class); + WatchStatus status = mock(WatchStatus.class); + when(status.state()).thenReturn(new WatchStatus.State(activeByDefault, clock.nowUTC())); + when(newWatch.status()).thenReturn(status); + WatchStore.WatchPut watchPut = mock(WatchStore.WatchPut.class); when(watchPut.indexResponse()).thenReturn(indexResponse); - when(watchPut.current()).thenReturn(watch); + when(watchPut.current()).thenReturn(newWatch); TimeValue timeout = TimeValue.timeValueSeconds(5); WatchLockService.Lock lock = mock(WatchLockService.Lock.class); when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock); - when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class))).thenReturn(watch); - when(watchStore.put(watch)).thenReturn(watchPut); - IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout); + when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), any(DateTime.class))).thenReturn(newWatch); + when(watchStore.put(newWatch)).thenReturn(watchPut); + IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout, activeByDefault); assertThat(response, sameInstance(indexResponse)); - verify(triggerService, times(1)).add(any(TriggerEngine.Job.class)); + verify(newWatch, times(1)).setState(activeByDefault, clock.nowUTC()); + if (activeByDefault) { + verify(triggerService, times(1)).add(any(TriggerEngine.Job.class)); + } else { + verifyZeroInteractions(triggerService); + } } @Test(expected = ElasticsearchTimeoutException.class) public void testPutWatch_Timeout() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); - watcherService.putWatch("_id", new BytesArray("{}"), timeout); + watcherService.putWatch("_id", new BytesArray("{}"), timeout, randomBoolean()); } @Test - public void testPutWatch_NotSchedule() throws Exception { + public void testPutWatch_DifferentActiveStates() throws Exception { Trigger trigger = mock(Trigger.class); IndexResponse indexResponse = mock(IndexResponse.class); + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + WatchStatus status = mock(WatchStatus.class); + boolean active = randomBoolean(); + when(status.state()).thenReturn(new WatchStatus.State(active, clock.nowUTC())); + when(watch.status()).thenReturn(status); when(watch.trigger()).thenReturn(trigger); WatchStore.WatchPut watchPut = mock(WatchStore.WatchPut.class); when(watchPut.indexResponse()).thenReturn(indexResponse); when(watchPut.current()).thenReturn(watch); + Watch previousWatch = mock(Watch.class); + WatchStatus previousStatus = mock(WatchStatus.class); + boolean prevActive = randomBoolean(); + when(previousStatus.state()).thenReturn(new WatchStatus.State(prevActive, clock.nowUTC())); + when(previousWatch.status()).thenReturn(previousStatus); when(previousWatch.trigger()).thenReturn(trigger); when(watchPut.previous()).thenReturn(previousWatch); TimeValue timeout = TimeValue.timeValueSeconds(5); WatchLockService.Lock lock = mock(WatchLockService.Lock.class); when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock); - when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class))).thenReturn(watch); + when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), eq(clock.nowUTC()))).thenReturn(watch); when(watchStore.put(watch)).thenReturn(watchPut); - IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout); + + IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout, active); assertThat(response, sameInstance(indexResponse)); - verifyZeroInteractions(triggerService); + if (!active) { + // we should always remove the watch from the trigger service, just to be safe + verify(triggerService, times(1)).remove("_id"); + } else if (prevActive) { + // if both the new watch and the prev one are active, we should do nothing + verifyZeroInteractions(triggerService); + } else { + // if the prev watch was not active and the new one is active, we should add the watch + verify(triggerService, times(1)).add(watch); + } } @Test @@ -158,7 +194,6 @@ public class WatchServiceTests extends ESTestCase { verify(triggerService, times(1)).remove("_id"); } - @Test public void testDeleteWatch_NotFound() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); @@ -186,7 +221,7 @@ public class WatchServiceTests extends ESTestCase { when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); Watch watch = mock(Watch.class); when(watch.ack(now, "_all")).thenReturn(true); - WatchStatus status = new WatchStatus(ImmutableMap.of()); + WatchStatus status = new WatchStatus(now, ImmutableMap.of()); when(watch.status()).thenReturn(status); when(watchStore.get("_id")).thenReturn(watch); @@ -196,6 +231,146 @@ public class WatchServiceTests extends ESTestCase { verify(watchStore, times(1)).updateStatus(watch); } + @Test + public void testActivate() throws Exception { + TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 30)); + WatcherService service = spy(watcherService); + WatchStatus expectedStatus = mock(WatchStatus.class); + doReturn(expectedStatus).when(service).setWatchState("_id", true, timeout); + WatchStatus actualStatus = service.activateWatch("_id", timeout); + assertThat(actualStatus, sameInstance(expectedStatus)); + verify(service, times(1)).setWatchState("_id", true, timeout); + } + + @Test + public void testDeactivate() throws Exception { + TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 30)); + WatcherService service = spy(watcherService); + WatchStatus expectedStatus = mock(WatchStatus.class); + doReturn(expectedStatus).when(service).setWatchState("_id", false, timeout); + WatchStatus actualStatus = service.deactivateWatch("_id", timeout); + assertThat(actualStatus, sameInstance(expectedStatus)); + verify(service, times(1)).setWatchState("_id", false, timeout); + } + + @Test + public void testSetWatchState_SetActiveOnCurrentlyActive() throws Exception { + + // trying to activate a watch that is already active: + // - the watch status should not change + // - the watch doesn't need to be updated in the store + // - the watch should not be removed or re-added to the trigger service + + DateTime now = new DateTime(DateTimeZone.UTC); + clock.setTime(now); + TimeValue timeout = TimeValue.timeValueSeconds(5); + WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); + + Watch watch = mock(Watch.class); + WatchStatus status = new WatchStatus(now, ImmutableMap.of()); + when(watch.status()).thenReturn(status); + when(watch.setState(true, now)).thenReturn(false); + + when(watchStore.get("_id")).thenReturn(watch); + + + WatchStatus result = watcherService.setWatchState("_id", true, timeout); + assertThat(result, not(sameInstance(status))); + + verifyZeroInteractions(triggerService); + verify(watchStore, never()).updateStatus(watch); + } + + @Test + public void testSetWatchState_SetActiveOnCurrentlyInactive() throws Exception { + + // activating a watch that is currently inactive: + // - the watch status should be updated + // - the watch needs to be updated in the store + // - the watch should be re-added to the trigger service (the assumption is that it's not there) + + DateTime now = new DateTime(DateTimeZone.UTC); + clock.setTime(now); + + TimeValue timeout = TimeValue.timeValueSeconds(5); + WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); + + Watch watch = mock(Watch.class); + WatchStatus status = new WatchStatus(now, ImmutableMap.of()); + when(watch.status()).thenReturn(status); + when(watch.setState(true, now)).thenReturn(true); + + when(watchStore.get("_id")).thenReturn(watch); + + WatchStatus result = watcherService.setWatchState("_id", true, timeout); + assertThat(result, not(sameInstance(status))); + + verify(triggerService, times(1)).add(watch); + verify(watchStore, times(1)).updateStatus(watch); + } + + @Test + public void testSetWatchState_SetInactiveOnCurrentlyActive() throws Exception { + + // deactivating a watch that is currently active: + // - the watch status should change + // - the watch needs to be updated in the store + // - the watch should be removed from the trigger service + + DateTime now = new DateTime(DateTimeZone.UTC); + clock.setTime(now); + TimeValue timeout = TimeValue.timeValueSeconds(5); + WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); + + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + WatchStatus status = new WatchStatus(now, ImmutableMap.of()); + when(watch.status()).thenReturn(status); + when(watch.setState(false, now)).thenReturn(true); + + when(watchStore.get("_id")).thenReturn(watch); + + + WatchStatus result = watcherService.setWatchState("_id", false, timeout); + assertThat(result, not(sameInstance(status))); + + verify(triggerService, times(1)).remove("_id"); + verify(watchStore, times(1)).updateStatus(watch); + } + + @Test + public void testSetWatchState_SetInactiveOnCurrentlyInactive() throws Exception { + + // trying to deactivate a watch that is currently inactive: + // - the watch status should not be updated + // - the watch should not be updated in the store + // - the watch should be re-added or removed to/from the trigger service + + DateTime now = new DateTime(DateTimeZone.UTC); + clock.setTime(now); + + TimeValue timeout = TimeValue.timeValueSeconds(5); + WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); + + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + WatchStatus status = new WatchStatus(now, ImmutableMap.of()); + when(watch.status()).thenReturn(status); + when(watch.setState(false, now)).thenReturn(false); + + when(watchStore.get("_id")).thenReturn(watch); + + WatchStatus result = watcherService.setWatchState("_id", false, timeout); + assertThat(result, not(sameInstance(status))); + + verifyZeroInteractions(triggerService); + verify(watchStore, never()).updateStatus(watch); + } + @Test(expected = ElasticsearchTimeoutException.class) public void testAckWatch_Timeout() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); @@ -211,7 +386,7 @@ public class WatchServiceTests extends ESTestCase { when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); Watch watch = mock(Watch.class); when(watch.ack(now)).thenReturn(false); - WatchStatus status = new WatchStatus(ImmutableMap.of()); + WatchStatus status = new WatchStatus(now, ImmutableMap.of()); when(watch.status()).thenReturn(status); when(watchStore.get("_id")).thenReturn(watch); diff --git a/watcher/src/test/java/org/elasticsearch/watcher/actions/email/service/ManualPublicSmtpServersTester.java b/watcher/src/test/java/org/elasticsearch/watcher/actions/email/service/ManualPublicSmtpServersTester.java index 33fb2a82bb0..f0acbd44461 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/actions/email/service/ManualPublicSmtpServersTester.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/actions/email/service/ManualPublicSmtpServersTester.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.watcher.actions.email.service; -import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.common.cli.Terminal; import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.settings.Settings; @@ -18,7 +18,7 @@ import org.elasticsearch.watcher.support.secret.SecretService; import java.io.IOException; import java.io.InputStream; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/379") +@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/379") public class ManualPublicSmtpServersTester { private static final Terminal terminal = Terminal.DEFAULT; diff --git a/watcher/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java b/watcher/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java index 28d5c633e77..fbaa931f2ff 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java @@ -127,7 +127,7 @@ public class ExecutionServiceTests extends ESTestCase { ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); - WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); + WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); @@ -201,7 +201,7 @@ public class ExecutionServiceTests extends ESTestCase { ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); - WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); + WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); @@ -270,7 +270,7 @@ public class ExecutionServiceTests extends ESTestCase { ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); - WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); + WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); @@ -338,7 +338,7 @@ public class ExecutionServiceTests extends ESTestCase { ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); - WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); + WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); @@ -408,7 +408,7 @@ public class ExecutionServiceTests extends ESTestCase { ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); - WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); + WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); @@ -473,7 +473,7 @@ public class ExecutionServiceTests extends ESTestCase { ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); - WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(now))); + WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); @@ -519,7 +519,7 @@ public class ExecutionServiceTests extends ESTestCase { ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, transform, action); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); - WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(now))); + WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); @@ -565,7 +565,7 @@ public class ExecutionServiceTests extends ESTestCase { ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); - WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(now))); + WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); diff --git a/watcher/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java b/watcher/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java index ea8ecd71ff7..16ea2ad342b 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java @@ -125,7 +125,7 @@ public class HttpInputTests extends ESTestCase { null, new ExecutableActions(new ArrayList()), null, - new WatchStatus(ImmutableMap.of())); + new WatchStatus(new DateTime(0, UTC), ImmutableMap.of())); WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(0, UTC), new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), @@ -156,7 +156,7 @@ public class HttpInputTests extends ESTestCase { null, new ExecutableActions(new ArrayList()), null, - new WatchStatus(ImmutableMap.of())); + new WatchStatus(new DateTime(0, UTC), ImmutableMap.of())); WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(0, UTC), new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), diff --git a/watcher/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java b/watcher/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java index cddcc8bebf4..2cde65ce491 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java @@ -124,7 +124,7 @@ public class SearchInputTests extends ESIntegTestCase { null, new ExecutableActions(new ArrayList()), null, - new WatchStatus(ImmutableMap.of())), + new WatchStatus(new DateTime(0, UTC), ImmutableMap.of())), new DateTime(0, UTC), new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)), timeValueSeconds(5)); @@ -231,7 +231,7 @@ public class SearchInputTests extends ESIntegTestCase { null, new ExecutableActions(new ArrayList()), null, - new WatchStatus(ImmutableMap.of())), + new WatchStatus(new DateTime(0, UTC), ImmutableMap.of())), new DateTime(0, UTC), new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)), timeValueSeconds(5)); @@ -343,7 +343,7 @@ public class SearchInputTests extends ESIntegTestCase { null, new ExecutableActions(new ArrayList()), null, - new WatchStatus(ImmutableMap.of())), + new WatchStatus(new DateTime(50000, UTC), ImmutableMap.of())), new DateTime(60000, UTC), new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)), timeValueSeconds(5)); diff --git a/watcher/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/watcher/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java index 2e4f1b84eca..1b04ae5642a 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java @@ -227,7 +227,7 @@ public final class WatcherTestUtils { new TimeValue(0), new ExecutableActions(actions), metadata, - new WatchStatus(ImmutableMap.builder() + new WatchStatus(now, ImmutableMap.builder() .put("_webhook", new ActionStatus(now)) .put("_email", new ActionStatus(now)) .build())); diff --git a/watcher/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java b/watcher/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java index 764a1a9d566..908419b3b6c 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java @@ -499,6 +499,7 @@ public class SearchTransformTests extends ESIntegTestCase { } private WatchExecutionContext createContext() { + return new TriggeredExecutionContext( new Watch("test-watch", new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), @@ -508,7 +509,7 @@ public class SearchTransformTests extends ESIntegTestCase { null, new ExecutableActions(new ArrayList()), null, - new WatchStatus(ImmutableMap.of())), + new WatchStatus( new DateTime(40000, UTC), ImmutableMap.of())), new DateTime(60000, UTC), new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)), timeValueSeconds(5)); diff --git a/watcher/src/test/java/org/elasticsearch/watcher/transport/action/activate/ActivateWatchTests.java b/watcher/src/test/java/org/elasticsearch/watcher/transport/action/activate/ActivateWatchTests.java new file mode 100644 index 00000000000..6e852ea8fc1 --- /dev/null +++ b/watcher/src/test/java/org/elasticsearch/watcher/transport/action/activate/ActivateWatchTests.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transport.action.activate; + + +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.watcher.client.WatcherClient; +import org.elasticsearch.watcher.execution.ExecutionState; +import org.elasticsearch.watcher.support.xcontent.XContentSource; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTestCase; +import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchResponse; +import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse; +import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition; +import static org.elasticsearch.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.*; + +/** + */ +public class ActivateWatchTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected boolean timeWarped() { + return false; + } + + @Test + public void testDeactivateAndActivate() throws Exception { + WatcherClient watcherClient = watcherClient(); + + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch() + .setId("_id") + .setSource(watchBuilder() + .trigger(schedule(interval("1s"))) + .input(simpleInput("foo", "bar")) + .condition(alwaysCondition()) + .addAction("_a1", indexAction("actions", "action1")) + .defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS))) + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_id").get(); + assertThat(getWatchResponse, notNullValue()); + assertThat(getWatchResponse.getStatus().state().isActive(), is(true)); + + assertWatchWithMinimumActionsCount("_id", ExecutionState.EXECUTED, 5); + + // we now know the watch is executing... lets deactivate it + ActivateWatchResponse activateWatchResponse = watcherClient.prepareActivateWatch("_id", false).get(); + assertThat(activateWatchResponse, notNullValue()); + assertThat(activateWatchResponse.getStatus().state().isActive(), is(false)); + + getWatchResponse = watcherClient.prepareGetWatch("_id").get(); + assertThat(getWatchResponse, notNullValue()); + assertThat(getWatchResponse.getStatus().state().isActive(), is(false)); + + flush(); + refresh(); + long count1 = docCount(".watch_history*", "watch_record", matchAllQuery()); + + Thread.sleep(10000); + + flush(); + refresh(); + long count2 = docCount(".watch_history*", "watch_record", matchAllQuery()); + + assertThat(count2, is(count1)); + + // lets activate it again + + activateWatchResponse = watcherClient.prepareActivateWatch("_id", true).get(); + assertThat(activateWatchResponse, notNullValue()); + assertThat(activateWatchResponse.getStatus().state().isActive(), is(true)); + + getWatchResponse = watcherClient.prepareGetWatch("_id").get(); + assertThat(getWatchResponse, notNullValue()); + assertThat(getWatchResponse.getStatus().state().isActive(), is(true)); + + Thread.sleep(10000); + long count3 = docCount(".watch_history*", "watch_record", matchAllQuery()); + assertThat(count3, greaterThan(count1)); + } + + @Test + public void testLoadWatchWithoutAState() throws Exception { + WatcherClient watcherClient = watcherClient(); + + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch() + .setId("_id") + .setSource(watchBuilder() + .trigger(schedule(cron("0 0 0 1 1 ? 2050"))) // some time in 2050 + .input(simpleInput("foo", "bar")) + .condition(alwaysCondition()) + .addAction("_a1", indexAction("actions", "action1")) + .defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS))) + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_id").get(); + assertThat(getWatchResponse, notNullValue()); + assertThat(getWatchResponse.getStatus().state().isActive(), is(true)); + + GetResponse getResponse = client().prepareGet(".watches", "watch", "_id").get(); + XContentSource source = new XContentSource(getResponse.getSourceAsBytesRef(), XContentType.JSON); + + String[] filters = new String[] { + "trigger.**", + "input.**", + "condition.**", + "throttle_period.**", + "transform.**", + "actions.**", + "metadata.**", + "_status.version", + "_status.last_checked", + "_status.last_met_condition", + "_status.actions.**", + }; + XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), new BytesStreamOutput(), filters); + source.toXContent(builder, ToXContent.EMPTY_PARAMS); + + // now that we filtered out the watch status state, lets put it back in + IndexResponse indexResponse = client().prepareIndex(".watches", "watch", "_id") + .setSource(builder.bytes()) + .get(); + assertThat(indexResponse.getId(), is("_id")); + + // now, let's restart + assertThat(watcherClient.prepareWatchService().stop().get().isAcknowledged(), is(true)); + ensureWatcherStopped(); + assertThat(watcherClient.prepareWatchService().start().get().isAcknowledged(), is(true)); + ensureWatcherStarted(); + + getWatchResponse = watcherClient.prepareGetWatch("_id").get(); + assertThat(getWatchResponse, notNullValue()); + assertThat(getWatchResponse.getStatus().state(), notNullValue()); + assertThat(getWatchResponse.getStatus().state().isActive(), is(true)); + } +} diff --git a/watcher/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/watcher/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java index 9215b4a9fbe..94daddda1c0 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java @@ -170,7 +170,7 @@ public class WatchTests extends ESTestCase { for (ActionWrapper action : actions) { actionsStatuses.put(action.id(), new ActionStatus(now)); } - WatchStatus watchStatus = new WatchStatus(actionsStatuses.build()); + WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), actionsStatuses.build()); TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10));