Introducing Watch De/activateion

Today, once you add a watch to watcher, it's always active. Being "active" means that the watch is registered with the trigger engine (scheduled) and will be executed when its trigger is triggered.

Quite often, ppl want to have an option to deactivate/disable a registered watch. Such that while the watch definition still exists in watcher, it is "inactive" and is never triggered. The only way to do this today is using a "hack" where you can change the watch schedule to a cron expression targeting a really far date in the future (say somewhere around 2050). Again.. this is very hackish and it requires changing the actual definition of the watch (you loose its original trigger).

 This commit introduces the notion of an active/inactive watch.. here are the differences between the two states:

 - active: the watch is registered with watcher and with the trigger engine and will be executed when its trigger is fired by the engine
 - inactive: the watch is registered with watcher, but is not registered with the trigger engine. An inactive watch will never be fired, regardless of its trigger.

 This commit also adds two new APIs:

  - `_watcher/watch/{id}/_activate`
  - `_watcher/watch/{id}/_deactivate`

 to activate and deactivate existing watches.

 In addition, the Put Watch API now accepts an `active` parameter that indicates the initial state of the put watch (by default set to  `true`, i.e. "active").

 Closes elastic/elasticsearch#90

Original commit: elastic/x-pack-elasticsearch@37b9ab4d54
This commit is contained in:
uboness 2015-09-04 01:55:40 +02:00
parent 533c14242f
commit e6dfa215b6
49 changed files with 1502 additions and 101 deletions

View File

@ -168,6 +168,30 @@ acknowledgment is cleared and Watcher resumes executing the watch's actions norm
For more information, see <<actions-ack-throttle, action throttling>>. For more information, see <<actions-ack-throttle, action throttling>>.
[[watch-active-state]]
=== Watch Active State
By default, when you add a watch it is immediately set to the active state. An active watch is
registered with the relevant trigger engine and executed according to its configured trigger.
For example, if a watch has a <<trigger-schedule, `schedule`>> trigger, it is executed according
to its schedule.
A watch can also be set to the inactive state. An inactive watch is still registered with Watcher,
but it is not registered with a trigger engine and can never be triggered. When you add a watch,
you can use the <<api-rest-put-watch-active-state, `active`>> parameter to set its initial state
to inactive. You can deactivate an existing watch with the <<api-rest-deactivate-watch, Deactivate Watch API>>.
To reactivate a watch, you use the <<api-rest-activate-watch, Activate Watch API>>.
NOTE: You can still use the <<api-rest-execute-watch, Execute Watch API>> to force the execution
of an inactive watch.
Deactivating watches is useful in a variety of situations. For example, if you have a watch that
monitors an external system and need to take the system down for maintenance, you can deactivate
the watch so doesn't unnecessarily report a bunch of execution failures during the maintenance
window. You might also want to deactivate a watch rather than deleting it entirely in case you
might want to use it at some point in the future.
[[scripts-templates]] [[scripts-templates]]
=== Scripts and Templates === Scripts and Templates

View File

@ -1,4 +1,4 @@
[[array-condition-compare]] [[condition-array-compare]]
==== Array Compare Condition ==== Array Compare Condition
A watch <<condition, condition>> that compares an array of values in the <<watch-execution-context, Watch Execution Context Model>> A watch <<condition, condition>> that compares an array of values in the <<watch-execution-context, Watch Execution Context Model>>
@ -51,7 +51,8 @@ The array path is a "dot-notation" expression that can reference the following v
This array path must resolve to an array. This array path must resolve to an array.
The comparison operator can be any one of the operators from [[condition-compare]]. The comparison operator can be any one of the operators supported by the
<<condition-compare-operators, Compare Condition>>.
The quantifier operator can be any one of the following: The quantifier operator can be any one of the following:
@ -65,4 +66,4 @@ The quantifier operator can be any one of the following:
NOTE: If the array is empty, `all` causes the comparison operator to evaluate to `true` and `some` causes the comparison NOTE: If the array is empty, `all` causes the comparison operator to evaluate to `true` and `some` causes the comparison
operator to evaluate to `false`. operator to evaluate to `false`.
NOTE: It is also possible to use date math expressions and values in the context model as in [[condition-compare]]. NOTE: It is also possible to use date math expressions and values in the context model as in the <<condition-compare, Compare Condition>>.

View File

@ -47,6 +47,7 @@ The path is a "dot-notation" expression that can reference the following variabl
TIP: You can reference entries in arrays using their zero-based array indices. For example, to access the third TIP: You can reference entries in arrays using their zero-based array indices. For example, to access the third
element of the `ctx.payload.hits.hits` array, use `ctx.payload.hits.hits.2`. element of the `ctx.payload.hits.hits` array, use `ctx.payload.hits.hits.2`.
[[condition-compare-operators]]
The comparison operator can be any one of the following: The comparison operator can be any one of the following:
[options="header"] [options="header"]

View File

@ -93,6 +93,10 @@ include::java/execute-watch.asciidoc[]
include::java/ack-watch.asciidoc[] include::java/ack-watch.asciidoc[]
include::java/activate-watch.asciidoc[]
include::java/deactivate-watch.asciidoc[]
include::java/stats.asciidoc[] include::java/stats.asciidoc[]
include::java/service.asciidoc[] include::java/service.asciidoc[]

View File

@ -0,0 +1,26 @@
[[api-java-activate-watch]]
==== Activate Watch API
A watch can be either <<watch-active-state, active or inactive>>. This API enables
you to activate a currently inactive watch.
The status of an inactive watch is returned with the watch definition
when you call the <<api-java-get-watch, Get Watch API>>:
[source,java]
--------------------------------------------------
GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("my-watch").get();
boolean active = getWatchResponse.getStatus().state().isActive();
assert !active;
--------------------------------------------------
You can activate the watch by executing the following API call:
[source,java]
--------------------------------------------------
ActivateWatchResponse activateResponse = watcherClient.prepareActivateWatch("my-watch", true).get();
boolean active = activateResponse.getStatus().state().isActive();
assert active;
--------------------------------------------------
The new state of the watch is returned as part of its overall status.

View File

@ -0,0 +1,26 @@
[[api-java-deactivate-watch]]
==== Deactivate Watch API
A watch can be either <<watch-active-state, active or inactive>>. This API enables
you to deactivate a currently active watch.
The status of an active watch is returned with the watch definition
when you call the <<api-java-get-watch, Get Watch API>>:
[source,java]
--------------------------------------------------
GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("my-watch").get();
boolean active = getWatchResponse.getStatus().state().isActive();
assert active;
--------------------------------------------------
You can deactivate the watch by executing the following API call:
[source,java]
--------------------------------------------------
ActivateWatchResponse activateResponse = watcherClient.prepareActivateWatch("my-watch", false).get();
boolean active = activateResponse.getStatus().state().isActive();
assert !active;
--------------------------------------------------
The new state of the watch is returned as part of its overall status.

View File

@ -11,6 +11,10 @@ include::rest/execute-watch.asciidoc[]
include::rest/ack-watch.asciidoc[] include::rest/ack-watch.asciidoc[]
include::rest/activate-watch.asciidoc[]
include::rest/deactivate-watch.asciidoc[]
include::rest/info.asciidoc[] include::rest/info.asciidoc[]
include::rest/stats.asciidoc[] include::rest/stats.asciidoc[]

View File

@ -0,0 +1,52 @@
[[api-rest-activate-watch]]
==== Activate Watch API
A watch can be either <<watch-active-state, active or inactive>>. This API enables
you to activate a currently inactive watch.
The status of an inactive watch is returned with the watch definition
when you call the <<api-rest-get-watch, Get Watch API>>:
[source,json]
--------------------------------------------------
GET _watcher/watch/<watch_id>
--------------------------------------------------
// AUTOSENSE
[source,js]
--------------------------------------------------
"_status": {
"state" : {
"active" : false,
"timestamp" : "2015-08-20T12:21:32.734Z"
}
"actions": {
...
}
}
}
--------------------------------------------------
You can activate the watch by executing the following API call:
[source,json]
--------------------------------------------------
PUT _watcher/watch/<watch_id>/_activate
--------------------------------------------------
// AUTOSENSE
The new state of the watch is returned as part of its overall status.
[source,js]
--------------------------------------------------
"_status": {
"state" : {
"active" : true,
"timestamp" : "2015-09-04T08:39:46.816Z"
}
"actions": {
...
}
}
}
--------------------------------------------------

View File

@ -0,0 +1,52 @@
[[api-rest-deactivate-watch]]
==== Deactivate Watch API
A watch can be either <<watch-active-state, active or inactive>>. This API enables
you to deactivate a currently active watch.
The status of an active watch is returned with the watch definition
when you call the <<api-rest-get-watch, Get Watch API>>:
[source,json]
--------------------------------------------------
GET _watcher/watch/<watch_id>
--------------------------------------------------
// AUTOSENSE
[source,js]
--------------------------------------------------
"_status": {
"state" : {
"active" : true,
"timestamp" : "2015-08-20T12:21:32.734Z"
}
"actions": {
...
}
}
}
--------------------------------------------------
You can deactivate the watch by executing the following API call:
[source,json]
--------------------------------------------------
PUT _watcher/watch/<watch_id>/_deactivate
--------------------------------------------------
// AUTOSENSE
The new state of the watch is returned as part of its overall status.
[source,js]
--------------------------------------------------
"_status": {
"state" : {
"active" : false,
"timestamp" : "2015-09-04T08:39:46.816Z"
}
"actions": {
...
}
}
}
--------------------------------------------------

View File

@ -92,4 +92,16 @@ The following snippet shows how to change the default timeout of the put action
-------------------------------------------------- --------------------------------------------------
PUT _watcher/watch/my-watch?master_timeout=30s PUT _watcher/watch/my-watch?master_timeout=30s
-------------------------------------------------- --------------------------------------------------
// AUTOSENSE
[[api-rest-put-watch-active-state]]
===== Controlling Default Active State
When adding a watch you can also define its initial <<watch-active-state, active state>>. You do that by
setting the `active` parameter. The following command add a watch and sets it to be inactive by default:
[source,js]
--------------------------------------------------
PUT _watcher/watch/my-watch?active=false
--------------------------------------------------
NOTE: If you omit the `active` parameter, the watch is set to the active state by default.

View File

@ -0,0 +1,24 @@
{
"watcher.activate_watch": {
"documentation": "https://www.elastic.co/guide/en/watcher/current/api-rest.html#api-rest-activate-watch",
"methods": [ "PUT", "POST" ],
"url": {
"path": "/_watcher/watch/{watch_id}/_activate",
"paths": [ "/_watcher/watch/{watch_id}/_activate" ],
"parts": {
"watch_id": {
"type" : "string",
"description" : "Watch ID",
"required" : true
}
},
"params": {
"master_timeout": {
"type": "duration",
"description": "Specify timeout for watch write operation"
}
}
},
"body": null
}
}

View File

@ -0,0 +1,24 @@
{
"watcher.deactivate_watch": {
"documentation": "https://www.elastic.co/guide/en/watcher/current/api-rest.html#api-rest-deactivate-watch",
"methods": [ "PUT", "POST" ],
"url": {
"path": "/_watcher/watch/{watch_id}/_deactivate",
"paths": [ "/_watcher/watch/{watch_id}/_deactivate" ],
"parts": {
"watch_id": {
"type" : "string",
"description" : "Watch ID",
"required" : true
}
},
"params": {
"master_timeout": {
"type": "duration",
"description": "Specify timeout for watch write operation"
}
}
},
"body": null
}
}

View File

@ -16,6 +16,10 @@
"master_timeout": { "master_timeout": {
"type": "duration", "type": "duration",
"description": "Specify timeout for watch write operation" "description": "Specify timeout for watch write operation"
},
"active": {
"type": "boolean",
"description": "Specify whether the watch is in/active by default"
} }
} }
}, },

View File

@ -0,0 +1,81 @@
---
"Test activate watch api":
- do:
cluster.health:
wait_for_status: yellow
- do:
watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger" : {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"simple": {
"payload": {
"send": "yes"
}
}
},
"condition": {
"always": {}
},
"actions": {
"test_index": {
"index": {
"index": "test",
"doc_type": "test2"
}
}
}
}
- match: { _id: "my_watch" }
- do:
cluster.health:
wait_for_status: yellow
- do:
watcher.get_watch:
id: "my_watch"
- match: { found : true}
- match: { _id: "my_watch" }
- match: { _status.state.active: true }
- do:
watcher.deactivate_watch:
watch_id: "my_watch"
- match: { "_status.state.active" : false }
- do:
watcher.get_watch:
id: "my_watch"
- match: { found : true}
- match: { _id: "my_watch" }
- match: { _status.state.active: false }
- do:
watcher.activate_watch:
watch_id: "my_watch"
- match: { "_status.state.active" : true }
- do:
watcher.get_watch:
id: "my_watch"
- match: { found : true}
- match: { _id: "my_watch" }
- match: { _status.state.active: true }
- do:
watcher.delete_watch:
id: "my_watch"
- match: { found: true }

View File

@ -0,0 +1,49 @@
---
"Test put inactive watch":
- do:
cluster.health:
wait_for_status: green
- do:
watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
active: false
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"payload": {
"send": "yes"
}
}
},
"condition": {
"always": {}
},
"actions": {
"test_index": {
"index": {
"index": "test",
"doc_type": "test2"
}
}
}
}
- match: { _id: "my_watch" }
- do:
watcher.get_watch:
id: "my_watch"
- match: { found : true }
- match: { _id: "my_watch" }
- match: { _status.version: 1 }
- match: { _status.state.active: false }

View File

@ -46,6 +46,8 @@ import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation;
import org.elasticsearch.watcher.transform.TransformModule; import org.elasticsearch.watcher.transform.TransformModule;
import org.elasticsearch.watcher.transport.actions.ack.AckWatchAction; import org.elasticsearch.watcher.transport.actions.ack.AckWatchAction;
import org.elasticsearch.watcher.transport.actions.ack.TransportAckWatchAction; import org.elasticsearch.watcher.transport.actions.ack.TransportAckWatchAction;
import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchAction;
import org.elasticsearch.watcher.transport.actions.activate.TransportActivateWatchAction;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchAction; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchAction;
import org.elasticsearch.watcher.transport.actions.delete.TransportDeleteWatchAction; import org.elasticsearch.watcher.transport.actions.delete.TransportDeleteWatchAction;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchAction; import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchAction;
@ -62,7 +64,6 @@ import org.elasticsearch.watcher.trigger.TriggerModule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.watcher.trigger.schedule.ScheduleModule;
import org.elasticsearch.watcher.watch.WatchModule; import org.elasticsearch.watcher.watch.WatchModule;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -98,10 +99,10 @@ public class WatcherPlugin extends Plugin {
@Override @Override
public Collection<Module> nodeModules() { public Collection<Module> nodeModules() {
if (enabled == false) { if (!enabled || transportClient) {
return Collections.emptyList(); return Collections.emptyList();
} else if (transportClient == false){ }
return Arrays.<Module>asList( return Arrays.<Module>asList(
new WatcherModule(settings), new WatcherModule(settings),
new InitializingModule(), new InitializingModule(),
new LicenseModule(), new LicenseModule(),
@ -120,14 +121,12 @@ public class WatcherPlugin extends Plugin {
new ExecutionModule(), new ExecutionModule(),
new WatcherShieldModule(settings), new WatcherShieldModule(settings),
new SecretModule(settings)); new SecretModule(settings));
}
return Collections.emptyList();
} }
@Override @Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() { public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (!enabled || transportClient) { if (!enabled || transportClient) {
return Collections.EMPTY_SET; return Collections.emptyList();
} }
return Arrays.<Class<? extends LifecycleComponent>>asList( return Arrays.<Class<? extends LifecycleComponent>>asList(
// the initialization service must be first in the list // the initialization service must be first in the list
@ -156,7 +155,7 @@ public class WatcherPlugin extends Plugin {
public void onModule(ScriptModule module) { public void onModule(ScriptModule module) {
module.registerScriptContext(ScriptServiceProxy.INSTANCE); module.registerScriptContext(ScriptServiceProxy.INSTANCE);
if (enabled && transportClient == false) { if (enabled && !transportClient) {
module.addScriptEngine(XMustacheScriptEngineService.class); module.addScriptEngine(XMustacheScriptEngineService.class);
} }
} }
@ -168,7 +167,7 @@ public class WatcherPlugin extends Plugin {
} }
public void onModule(RestModule module) { public void onModule(RestModule module) {
if (enabled && transportClient == false) { if (enabled && !transportClient) {
module.addRestAction(RestPutWatchAction.class); module.addRestAction(RestPutWatchAction.class);
module.addRestAction(RestDeleteWatchAction.class); module.addRestAction(RestDeleteWatchAction.class);
module.addRestAction(RestWatcherStatsAction.class); module.addRestAction(RestWatcherStatsAction.class);
@ -176,6 +175,7 @@ public class WatcherPlugin extends Plugin {
module.addRestAction(RestGetWatchAction.class); module.addRestAction(RestGetWatchAction.class);
module.addRestAction(RestWatchServiceAction.class); module.addRestAction(RestWatchServiceAction.class);
module.addRestAction(RestAckWatchAction.class); module.addRestAction(RestAckWatchAction.class);
module.addRestAction(RestActivateWatchAction.class);
module.addRestAction(RestExecuteWatchAction.class); module.addRestAction(RestExecuteWatchAction.class);
module.addRestAction(RestHijackOperationAction.class); module.addRestAction(RestHijackOperationAction.class);
} }
@ -188,6 +188,7 @@ public class WatcherPlugin extends Plugin {
module.registerAction(GetWatchAction.INSTANCE, TransportGetWatchAction.class); module.registerAction(GetWatchAction.INSTANCE, TransportGetWatchAction.class);
module.registerAction(WatcherStatsAction.INSTANCE, TransportWatcherStatsAction.class); module.registerAction(WatcherStatsAction.INSTANCE, TransportWatcherStatsAction.class);
module.registerAction(AckWatchAction.INSTANCE, TransportAckWatchAction.class); module.registerAction(AckWatchAction.INSTANCE, TransportAckWatchAction.class);
module.registerAction(ActivateWatchAction.INSTANCE, TransportActivateWatchAction.class);
module.registerAction(WatcherServiceAction.INSTANCE, TransportWatcherServiceAction.class); module.registerAction(WatcherServiceAction.INSTANCE, TransportWatcherServiceAction.class);
module.registerAction(ExecuteWatchAction.INSTANCE, TransportExecuteWatchAction.class); module.registerAction(ExecuteWatchAction.INSTANCE, TransportExecuteWatchAction.class);
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchLockService; import org.elasticsearch.watcher.watch.WatchLockService;
import org.elasticsearch.watcher.watch.WatchStatus; import org.elasticsearch.watcher.watch.WatchStatus;
import org.elasticsearch.watcher.watch.WatchStore; import org.elasticsearch.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.joda.time.PeriodType; import org.joda.time.PeriodType;
@ -66,7 +67,8 @@ public class WatcherService extends AbstractComponent {
// Try to load watch store before the execution service, b/c action depends on watch store // Try to load watch store before the execution service, b/c action depends on watch store
watchStore.start(clusterState); watchStore.start(clusterState);
executionService.start(clusterState); executionService.start(clusterState);
triggerService.start(watchStore.watches().values());
triggerService.start(watchStore.activeWatches());
state.set(WatcherState.STARTED); state.set(WatcherState.STARTED);
logger.info("watch service has started"); logger.info("watch service has started");
} catch (Exception e) { } catch (Exception e) {
@ -122,17 +124,40 @@ public class WatcherService extends AbstractComponent {
} }
} }
public IndexResponse putWatch(String id, BytesReference watchSource, TimeValue timeout) throws IOException { public IndexResponse putWatch(String id, BytesReference watchSource, TimeValue timeout, boolean active) throws IOException {
ensureStarted(); ensureStarted();
WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout);
if (lock == null) { if (lock == null) {
throw new ElasticsearchTimeoutException("could not put watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); throw new ElasticsearchTimeoutException("could not put watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds()));
} }
try { try {
Watch watch = watchParser.parseWithSecrets(id, false, watchSource); DateTime now = clock.nowUTC();
Watch watch = watchParser.parseWithSecrets(id, false, watchSource, now);
watch.setState(active, now);
WatchStore.WatchPut result = watchStore.put(watch); WatchStore.WatchPut result = watchStore.put(watch);
if (result.previous() == null || !result.previous().trigger().equals(result.current().trigger())) {
triggerService.add(result.current()); if (result.previous() == null) {
// this is a newly created watch, so we only need to schedule it if it's active
if (result.current().status().state().isActive()) {
triggerService.add(result.current());
}
} else if (result.current().status().state().isActive()) {
if (!result.previous().status().state().isActive()) {
// the replaced watch was inactive, which means it wasn't scheduled. The new watch is active
// so we need to schedule it
triggerService.add(result.current());
} else if (!result.previous().trigger().equals(result.current().trigger())) {
// the previous watch was active and its schedule is different than the schedule of the
// new watch, so we need to
triggerService.add(result.current());
}
} else {
// if the current is inactive, we'll just remove it from the trigger service
// just to be safe
triggerService.remove(result.current().id());
} }
return result.indexResponse(); return result.indexResponse();
} finally { } finally {
@ -184,6 +209,60 @@ public class WatcherService extends AbstractComponent {
} }
} }
public WatchStatus activateWatch(String id, TimeValue timeout) throws IOException {
return setWatchState(id, true, timeout);
}
public WatchStatus deactivateWatch(String id, TimeValue timeout) throws IOException {
return setWatchState(id, false, timeout);
}
WatchStatus setWatchState(String id, boolean active, TimeValue timeout) throws IOException {
ensureStarted();
WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout);
if (lock == null) {
throw new ElasticsearchTimeoutException("could not ack watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds()));
}
// for now, when a watch is deactivated we don't remove its runtime representation
// that is, the store will still keep the watch in memory. We only mark the watch
// as inactive (both in runtime and also update the watch in the watches index)
// and remove the watch from the trigger service, such that it will not be triggered
// nor its trigger be evaluated.
//
// later on we can consider removing the watch runtime representation from memory
// as well. This will mean that the in-memory loaded watches will no longer be a
// complete representation of the watches in the index. This requires careful thought
// to make sure, such incompleteness doesn't hurt any other part of watcher (we need
// to run this exercise anyway... and make sure that nothing in watcher relies on the
// fact that the watch store holds all watches in memory.
try {
Watch watch = watchStore.get(id);
if (watch == null) {
throw illegalArgument("watch [{}] does not exist", id);
}
if (watch.setState(active, clock.nowUTC())) {
try {
watchStore.updateStatus(watch);
if (active) {
triggerService.add(watch);
} else {
triggerService.remove(watch.id());
}
} catch (IOException ioe) {
throw ioException("failed to update the watch [{}] on ack", ioe, watch.id());
} catch (VersionConflictEngineException vcee) {
throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id());
}
}
// we need to create a safe copy of the status
return new WatchStatus(watch.status());
} finally {
lock.release();
}
}
public long watchesCount() { public long watchesCount() {
return watchStore.watches().size(); return watchStore.watches().size();
} }

View File

@ -17,7 +17,7 @@ import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailTemplate; import org.elasticsearch.watcher.actions.email.service.EmailTemplate;
import org.elasticsearch.watcher.actions.email.service.Profile; import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.support.secret.Secret; import org.elasticsearch.watcher.support.secret.Secret;
import org.elasticsearch.watcher.support.secret.SensitiveXContentParser; import org.elasticsearch.watcher.support.xcontent.WatcherXContentParser;
import org.elasticsearch.watcher.support.xcontent.WatcherParams; import org.elasticsearch.watcher.support.xcontent.WatcherParams;
import java.io.IOException; import java.io.IOException;
@ -141,7 +141,7 @@ public class EmailAction implements Action {
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.USER)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.USER)) {
user = parser.text(); user = parser.text();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PASSWORD)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PASSWORD)) {
password = SensitiveXContentParser.secretOrNull(parser); password = WatcherXContentParser.secretOrNull(parser);
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PROFILE)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PROFILE)) {
try { try {
profile = Profile.resolve(parser.text()); profile = Profile.resolve(parser.text());

View File

@ -319,7 +319,7 @@ public class HipChatMessage implements ToXContent {
throw new ElasticsearchParseException("failed to parse hipchat message. failed to parse [{}] field", ilae, Field.FORMAT.getPreferredName()); throw new ElasticsearchParseException("failed to parse hipchat message. failed to parse [{}] field", ilae, Field.FORMAT.getPreferredName());
} }
} else { } else {
throw new ElasticsearchParseException("failed to parse hipchat message. unexpected token [{}]", token); throw new ElasticsearchParseException("failed to parse hipchat message. unexpected field [{}]", currentFieldName);
} }
} }

View File

@ -14,6 +14,10 @@ import org.elasticsearch.watcher.transport.actions.ack.AckWatchAction;
import org.elasticsearch.watcher.transport.actions.ack.AckWatchRequest; import org.elasticsearch.watcher.transport.actions.ack.AckWatchRequest;
import org.elasticsearch.watcher.transport.actions.ack.AckWatchRequestBuilder; import org.elasticsearch.watcher.transport.actions.ack.AckWatchRequestBuilder;
import org.elasticsearch.watcher.transport.actions.ack.AckWatchResponse; import org.elasticsearch.watcher.transport.actions.ack.AckWatchResponse;
import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchAction;
import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchRequest;
import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchRequestBuilder;
import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchResponse;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchAction; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchAction;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequest; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequest;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequestBuilder; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequestBuilder;
@ -217,7 +221,7 @@ public class WatcherClient {
} }
/** /**
* Acks an watch * Acks a watch
* *
* @param request The ack request with the watch id to be acked * @param request The ack request with the watch id to be acked
* @return The AckWatchResponse * @return The AckWatchResponse
@ -226,6 +230,37 @@ public class WatcherClient {
return client.execute(AckWatchAction.INSTANCE, request); return client.execute(AckWatchAction.INSTANCE, request);
} }
/**
* Creates a request builder to activate a watch by id
*
* @param id the id of the watch
* @param activate indicates whether to activate or deactivate the watch
* @return The request builder
*/
public ActivateWatchRequestBuilder prepareActivateWatch(String id, boolean activate) {
return new ActivateWatchRequestBuilder(client, id, activate);
}
/**
* Activate a watch
*
* @param request The activate request with the watch id
* @param listener The listener for the activate watch response
*/
public void activateWatch(ActivateWatchRequest request, ActionListener<ActivateWatchResponse> listener) {
client.execute(ActivateWatchAction.INSTANCE, request, listener);
}
/**
* Activates a watch
*
* @param request The de/activate request with the watch id.
*/
public ActionFuture<ActivateWatchResponse> activateWatch(ActivateWatchRequest request) {
return client.execute(ActivateWatchAction.INSTANCE, request);
}
/** /**
* Prepare a watch service request. * Prepare a watch service request.
*/ */

View File

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.rest.action;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.rest.WatcherRestHandler;
import org.elasticsearch.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchRequest;
import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchResponse;
import org.elasticsearch.watcher.watch.Watch;
/**
* The rest action to de/activate a watch
*/
public class RestActivateWatchAction extends WatcherRestHandler {
@Inject
protected RestActivateWatchAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/_activate", this);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/_activate", this);
DeactivateRestHandler deactivateRestHandler = new DeactivateRestHandler(settings, controller, client);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/watch/{id}/_deactivate", deactivateRestHandler);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/watch/{id}/_deactivate", deactivateRestHandler);
}
@Override
protected void handleRequest(RestRequest request, RestChannel channel, WatcherClient client) throws Exception {
String watchId = request.param("id");
client.activateWatch(new ActivateWatchRequest(watchId, true), new RestBuilderListener<ActivateWatchResponse>(channel) {
@Override
public RestResponse buildResponse(ActivateWatchResponse response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(RestStatus.OK, builder.startObject()
.field(Watch.Field.STATUS.getPreferredName(), response.getStatus(), WatcherParams.HIDE_SECRETS)
.endObject());
}
});
}
static class DeactivateRestHandler extends WatcherRestHandler {
public DeactivateRestHandler(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
}
@Override
protected void handleRequest(RestRequest request, RestChannel channel, WatcherClient client) throws Exception {
String watchId = request.param("id");
client.activateWatch(new ActivateWatchRequest(watchId, false), new RestBuilderListener<ActivateWatchResponse>(channel) {
@Override
public RestResponse buildResponse(ActivateWatchResponse response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(RestStatus.OK, builder.startObject()
.field(Watch.Field.STATUS.getPreferredName(), response.getStatus(), WatcherParams.HIDE_SECRETS)
.endObject());
}
});
}
}
}

View File

@ -36,6 +36,7 @@ public class RestPutWatchAction extends WatcherRestHandler {
protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception { protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception {
PutWatchRequest putWatchRequest = new PutWatchRequest(request.param("id"), request.content()); PutWatchRequest putWatchRequest = new PutWatchRequest(request.param("id"), request.content());
putWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWatchRequest.masterNodeTimeout())); putWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWatchRequest.masterNodeTimeout()));
putWatchRequest.setActive(request.paramAsBoolean("active", putWatchRequest.isActive()));
client.putWatch(putWatchRequest, new RestBuilderListener<PutWatchResponse>(channel) { client.putWatch(putWatchRequest, new RestBuilderListener<PutWatchResponse>(channel) {
@Override @Override
public RestResponse buildResponse(PutWatchResponse response, XContentBuilder builder) throws Exception { public RestResponse buildResponse(PutWatchResponse response, XContentBuilder builder) throws Exception {

View File

@ -107,6 +107,14 @@ public class WatcherDateTimeUtils {
return builder.field(fieldName, formatDate(date)); return builder.field(fieldName, formatDate(date));
} }
public static void writeDate(StreamOutput out, DateTime date) throws IOException {
out.writeLong(date.getMillis());
}
public static DateTime readDate(StreamInput in, DateTimeZone timeZone) throws IOException {
return new DateTime(in.readLong(), timeZone);
}
public static void writeOptionalDate(StreamOutput out, DateTime date) throws IOException { public static void writeOptionalDate(StreamOutput out, DateTime date) throws IOException {
if (date == null) { if (date == null) {
out.writeBoolean(false); out.writeBoolean(false);

View File

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.clock;
import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
/**
*
*/
public class HaltedClock implements Clock {
private final DateTime now;
public HaltedClock(DateTime now) {
this.now = now.toDateTime(DateTimeZone.UTC);
}
@Override
public long millis() {
return now.getMillis();
}
@Override
public long nanos() {
return millis() * 1000000;
}
@Override
public DateTime nowUTC() {
return now;
}
@Override
public DateTime now(DateTimeZone timeZone) {
return now.toDateTime(timeZone);
}
@Override
public TimeValue timeElapsedSince(DateTime time) {
return TimeValue.timeValueMillis(millis() - time.getMillis());
}
}

View File

@ -11,7 +11,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.http.auth.HttpAuth; import org.elasticsearch.watcher.support.http.auth.HttpAuth;
import org.elasticsearch.watcher.support.secret.Secret; import org.elasticsearch.watcher.support.secret.Secret;
import org.elasticsearch.watcher.support.secret.SensitiveXContentParser; import org.elasticsearch.watcher.support.xcontent.WatcherXContentParser;
import org.elasticsearch.watcher.support.xcontent.WatcherParams; import org.elasticsearch.watcher.support.xcontent.WatcherParams;
import java.io.IOException; import java.io.IOException;
@ -85,7 +85,7 @@ public class BasicAuth implements HttpAuth {
if (Field.USERNAME.getPreferredName().equals(fieldName)) { if (Field.USERNAME.getPreferredName().equals(fieldName)) {
username = parser.text(); username = parser.text();
} else if (Field.PASSWORD.getPreferredName().equals(fieldName)) { } else if (Field.PASSWORD.getPreferredName().equals(fieldName)) {
password = SensitiveXContentParser.secret(parser); password = WatcherXContentParser.secret(parser);
} else { } else {
throw new ElasticsearchParseException("unsupported field [" + fieldName + "]"); throw new ElasticsearchParseException("unsupported field [" + fieldName + "]");
} }

View File

@ -3,28 +3,41 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.watcher.support.secret; package org.elasticsearch.watcher.support.xcontent;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.xcontent.XContentLocation; import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.secret.Secret;
import org.elasticsearch.watcher.support.secret.SecretService;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* * A xcontent parser that is used by watcher. This is a special parser that is
* aware of watcher services. In particular, it's aware of the used {@link Clock}
* and the {@link SecretService}. The former (clock) may be used when the current time
* is required during the parse phase of construct. The latter (secret service) is used
* to convert secret values (e.g. passwords, security tokens, etc..) to {@link Secret}s.
* {@link Secret}s are encrypted values that are stored in memory and are decrypted
* on demand when needed.
*/ */
public class SensitiveXContentParser implements XContentParser { public class WatcherXContentParser implements XContentParser {
public static Secret secret(XContentParser parser) throws IOException { public static Secret secret(XContentParser parser) throws IOException {
char[] chars = parser.text().toCharArray(); char[] chars = parser.text().toCharArray();
if (parser instanceof SensitiveXContentParser) { if (parser instanceof WatcherXContentParser) {
chars = ((SensitiveXContentParser) parser).secretService.encrypt(chars); WatcherXContentParser watcherParser = (WatcherXContentParser) parser;
return new Secret(chars); if (watcherParser.secretService != null) {
chars = watcherParser.secretService.encrypt(chars);
}
} }
return new Secret(chars); return new Secret(chars);
} }
@ -35,17 +48,29 @@ public class SensitiveXContentParser implements XContentParser {
return null; return null;
} }
char[] chars = parser.text().toCharArray(); char[] chars = parser.text().toCharArray();
if (parser instanceof SensitiveXContentParser) { if (parser instanceof WatcherXContentParser) {
chars = ((SensitiveXContentParser) parser).secretService.encrypt(text.toCharArray()); WatcherXContentParser watcherParser = (WatcherXContentParser) parser;
if (watcherParser.secretService != null) {
chars = watcherParser.secretService.encrypt(text.toCharArray());
}
return new Secret(chars); return new Secret(chars);
} }
return new Secret(chars); return new Secret(chars);
} }
private final XContentParser parser; public static Clock clock(XContentParser parser) {
private final SecretService secretService; if (parser instanceof WatcherXContentParser) {
return ((WatcherXContentParser) parser).clock;
}
return SystemClock.INSTANCE;
}
public SensitiveXContentParser(XContentParser parser, SecretService secretService) { private final Clock clock;
private final XContentParser parser;
private final @Nullable SecretService secretService;
public WatcherXContentParser(XContentParser parser, Clock clock, @Nullable SecretService secretService) {
this.clock = clock;
this.parser = parser; this.parser = parser;
this.secretService = secretService; this.secretService = secretService;
} }

View File

@ -14,7 +14,8 @@ import org.elasticsearch.watcher.watch.WatchStatus;
import java.io.IOException; import java.io.IOException;
/** /**
* This class contains the ackState of the watch, if the watch was successfully acked this will be ACK * This class contains the status of the watch. If the watch was successfully acked
* this will be reflected in the watch status.
*/ */
public class AckWatchResponse extends ActionResponse { public class AckWatchResponse extends ActionResponse {
@ -28,7 +29,7 @@ public class AckWatchResponse extends ActionResponse {
} }
/** /**
* @return The ack state for the watch * @return The watch status
*/ */
public WatchStatus getStatus() { public WatchStatus getStatus() {
return status; return status;

View File

@ -24,7 +24,7 @@ import org.elasticsearch.watcher.watch.WatchStatus;
import org.elasticsearch.watcher.watch.WatchStore; import org.elasticsearch.watcher.watch.WatchStore;
/** /**
* Performs the delete operation. * Performs the ack operation.
*/ */
public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequest, AckWatchResponse> { public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequest, AckWatchResponse> {

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transport.actions.activate;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
* This action acks a watch in memory, and the index
*/
public class ActivateWatchAction extends Action<ActivateWatchRequest, ActivateWatchResponse, ActivateWatchRequestBuilder> {
public static final ActivateWatchAction INSTANCE = new ActivateWatchAction();
public static final String NAME = "cluster:admin/watcher/watch/activate";
private ActivateWatchAction() {
super(NAME);
}
@Override
public ActivateWatchResponse newResponse() {
return new ActivateWatchResponse();
}
@Override
public ActivateWatchRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new ActivateWatchRequestBuilder(client);
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transport.actions.activate;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.watcher.support.validation.Validation;
import java.io.IOException;
/**
* A ack watch request to ack a watch by name (id)
*/
public class ActivateWatchRequest extends MasterNodeRequest<ActivateWatchRequest> {
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(10);
private String watchId;
private boolean activate;
public ActivateWatchRequest() {
this(null, true);
}
public ActivateWatchRequest(String watchId, boolean activate) {
this.watchId = watchId;
this.activate = activate;
masterNodeTimeout(DEFAULT_TIMEOUT);
}
/**
* @return The id of the watch to be acked
*/
public String getWatchId() {
return watchId;
}
/**
* @return {@code true} if the request is for activating the watch, {@code false} if its
* for deactivating it.
*/
public boolean isActivate() {
return activate;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (watchId == null){
validationException = ValidateActions.addValidationError("watch id is missing", validationException);
}
Validation.Error error = Validation.watchId(watchId);
if (error != null) {
validationException = ValidateActions.addValidationError(error.message(), validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
watchId = in.readString();
activate = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(watchId);
out.writeBoolean(activate);
}
@Override
public String toString() {
return activate ?
"activate [" + watchId + "]" :
"deactivate [" + watchId + "]";
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transport.actions.activate;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* A activate watch action request builder.
*/
public class ActivateWatchRequestBuilder extends MasterNodeOperationRequestBuilder<ActivateWatchRequest, ActivateWatchResponse, ActivateWatchRequestBuilder> {
public ActivateWatchRequestBuilder(ElasticsearchClient client) {
super(client, ActivateWatchAction.INSTANCE, new ActivateWatchRequest());
}
public ActivateWatchRequestBuilder(ElasticsearchClient client, String id, boolean activate) {
super(client, ActivateWatchAction.INSTANCE, new ActivateWatchRequest(id, activate));
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transport.actions.activate;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.watcher.watch.WatchStatus;
import java.io.IOException;
/**
* This class contains the status of the watch. If the watch was successfully de/activates
* this will reflected the new state of the watch.
*/
public class ActivateWatchResponse extends ActionResponse {
private WatchStatus status;
public ActivateWatchResponse() {
}
public ActivateWatchResponse(@Nullable WatchStatus status) {
this.status = status;
}
/**
* @return The watch status
*/
public WatchStatus getStatus() {
return status;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
status = in.readBoolean() ? WatchStatus.read(in) : null;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(status != null);
if (status != null) {
status.writeTo(out);
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transport.actions.activate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.watch.WatchStatus;
import org.elasticsearch.watcher.watch.WatchStore;
/**
* Performs the watch de/activation operation.
*/
public class TransportActivateWatchAction extends WatcherTransportAction<ActivateWatchRequest, ActivateWatchResponse> {
private final WatcherService watcherService;
@Inject
public TransportActivateWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
WatcherService watcherService, LicenseService licenseService) {
super(settings, ActivateWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, ActivateWatchRequest.class);
this.watcherService = watcherService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected ActivateWatchResponse newResponse() {
return new ActivateWatchResponse();
}
@Override
protected void masterOperation(ActivateWatchRequest request, ClusterState state, ActionListener<ActivateWatchResponse> listener) throws ElasticsearchException {
try {
WatchStatus watchStatus = request.isActivate() ?
watcherService.activateWatch(request.getWatchId(), request.masterNodeTimeout()) :
watcherService.deactivateWatch(request.getWatchId(), request.masterNodeTimeout());
ActivateWatchResponse response = new ActivateWatchResponse(watchStatus);
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
protected ClusterBlockException checkBlock(ActivateWatchRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, WatchStore.INDEX);
}
}

View File

@ -8,14 +8,13 @@ package org.elasticsearch.watcher.transport.actions.put;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.support.validation.Validation; import org.elasticsearch.watcher.support.validation.Validation;
import java.io.IOException; import java.io.IOException;
@ -30,6 +29,7 @@ public class PutWatchRequest extends MasterNodeRequest<PutWatchRequest> {
private String id; private String id;
private BytesReference source; private BytesReference source;
private boolean active = true;
PutWatchRequest() { PutWatchRequest() {
} }
@ -79,6 +79,20 @@ public class PutWatchRequest extends MasterNodeRequest<PutWatchRequest> {
this.source = source; this.source = source;
} }
/**
* @return The initial active state of the watch (defaults to {@code true}, e.g. "active")
*/
public boolean isActive() {
return active;
}
/**
* @return Sets the initial active state of the watch
*/
public void setActive(boolean active) {
this.active = active;
}
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null; ActionRequestValidationException validationException = null;

View File

@ -5,12 +5,10 @@
*/ */
package org.elasticsearch.watcher.transport.actions.put; package org.elasticsearch.watcher.transport.actions.put;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder<PutWatchRequest, PutWatchResponse, PutWatchRequestBuilder> { public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder<PutWatchRequest, PutWatchResponse, PutWatchRequestBuilder> {
@ -46,4 +44,12 @@ public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder<Pu
request.setSource(source); request.setSource(source);
return this; return this;
} }
/**
* @param active Sets whether the watcher is in/active by default
*/
public PutWatchRequestBuilder setActive(boolean active) {
request.setActive(active);
return this;
}
} }

View File

@ -50,7 +50,7 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
@Override @Override
protected void masterOperation(PutWatchRequest request, ClusterState state, ActionListener<PutWatchResponse> listener) throws ElasticsearchException { protected void masterOperation(PutWatchRequest request, ClusterState state, ActionListener<PutWatchResponse> listener) throws ElasticsearchException {
try { try {
IndexResponse indexResponse = watcherService.putWatch(request.getId(), request.getSource(), request.masterNodeTimeout()); IndexResponse indexResponse = watcherService.putWatch(request.getId(), request.getSource(), request.masterNodeTimeout(), request.isActive());
listener.onResponse(new PutWatchResponse(indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated())); listener.onResponse(new PutWatchResponse(indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated()));
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);

View File

@ -13,7 +13,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.Exceptions;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
@ -56,6 +55,12 @@ public class TriggerService extends AbstractComponent {
} }
} }
/**
* Adds the given job to the trigger service. If there is already a registered job in this service with the
* same job ID, the newly added job will replace the old job (the old job will not be triggered anymore)
*
* @param job The new job
*/
public void add(TriggerEngine.Job job) { public void add(TriggerEngine.Job job) {
engines.get(job.trigger().type()).add(job); engines.get(job.trigger().type()).add(job);
} }

View File

@ -32,9 +32,10 @@ import org.elasticsearch.watcher.input.InputRegistry;
import org.elasticsearch.watcher.input.none.ExecutableNoneInput; import org.elasticsearch.watcher.input.none.ExecutableNoneInput;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.clock.HaltedClock;
import org.elasticsearch.watcher.support.secret.SecretService; import org.elasticsearch.watcher.support.secret.SecretService;
import org.elasticsearch.watcher.support.secret.SensitiveXContentParser;
import org.elasticsearch.watcher.support.xcontent.WatcherParams; import org.elasticsearch.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.watcher.support.xcontent.WatcherXContentParser;
import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.TransformRegistry; import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.watcher.trigger.Trigger; import org.elasticsearch.watcher.trigger.Trigger;
@ -126,6 +127,15 @@ public class Watch implements TriggerEngine.Job, ToXContent {
this.version = version; this.version = version;
} }
/**
* Sets the state of this watch to in/active
*
* @return {@code true} if the status of this watch changed, {@code false} otherwise.
*/
public boolean setState(boolean active, DateTime now) {
return status.setActive(active, now);
}
/** /**
* Acks this watch. * Acks this watch.
* *
@ -223,7 +233,18 @@ public class Watch implements TriggerEngine.Job, ToXContent {
} }
public Watch parse(String name, boolean includeStatus, BytesReference source) throws IOException { public Watch parse(String name, boolean includeStatus, BytesReference source) throws IOException {
return parse(name, includeStatus, false, source); return parse(name, includeStatus, false, source, clock.nowUTC());
}
public Watch parse(String name, boolean includeStatus, BytesReference source, DateTime now) throws IOException {
return parse(name, includeStatus, false, source, now);
}
/**
* @see #parseWithSecrets(String, boolean, BytesReference, DateTime)
*/
public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source) throws IOException {
return parse(id, includeStatus, true, source, clock.nowUTC());
} }
/** /**
@ -238,22 +259,19 @@ public class Watch implements TriggerEngine.Job, ToXContent {
* This method is only called once - when the user adds a new watch. From that moment on, all representations * This method is only called once - when the user adds a new watch. From that moment on, all representations
* of the watch in the system will be use secrets for sensitive data. * of the watch in the system will be use secrets for sensitive data.
* *
* @see org.elasticsearch.watcher.WatcherService#putWatch(String, BytesReference, TimeValue) * @see org.elasticsearch.watcher.WatcherService#putWatch(String, BytesReference, TimeValue, boolean)
*/ */
public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source) throws IOException { public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source, DateTime now) throws IOException {
return parse(id, includeStatus, true, source); return parse(id, includeStatus, true, source, now);
} }
private Watch parse(String id, boolean includeStatus, boolean withSecrets, BytesReference source) throws IOException { private Watch parse(String id, boolean includeStatus, boolean withSecrets, BytesReference source, DateTime now) throws IOException {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("parsing watch [{}] ", source.toUtf8()); logger.trace("parsing watch [{}] ", source.toUtf8());
} }
XContentParser parser = null; XContentParser parser = null;
try { try {
parser = XContentHelper.createParser(source); parser = new WatcherXContentParser(XContentHelper.createParser(source), new HaltedClock(now), withSecrets ? secretService : null);
if (withSecrets) {
parser = new SensitiveXContentParser(parser, secretService);
}
parser.nextToken(); parser.nextToken();
return parse(id, includeStatus, parser); return parse(id, includeStatus, parser);
} catch (IOException ioe) { } catch (IOException ioe) {
@ -326,11 +344,11 @@ public class Watch implements TriggerEngine.Job, ToXContent {
} else { } else {
// we need to create the initial statuses for the actions // we need to create the initial statuses for the actions
ImmutableMap.Builder<String, ActionStatus> actionsStatuses = ImmutableMap.builder(); ImmutableMap.Builder<String, ActionStatus> actionsStatuses = ImmutableMap.builder();
DateTime now = clock.now(DateTimeZone.UTC); DateTime now = WatcherXContentParser.clock(parser).nowUTC();
for (ActionWrapper action : actions) { for (ActionWrapper action : actions) {
actionsStatuses.put(action.id(), new ActionStatus(now)); actionsStatuses.put(action.id(), new ActionStatus(now));
} }
status = new WatchStatus(actionsStatuses.build()); status = new WatchStatus(WatcherXContentParser.clock(parser).nowUTC(), actionsStatuses.build());
} }
return new Watch(id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status); return new Watch(id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status);

View File

@ -7,6 +7,7 @@ package org.elasticsearch.watcher.watch;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
@ -19,6 +20,8 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.actions.Action; import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.ActionStatus; import org.elasticsearch.watcher.actions.ActionStatus;
import org.elasticsearch.watcher.actions.throttler.AckThrottler; import org.elasticsearch.watcher.actions.throttler.AckThrottler;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.xcontent.WatcherXContentParser;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
@ -36,6 +39,8 @@ public class WatchStatus implements ToXContent, Streamable {
private transient long version; private transient long version;
private State state;
private @Nullable DateTime lastChecked; private @Nullable DateTime lastChecked;
private @Nullable DateTime lastMetCondition; private @Nullable DateTime lastMetCondition;
private ImmutableMap<String, ActionStatus> actions; private ImmutableMap<String, ActionStatus> actions;
@ -46,19 +51,24 @@ public class WatchStatus implements ToXContent, Streamable {
private WatchStatus() { private WatchStatus() {
} }
public WatchStatus(ImmutableMap<String, ActionStatus> actions) { public WatchStatus(DateTime now, ImmutableMap<String, ActionStatus> actions) {
this(-1, null, null, actions); this(-1, new State(true, now), null, null, actions);
} }
public WatchStatus(WatchStatus other) { public WatchStatus(WatchStatus other) {
this(other.version, other.lastChecked, other.lastMetCondition, other.actions); this(other.version, other.state, other.lastChecked, other.lastMetCondition, other.actions);
} }
private WatchStatus(long version, DateTime lastChecked, DateTime lastMetCondition, ImmutableMap<String, ActionStatus> actions) { private WatchStatus(long version, State state, DateTime lastChecked, DateTime lastMetCondition, ImmutableMap<String, ActionStatus> actions) {
this.version = version; this.version = version;
this.lastChecked = lastChecked; this.lastChecked = lastChecked;
this.lastMetCondition = lastMetCondition; this.lastMetCondition = lastMetCondition;
this.actions = actions; this.actions = actions;
this.state = state;
}
public State state() {
return state;
} }
public long version() { public long version() {
@ -186,6 +196,15 @@ public class WatchStatus implements ToXContent, Streamable {
return changed; return changed;
} }
boolean setActive(boolean active, DateTime now) {
boolean change = this.state.active != active;
if (change) {
this.dirty = true;
this.state = new State(active, now);
}
return change;
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version); out.writeLong(version);
@ -196,6 +215,10 @@ public class WatchStatus implements ToXContent, Streamable {
out.writeString(entry.getKey()); out.writeString(entry.getKey());
ActionStatus.writeTo(entry.getValue(), out); ActionStatus.writeTo(entry.getValue(), out);
} }
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
out.writeBoolean(state.active);
writeDate(out, state.timestamp);
}
} }
@Override @Override
@ -209,6 +232,11 @@ public class WatchStatus implements ToXContent, Streamable {
builder.put(in.readString(), ActionStatus.readFrom(in)); builder.put(in.readString(), ActionStatus.readFrom(in));
} }
actions = builder.build(); actions = builder.build();
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
state = new State(in.readBoolean(), readDate(in, DateTimeZone.UTC));
} else {
state = new State(true, new DateTime(SystemClock.INSTANCE.millis(), DateTimeZone.UTC));
}
} }
public static WatchStatus read(StreamInput in) throws IOException { public static WatchStatus read(StreamInput in) throws IOException {
@ -223,6 +251,7 @@ public class WatchStatus implements ToXContent, Streamable {
if (params.paramAsBoolean(INCLUDE_VERSION_KEY, false)) { if (params.paramAsBoolean(INCLUDE_VERSION_KEY, false)) {
builder.field(Field.VERSION.getPreferredName(), version); builder.field(Field.VERSION.getPreferredName(), version);
} }
builder.field(Field.STATE.getPreferredName(), state, params);
if (lastChecked != null) { if (lastChecked != null) {
builder.field(Field.LAST_CHECKED.getPreferredName(), lastChecked); builder.field(Field.LAST_CHECKED.getPreferredName(), lastChecked);
} }
@ -241,6 +270,7 @@ public class WatchStatus implements ToXContent, Streamable {
public static WatchStatus parse(String watchId, XContentParser parser) throws IOException { public static WatchStatus parse(String watchId, XContentParser parser) throws IOException {
State state = null;
DateTime lastChecked = null; DateTime lastChecked = null;
DateTime lastMetCondition = null; DateTime lastMetCondition = null;
ImmutableMap.Builder<String, ActionStatus> actions = null; ImmutableMap.Builder<String, ActionStatus> actions = null;
@ -250,6 +280,12 @@ public class WatchStatus implements ToXContent, Streamable {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) { if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName(); currentFieldName = parser.currentName();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.STATE)) {
try {
state = State.parse(parser);
} catch (ElasticsearchParseException e) {
throw new ElasticsearchParseException("could not parse watch status for [{}]. failed to parse field [{}]", e, watchId, currentFieldName);
}
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.LAST_CHECKED)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.LAST_CHECKED)) {
if (token.isValue()) { if (token.isValue()) {
lastChecked = parseDate(currentFieldName, parser, DateTimeZone.UTC); lastChecked = parseDate(currentFieldName, parser, DateTimeZone.UTC);
@ -279,15 +315,70 @@ public class WatchStatus implements ToXContent, Streamable {
} }
} }
return new WatchStatus(-1, lastChecked, lastMetCondition, actions.build()); // if the watch status doesn't have a state, we assume active
// this is to support old watches that weren't upgraded yet to
// contain the state
if (state == null) {
state = new State(true, WatcherXContentParser.clock(parser).nowUTC());
}
return new WatchStatus(-1, state, lastChecked, lastMetCondition, actions.build());
} }
public static class State implements ToXContent {
final boolean active;
final DateTime timestamp;
public State(boolean active, DateTime timestamp) {
this.active = active;
this.timestamp = timestamp;
}
public boolean isActive() {
return active;
}
public DateTime getTimestamp() {
return timestamp;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Field.ACTIVE.getPreferredName(), active);
writeDate(Field.TIMESTAMP.getPreferredName(), builder, timestamp);
return builder.endObject();
}
public static State parse(XContentParser parser) throws IOException {
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("expected an object but found [{}] instead", parser.currentToken());
}
boolean active = true;
DateTime timestamp = SystemClock.INSTANCE.nowUTC();
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.ACTIVE)) {
active = parser.booleanValue();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMESTAMP)) {
timestamp = parseDate(currentFieldName, parser, DateTimeZone.UTC);
}
}
return new State(active, timestamp);
}
}
interface Field { interface Field {
ParseField VERSION = new ParseField("version"); ParseField VERSION = new ParseField("version");
ParseField STATE = new ParseField("state");
ParseField ACTIVE = new ParseField("active");
ParseField TIMESTAMP = new ParseField("timestamp");
ParseField LAST_CHECKED = new ParseField("last_checked"); ParseField LAST_CHECKED = new ParseField("last_checked");
ParseField LAST_MET_CONDITION = new ParseField("last_met_condition"); ParseField LAST_MET_CONDITION = new ParseField("last_met_condition");
ParseField ACTIONS = new ParseField("actions"); ParseField ACTIONS = new ParseField("actions");
} }
} }

View File

@ -34,6 +34,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -182,10 +185,24 @@ public class WatchStore extends AbstractComponent {
return new WatchDelete(response); return new WatchDelete(response);
} }
public ConcurrentMap<String, Watch> watches() { public Collection<Watch> watches() {
return watches.values();
}
public Collection<Watch> activeWatches() {
Set<Watch> watches = new HashSet<>();
for (Watch watch : watches()) {
if (watch.status().state().isActive()) {
watches.add(watch);
}
}
return watches; return watches;
} }
public int watchCount() {
return watches.size();
}
IndexRequest createIndexRequest(String id, BytesReference source, long version) { IndexRequest createIndexRequest(String id, BytesReference source, long version) {
IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, id); IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, id);
// TODO (2.0 upgrade): move back to BytesReference instead of dealing with the array directly // TODO (2.0 upgrade): move back to BytesReference instead of dealing with the array directly

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.watcher.watch; package org.elasticsearch.watcher;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ElasticsearchTimeoutException;
@ -15,8 +15,6 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.WatcherState;
import org.elasticsearch.watcher.actions.ActionStatus; import org.elasticsearch.watcher.actions.ActionStatus;
import org.elasticsearch.watcher.execution.ExecutionService; import org.elasticsearch.watcher.execution.ExecutionService;
import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry;
@ -25,6 +23,10 @@ import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.trigger.Trigger; import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine; import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.trigger.TriggerService;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchLockService;
import org.elasticsearch.watcher.watch.WatchStatus;
import org.elasticsearch.watcher.watch.WatchStore;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.junit.Before; import org.junit.Before;
@ -39,8 +41,9 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
/** /**
*
*/ */
public class WatchServiceTests extends ESTestCase { public class WatcherServiceTests extends ESTestCase {
private TriggerService triggerService; private TriggerService triggerService;
private WatchStore watchStore; private WatchStore watchStore;
@ -67,53 +70,86 @@ public class WatchServiceTests extends ESTestCase {
@Test @Test
public void testPutWatch() throws Exception { public void testPutWatch() throws Exception {
boolean activeByDefault = randomBoolean();
IndexResponse indexResponse = mock(IndexResponse.class); IndexResponse indexResponse = mock(IndexResponse.class);
Watch watch = mock(Watch.class); Watch newWatch = mock(Watch.class);
WatchStatus status = mock(WatchStatus.class);
when(status.state()).thenReturn(new WatchStatus.State(activeByDefault, clock.nowUTC()));
when(newWatch.status()).thenReturn(status);
WatchStore.WatchPut watchPut = mock(WatchStore.WatchPut.class); WatchStore.WatchPut watchPut = mock(WatchStore.WatchPut.class);
when(watchPut.indexResponse()).thenReturn(indexResponse); when(watchPut.indexResponse()).thenReturn(indexResponse);
when(watchPut.current()).thenReturn(watch); when(watchPut.current()).thenReturn(newWatch);
TimeValue timeout = TimeValue.timeValueSeconds(5); TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class); WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock); when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock);
when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class))).thenReturn(watch); when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), any(DateTime.class))).thenReturn(newWatch);
when(watchStore.put(watch)).thenReturn(watchPut); when(watchStore.put(newWatch)).thenReturn(watchPut);
IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout); IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout, activeByDefault);
assertThat(response, sameInstance(indexResponse)); assertThat(response, sameInstance(indexResponse));
verify(triggerService, times(1)).add(any(TriggerEngine.Job.class)); verify(newWatch, times(1)).setState(activeByDefault, clock.nowUTC());
if (activeByDefault) {
verify(triggerService, times(1)).add(any(TriggerEngine.Job.class));
} else {
verifyZeroInteractions(triggerService);
}
} }
@Test(expected = ElasticsearchTimeoutException.class) @Test(expected = ElasticsearchTimeoutException.class)
public void testPutWatch_Timeout() throws Exception { public void testPutWatch_Timeout() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5); TimeValue timeout = TimeValue.timeValueSeconds(5);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null);
watcherService.putWatch("_id", new BytesArray("{}"), timeout); watcherService.putWatch("_id", new BytesArray("{}"), timeout, randomBoolean());
} }
@Test @Test
public void testPutWatch_NotSchedule() throws Exception { public void testPutWatch_DifferentActiveStates() throws Exception {
Trigger trigger = mock(Trigger.class); Trigger trigger = mock(Trigger.class);
IndexResponse indexResponse = mock(IndexResponse.class); IndexResponse indexResponse = mock(IndexResponse.class);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
WatchStatus status = mock(WatchStatus.class);
boolean active = randomBoolean();
when(status.state()).thenReturn(new WatchStatus.State(active, clock.nowUTC()));
when(watch.status()).thenReturn(status);
when(watch.trigger()).thenReturn(trigger); when(watch.trigger()).thenReturn(trigger);
WatchStore.WatchPut watchPut = mock(WatchStore.WatchPut.class); WatchStore.WatchPut watchPut = mock(WatchStore.WatchPut.class);
when(watchPut.indexResponse()).thenReturn(indexResponse); when(watchPut.indexResponse()).thenReturn(indexResponse);
when(watchPut.current()).thenReturn(watch); when(watchPut.current()).thenReturn(watch);
Watch previousWatch = mock(Watch.class); Watch previousWatch = mock(Watch.class);
WatchStatus previousStatus = mock(WatchStatus.class);
boolean prevActive = randomBoolean();
when(previousStatus.state()).thenReturn(new WatchStatus.State(prevActive, clock.nowUTC()));
when(previousWatch.status()).thenReturn(previousStatus);
when(previousWatch.trigger()).thenReturn(trigger); when(previousWatch.trigger()).thenReturn(trigger);
when(watchPut.previous()).thenReturn(previousWatch); when(watchPut.previous()).thenReturn(previousWatch);
TimeValue timeout = TimeValue.timeValueSeconds(5); TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class); WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock); when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock);
when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class))).thenReturn(watch); when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), eq(clock.nowUTC()))).thenReturn(watch);
when(watchStore.put(watch)).thenReturn(watchPut); when(watchStore.put(watch)).thenReturn(watchPut);
IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout);
IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout, active);
assertThat(response, sameInstance(indexResponse)); assertThat(response, sameInstance(indexResponse));
verifyZeroInteractions(triggerService); if (!active) {
// we should always remove the watch from the trigger service, just to be safe
verify(triggerService, times(1)).remove("_id");
} else if (prevActive) {
// if both the new watch and the prev one are active, we should do nothing
verifyZeroInteractions(triggerService);
} else {
// if the prev watch was not active and the new one is active, we should add the watch
verify(triggerService, times(1)).add(watch);
}
} }
@Test @Test
@ -158,7 +194,6 @@ public class WatchServiceTests extends ESTestCase {
verify(triggerService, times(1)).remove("_id"); verify(triggerService, times(1)).remove("_id");
} }
@Test @Test
public void testDeleteWatch_NotFound() throws Exception { public void testDeleteWatch_NotFound() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5); TimeValue timeout = TimeValue.timeValueSeconds(5);
@ -186,7 +221,7 @@ public class WatchServiceTests extends ESTestCase {
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
when(watch.ack(now, "_all")).thenReturn(true); when(watch.ack(now, "_all")).thenReturn(true);
WatchStatus status = new WatchStatus(ImmutableMap.<String, ActionStatus>of()); WatchStatus status = new WatchStatus(now, ImmutableMap.<String, ActionStatus>of());
when(watch.status()).thenReturn(status); when(watch.status()).thenReturn(status);
when(watchStore.get("_id")).thenReturn(watch); when(watchStore.get("_id")).thenReturn(watch);
@ -196,6 +231,146 @@ public class WatchServiceTests extends ESTestCase {
verify(watchStore, times(1)).updateStatus(watch); verify(watchStore, times(1)).updateStatus(watch);
} }
@Test
public void testActivate() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 30));
WatcherService service = spy(watcherService);
WatchStatus expectedStatus = mock(WatchStatus.class);
doReturn(expectedStatus).when(service).setWatchState("_id", true, timeout);
WatchStatus actualStatus = service.activateWatch("_id", timeout);
assertThat(actualStatus, sameInstance(expectedStatus));
verify(service, times(1)).setWatchState("_id", true, timeout);
}
@Test
public void testDeactivate() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 30));
WatcherService service = spy(watcherService);
WatchStatus expectedStatus = mock(WatchStatus.class);
doReturn(expectedStatus).when(service).setWatchState("_id", false, timeout);
WatchStatus actualStatus = service.deactivateWatch("_id", timeout);
assertThat(actualStatus, sameInstance(expectedStatus));
verify(service, times(1)).setWatchState("_id", false, timeout);
}
@Test
public void testSetWatchState_SetActiveOnCurrentlyActive() throws Exception {
// trying to activate a watch that is already active:
// - the watch status should not change
// - the watch doesn't need to be updated in the store
// - the watch should not be removed or re-added to the trigger service
DateTime now = new DateTime(DateTimeZone.UTC);
clock.setTime(now);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
WatchStatus status = new WatchStatus(now, ImmutableMap.<String, ActionStatus>of());
when(watch.status()).thenReturn(status);
when(watch.setState(true, now)).thenReturn(false);
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.setWatchState("_id", true, timeout);
assertThat(result, not(sameInstance(status)));
verifyZeroInteractions(triggerService);
verify(watchStore, never()).updateStatus(watch);
}
@Test
public void testSetWatchState_SetActiveOnCurrentlyInactive() throws Exception {
// activating a watch that is currently inactive:
// - the watch status should be updated
// - the watch needs to be updated in the store
// - the watch should be re-added to the trigger service (the assumption is that it's not there)
DateTime now = new DateTime(DateTimeZone.UTC);
clock.setTime(now);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
WatchStatus status = new WatchStatus(now, ImmutableMap.<String, ActionStatus>of());
when(watch.status()).thenReturn(status);
when(watch.setState(true, now)).thenReturn(true);
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.setWatchState("_id", true, timeout);
assertThat(result, not(sameInstance(status)));
verify(triggerService, times(1)).add(watch);
verify(watchStore, times(1)).updateStatus(watch);
}
@Test
public void testSetWatchState_SetInactiveOnCurrentlyActive() throws Exception {
// deactivating a watch that is currently active:
// - the watch status should change
// - the watch needs to be updated in the store
// - the watch should be removed from the trigger service
DateTime now = new DateTime(DateTimeZone.UTC);
clock.setTime(now);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
WatchStatus status = new WatchStatus(now, ImmutableMap.<String, ActionStatus>of());
when(watch.status()).thenReturn(status);
when(watch.setState(false, now)).thenReturn(true);
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.setWatchState("_id", false, timeout);
assertThat(result, not(sameInstance(status)));
verify(triggerService, times(1)).remove("_id");
verify(watchStore, times(1)).updateStatus(watch);
}
@Test
public void testSetWatchState_SetInactiveOnCurrentlyInactive() throws Exception {
// trying to deactivate a watch that is currently inactive:
// - the watch status should not be updated
// - the watch should not be updated in the store
// - the watch should be re-added or removed to/from the trigger service
DateTime now = new DateTime(DateTimeZone.UTC);
clock.setTime(now);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
WatchStatus status = new WatchStatus(now, ImmutableMap.<String, ActionStatus>of());
when(watch.status()).thenReturn(status);
when(watch.setState(false, now)).thenReturn(false);
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.setWatchState("_id", false, timeout);
assertThat(result, not(sameInstance(status)));
verifyZeroInteractions(triggerService);
verify(watchStore, never()).updateStatus(watch);
}
@Test(expected = ElasticsearchTimeoutException.class) @Test(expected = ElasticsearchTimeoutException.class)
public void testAckWatch_Timeout() throws Exception { public void testAckWatch_Timeout() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5); TimeValue timeout = TimeValue.timeValueSeconds(5);
@ -211,7 +386,7 @@ public class WatchServiceTests extends ESTestCase {
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
when(watch.ack(now)).thenReturn(false); when(watch.ack(now)).thenReturn(false);
WatchStatus status = new WatchStatus(ImmutableMap.<String, ActionStatus>of()); WatchStatus status = new WatchStatus(now, ImmutableMap.<String, ActionStatus>of());
when(watch.status()).thenReturn(status); when(watch.status()).thenReturn(status);
when(watchStore.get("_id")).thenReturn(watch); when(watchStore.get("_id")).thenReturn(watch);

View File

@ -5,7 +5,7 @@
*/ */
package org.elasticsearch.watcher.actions.email.service; package org.elasticsearch.watcher.actions.email.service;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.common.cli.Terminal; import org.elasticsearch.common.cli.Terminal;
import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -18,7 +18,7 @@ import org.elasticsearch.watcher.support.secret.SecretService;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/379") @AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/379")
public class ManualPublicSmtpServersTester { public class ManualPublicSmtpServersTester {
private static final Terminal terminal = Terminal.DEFAULT; private static final Terminal terminal = Terminal.DEFAULT;

View File

@ -127,7 +127,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC())));
when(watch.input()).thenReturn(input); when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition); when(watch.condition()).thenReturn(condition);
@ -201,7 +201,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC())));
when(watch.input()).thenReturn(input); when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition); when(watch.condition()).thenReturn(condition);
@ -270,7 +270,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC())));
when(watch.input()).thenReturn(input); when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition); when(watch.condition()).thenReturn(condition);
@ -338,7 +338,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC())));
when(watch.input()).thenReturn(input); when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition); when(watch.condition()).thenReturn(condition);
@ -408,7 +408,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC()))); WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(clock.nowUTC())));
when(watch.input()).thenReturn(input); when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition); when(watch.condition()).thenReturn(condition);
@ -473,7 +473,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(now))); WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input); when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition); when(watch.condition()).thenReturn(condition);
@ -519,7 +519,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, transform, action); ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, transform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(now))); WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input); when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition); when(watch.condition()).thenReturn(condition);
@ -565,7 +565,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action); ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper)); ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(now))); WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), ImmutableMap.of("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input); when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition); when(watch.condition()).thenReturn(condition);

View File

@ -125,7 +125,7 @@ public class HttpInputTests extends ESTestCase {
null, null,
new ExecutableActions(new ArrayList<ActionWrapper>()), new ExecutableActions(new ArrayList<ActionWrapper>()),
null, null,
new WatchStatus(ImmutableMap.<String, ActionStatus>of())); new WatchStatus(new DateTime(0, UTC), ImmutableMap.<String, ActionStatus>of()));
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, WatchExecutionContext ctx = new TriggeredExecutionContext(watch,
new DateTime(0, UTC), new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
@ -156,7 +156,7 @@ public class HttpInputTests extends ESTestCase {
null, null,
new ExecutableActions(new ArrayList<ActionWrapper>()), new ExecutableActions(new ArrayList<ActionWrapper>()),
null, null,
new WatchStatus(ImmutableMap.<String, ActionStatus>of())); new WatchStatus(new DateTime(0, UTC), ImmutableMap.<String, ActionStatus>of()));
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, WatchExecutionContext ctx = new TriggeredExecutionContext(watch,
new DateTime(0, UTC), new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),

View File

@ -124,7 +124,7 @@ public class SearchInputTests extends ESIntegTestCase {
null, null,
new ExecutableActions(new ArrayList<ActionWrapper>()), new ExecutableActions(new ArrayList<ActionWrapper>()),
null, null,
new WatchStatus(ImmutableMap.<String, ActionStatus>of())), new WatchStatus(new DateTime(0, UTC), ImmutableMap.<String, ActionStatus>of())),
new DateTime(0, UTC), new DateTime(0, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)), new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)),
timeValueSeconds(5)); timeValueSeconds(5));
@ -231,7 +231,7 @@ public class SearchInputTests extends ESIntegTestCase {
null, null,
new ExecutableActions(new ArrayList<ActionWrapper>()), new ExecutableActions(new ArrayList<ActionWrapper>()),
null, null,
new WatchStatus(ImmutableMap.<String, ActionStatus>of())), new WatchStatus(new DateTime(0, UTC), ImmutableMap.<String, ActionStatus>of())),
new DateTime(0, UTC), new DateTime(0, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)), new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)),
timeValueSeconds(5)); timeValueSeconds(5));
@ -343,7 +343,7 @@ public class SearchInputTests extends ESIntegTestCase {
null, null,
new ExecutableActions(new ArrayList<ActionWrapper>()), new ExecutableActions(new ArrayList<ActionWrapper>()),
null, null,
new WatchStatus(ImmutableMap.<String, ActionStatus>of())), new WatchStatus(new DateTime(50000, UTC), ImmutableMap.<String, ActionStatus>of())),
new DateTime(60000, UTC), new DateTime(60000, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)), new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)),
timeValueSeconds(5)); timeValueSeconds(5));

View File

@ -227,7 +227,7 @@ public final class WatcherTestUtils {
new TimeValue(0), new TimeValue(0),
new ExecutableActions(actions), new ExecutableActions(actions),
metadata, metadata,
new WatchStatus(ImmutableMap.<String, ActionStatus>builder() new WatchStatus(now, ImmutableMap.<String, ActionStatus>builder()
.put("_webhook", new ActionStatus(now)) .put("_webhook", new ActionStatus(now))
.put("_email", new ActionStatus(now)) .put("_email", new ActionStatus(now))
.build())); .build()));

View File

@ -499,6 +499,7 @@ public class SearchTransformTests extends ESIntegTestCase {
} }
private WatchExecutionContext createContext() { private WatchExecutionContext createContext() {
return new TriggeredExecutionContext( return new TriggeredExecutionContext(
new Watch("test-watch", new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
@ -508,7 +509,7 @@ public class SearchTransformTests extends ESIntegTestCase {
null, null,
new ExecutableActions(new ArrayList<ActionWrapper>()), new ExecutableActions(new ArrayList<ActionWrapper>()),
null, null,
new WatchStatus(ImmutableMap.<String, ActionStatus>of())), new WatchStatus( new DateTime(40000, UTC), ImmutableMap.<String, ActionStatus>of())),
new DateTime(60000, UTC), new DateTime(60000, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)), new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)),
timeValueSeconds(5)); timeValueSeconds(5));

View File

@ -0,0 +1,160 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transport.action.activate;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.execution.ExecutionState;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.watcher.transport.actions.activate.ActivateWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.cron;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.*;
/**
*/
public class ActivateWatchTests extends AbstractWatcherIntegrationTestCase {
@Override
protected boolean timeWarped() {
return false;
}
@Test
public void testDeactivateAndActivate() throws Exception {
WatcherClient watcherClient = watcherClient();
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
.setId("_id")
.setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(simpleInput("foo", "bar"))
.condition(alwaysCondition())
.addAction("_a1", indexAction("actions", "action1"))
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_id").get();
assertThat(getWatchResponse, notNullValue());
assertThat(getWatchResponse.getStatus().state().isActive(), is(true));
assertWatchWithMinimumActionsCount("_id", ExecutionState.EXECUTED, 5);
// we now know the watch is executing... lets deactivate it
ActivateWatchResponse activateWatchResponse = watcherClient.prepareActivateWatch("_id", false).get();
assertThat(activateWatchResponse, notNullValue());
assertThat(activateWatchResponse.getStatus().state().isActive(), is(false));
getWatchResponse = watcherClient.prepareGetWatch("_id").get();
assertThat(getWatchResponse, notNullValue());
assertThat(getWatchResponse.getStatus().state().isActive(), is(false));
flush();
refresh();
long count1 = docCount(".watch_history*", "watch_record", matchAllQuery());
Thread.sleep(10000);
flush();
refresh();
long count2 = docCount(".watch_history*", "watch_record", matchAllQuery());
assertThat(count2, is(count1));
// lets activate it again
activateWatchResponse = watcherClient.prepareActivateWatch("_id", true).get();
assertThat(activateWatchResponse, notNullValue());
assertThat(activateWatchResponse.getStatus().state().isActive(), is(true));
getWatchResponse = watcherClient.prepareGetWatch("_id").get();
assertThat(getWatchResponse, notNullValue());
assertThat(getWatchResponse.getStatus().state().isActive(), is(true));
Thread.sleep(10000);
long count3 = docCount(".watch_history*", "watch_record", matchAllQuery());
assertThat(count3, greaterThan(count1));
}
@Test
public void testLoadWatchWithoutAState() throws Exception {
WatcherClient watcherClient = watcherClient();
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
.setId("_id")
.setSource(watchBuilder()
.trigger(schedule(cron("0 0 0 1 1 ? 2050"))) // some time in 2050
.input(simpleInput("foo", "bar"))
.condition(alwaysCondition())
.addAction("_a1", indexAction("actions", "action1"))
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
GetWatchResponse getWatchResponse = watcherClient.prepareGetWatch("_id").get();
assertThat(getWatchResponse, notNullValue());
assertThat(getWatchResponse.getStatus().state().isActive(), is(true));
GetResponse getResponse = client().prepareGet(".watches", "watch", "_id").get();
XContentSource source = new XContentSource(getResponse.getSourceAsBytesRef(), XContentType.JSON);
String[] filters = new String[] {
"trigger.**",
"input.**",
"condition.**",
"throttle_period.**",
"transform.**",
"actions.**",
"metadata.**",
"_status.version",
"_status.last_checked",
"_status.last_met_condition",
"_status.actions.**",
};
XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), new BytesStreamOutput(), filters);
source.toXContent(builder, ToXContent.EMPTY_PARAMS);
// now that we filtered out the watch status state, lets put it back in
IndexResponse indexResponse = client().prepareIndex(".watches", "watch", "_id")
.setSource(builder.bytes())
.get();
assertThat(indexResponse.getId(), is("_id"));
// now, let's restart
assertThat(watcherClient.prepareWatchService().stop().get().isAcknowledged(), is(true));
ensureWatcherStopped();
assertThat(watcherClient.prepareWatchService().start().get().isAcknowledged(), is(true));
ensureWatcherStarted();
getWatchResponse = watcherClient.prepareGetWatch("_id").get();
assertThat(getWatchResponse, notNullValue());
assertThat(getWatchResponse.getStatus().state(), notNullValue());
assertThat(getWatchResponse.getStatus().state().isActive(), is(true));
}
}

View File

@ -170,7 +170,7 @@ public class WatchTests extends ESTestCase {
for (ActionWrapper action : actions) { for (ActionWrapper action : actions) {
actionsStatuses.put(action.id(), new ActionStatus(now)); actionsStatuses.put(action.id(), new ActionStatus(now));
} }
WatchStatus watchStatus = new WatchStatus(actionsStatuses.build()); WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), actionsStatuses.build());
TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10)); TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10));