diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/stats/Counters.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/common/stats/Counters.java similarity index 62% rename from plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/stats/Counters.java rename to plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/common/stats/Counters.java index e51b687b26c..bdf923a79d0 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/stats/Counters.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/common/stats/Counters.java @@ -3,21 +3,26 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.watcher.common.stats; +package org.elasticsearch.xpack.core.watcher.common.stats; import com.carrotsearch.hppc.ObjectLongHashMap; import com.carrotsearch.hppc.cursors.ObjectLongCursor; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; /** * Helper class to create simple usage stat counters based on longs * Internally this is a map mapping from String to a long, which is the counter - * Calling toMap() will create a nested map, where each dot of the key name will nest deeper + * Calling toNestedMap() will create a nested map, where each dot of the key name will nest deeper * The main reason for this class is that the stats producer should not be worried about how the map is actually nested */ -public class Counters { +public class Counters implements Streamable { private ObjectLongHashMap counters = new ObjectLongHashMap<>(); @@ -52,12 +57,24 @@ public class Counters { counters.addTo(name, count); } + public long get(String name) { + return counters.get(name); + } + + public long size() { + return counters.size(); + } + + public boolean hasCounters() { + return size() > 0; + } + /** * Convert the counters to a nested map, using the "." as a splitter to create deeper maps * @return A nested map with all the current configured counters */ @SuppressWarnings("unchecked") - public Map toMap() { + public Map toNestedMap() { Map map = new HashMap<>(); for (ObjectLongCursor counter : counters) { if (counter.key.contains(".")) { @@ -84,4 +101,38 @@ public class Counters { return map; } + + @Override + public void readFrom(StreamInput in) throws IOException { + int counters = in.readVInt(); + for (int i = 0; i < counters; i++) { + inc(in.readString(), in.readVLong()); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(counters.size()); + for (ObjectLongCursor cursor : counters) { + out.writeString(cursor.key); + out.writeVLong(cursor.value); + } + } + + public static Counters read(StreamInput in) throws IOException { + Counters counters = new Counters(); + counters.readFrom(in); + return counters; + } + + public static Counters merge(List counters) { + Counters result = new Counters(); + for (Counters c : counters) { + for (ObjectLongCursor cursor : c.counters) { + result.inc(cursor.key, cursor.value); + } + } + + return result; + } } diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsRequest.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsRequest.java index 2df487b5516..4fdbd2ac407 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsRequest.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsRequest.java @@ -20,6 +20,7 @@ public class WatcherStatsRequest extends BaseNodesRequest { private boolean includeCurrentWatches; private boolean includeQueuedWatches; + private boolean includeStats; public WatcherStatsRequest() { } @@ -40,6 +41,14 @@ public class WatcherStatsRequest extends BaseNodesRequest { this.includeQueuedWatches = includeQueuedWatches; } + public boolean includeStats() { + return includeStats; + } + + public void includeStats(boolean includeStats) { + this.includeStats = includeStats; + } + @Override public ActionRequestValidationException validate() { return null; @@ -50,6 +59,7 @@ public class WatcherStatsRequest extends BaseNodesRequest { super.readFrom(in); includeCurrentWatches = in.readBoolean(); includeQueuedWatches = in.readBoolean(); + includeStats = in.readBoolean(); } @Override @@ -57,6 +67,7 @@ public class WatcherStatsRequest extends BaseNodesRequest { super.writeTo(out); out.writeBoolean(includeCurrentWatches); out.writeBoolean(includeQueuedWatches); + out.writeBoolean(includeStats); } @Override @@ -68,6 +79,7 @@ public class WatcherStatsRequest extends BaseNodesRequest { private boolean includeCurrentWatches; private boolean includeQueuedWatches; + private boolean includeStats; public Node() {} @@ -75,6 +87,7 @@ public class WatcherStatsRequest extends BaseNodesRequest { super(nodeId); includeCurrentWatches = request.includeCurrentWatches(); includeQueuedWatches = request.includeQueuedWatches(); + includeStats = request.includeStats(); } public boolean includeCurrentWatches() { @@ -85,11 +98,16 @@ public class WatcherStatsRequest extends BaseNodesRequest { return includeQueuedWatches; } + public boolean includeStats() { + return includeStats; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); includeCurrentWatches = in.readBoolean(); includeQueuedWatches = in.readBoolean(); + includeStats = in.readBoolean(); } @Override @@ -97,6 +115,7 @@ public class WatcherStatsRequest extends BaseNodesRequest { super.writeTo(out); out.writeBoolean(includeCurrentWatches); out.writeBoolean(includeQueuedWatches); + out.writeBoolean(includeStats); } } } \ No newline at end of file diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsResponse.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsResponse.java index 3f57e7fb889..13b3ddb60b2 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsResponse.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsResponse.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.WatcherState; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; @@ -93,6 +94,7 @@ public class WatcherStatsResponse extends BaseNodesResponse snapshots; private List queuedWatches; + private Counters stats; public Node() { } @@ -163,6 +165,14 @@ public class WatcherStatsResponse extends BaseNodesResponse map = counters.toMap(); + Map map = counters.toNestedMap(); assertThat(map, hasEntry("f", 200L)); assertThat(map, hasKey("foo")); assertThat(map.get("foo"), instanceOf(Map.class)); diff --git a/plugin/src/test/resources/rest-api-spec/test/watcher/usage/10_basic.yml b/plugin/src/test/resources/rest-api-spec/test/watcher/usage/10_basic.yml new file mode 100644 index 00000000000..a33fcdb5297 --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/test/watcher/usage/10_basic.yml @@ -0,0 +1,61 @@ +--- +"Test watcher usage stats output": + + - do: + catch: missing + xpack.watcher.delete_watch: + id: "usage_stats_watch" + + - do: {xpack.usage: {}} + - set: { "watcher.count.active": watch_count_active } + - set: { "watcher.count.total": watch_count_total } + + - do: + xpack.watcher.put_watch: + id: "usage_stats_watch" + body: > + { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "search" : { + "request" : { + "indices" : [ "my_test_index" ], + "body" :{ + "query" : { "match_all": {} } + } + } + } + }, + "condition" : { + "compare" : { + "ctx.payload.hits.total" : { + "gte" : 1 + } + } + }, + "actions": { + "logging": { + "logging": { + "text": "Successfully ran my_watch to test for search input" + } + } + } + } + - match: { _id: "usage_stats_watch" } + + - do: {xpack.usage: {}} + - gt: { "watcher.count.active": $watch_count_active } + - gt: { "watcher.count.total": $watch_count_total } + - gte: { "watcher.watch.action._all.active": 1 } + - gte: { "watcher.watch.action.logging.active": 1 } + - gte: { "watcher.watch.condition._all.active": 1 } + - gte: { "watcher.watch.condition.compare.active": 1 } + - gte: { "watcher.watch.input._all.active": 1 } + - gte: { "watcher.watch.input.search.active": 1 } + - gte: { "watcher.watch.trigger._all.active": 1 } + - gte: { "watcher.watch.trigger.schedule.active": 1 } + - gte: { "watcher.watch.trigger.schedule.cron.active": 1 } + - gte: { "watcher.watch.trigger.schedule._all.active": 1 } + diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherFeatureSet.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherFeatureSet.java index 80f37bb5134..1549a94ac35 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherFeatureSet.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherFeatureSet.java @@ -6,29 +6,42 @@ package org.elasticsearch.xpack.watcher; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.watcher.WatcherFeatureSetUsage; +import org.elasticsearch.xpack.core.watcher.client.WatcherClient; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; public class WatcherFeatureSet implements XPackFeatureSet { private final boolean enabled; private final XPackLicenseState licenseState; - private final WatcherService watcherService; + private Client client; @Inject - public WatcherFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState, @Nullable WatcherService watcherService) { - this.watcherService = watcherService; + public WatcherFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState, Client client) { this.enabled = XPackSettings.WATCHER_ENABLED.get(settings); this.licenseState = licenseState; + this.client = client; } @Override @@ -58,9 +71,24 @@ public class WatcherFeatureSet implements XPackFeatureSet { @Override public void usage(ActionListener listener) { - listener.onResponse( - new WatcherFeatureSetUsage(available(), enabled(), - watcherService != null ? watcherService.usageStats() : Collections.emptyMap())); + if (enabled) { + try (ThreadContext.StoredContext ignore = + stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { + WatcherClient watcherClient = new WatcherClient(client); + WatcherStatsRequest request = new WatcherStatsRequest(); + request.includeStats(true); + watcherClient.watcherStats(request, ActionListener.wrap(r -> { + List countersPerNode = r.getNodes() + .stream() + .map(WatcherStatsResponse.Node::getStats) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + Counters mergedCounters = Counters.merge(countersPerNode); + listener.onResponse(new WatcherFeatureSetUsage(available(), enabled(), mergedCounters.toNestedMap())); + }, listener::onFailure)); + } + } else { + listener.onResponse(new WatcherFeatureSetUsage(available(), enabled(), Collections.emptyMap())); + } } - } diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 8b4536d6568..11ef3f584d0 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -319,9 +319,4 @@ public class WatcherService extends AbstractComponent { public WatcherState state() { return state.get(); } - - public Map usageStats() { - Map innerMap = executionService.usageStats(); - return innerMap; - } } diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 9a936010758..223a9d08ad1 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; import org.elasticsearch.xpack.core.watcher.condition.Condition; import org.elasticsearch.xpack.core.watcher.execution.ExecutionState; import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch; @@ -49,7 +50,6 @@ import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchField; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; import org.elasticsearch.xpack.watcher.Watcher; -import org.elasticsearch.xpack.watcher.common.stats.Counters; import org.elasticsearch.xpack.watcher.history.HistoryStore; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.joda.time.DateTime; @@ -524,7 +524,7 @@ public class ExecutionService extends AbstractComponent { } } - public Map usageStats() { + public Counters executionTimes() { Counters counters = new Counters(); counters.inc("execution.actions._all.total", totalExecutionsTime.count()); counters.inc("execution.actions._all.total_time_in_ms", totalExecutionsTime.sum()); @@ -534,7 +534,7 @@ public class ExecutionService extends AbstractComponent { counters.inc("execution.actions." + entry.getKey() + ".total_time_in_ms", entry.getValue().sum()); } - return counters.toMap(); + return counters; } /** diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java index 6d8aa29e646..d7f8962756b 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.watcher.WatcherMetaData; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; @@ -22,6 +23,7 @@ import org.elasticsearch.xpack.watcher.WatcherService; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.trigger.TriggerService; +import java.util.Arrays; import java.util.List; /** @@ -75,7 +77,10 @@ public class TransportWatcherStatsAction extends TransportNodesAction { T parseTrigger(String context, XContentParser parser) throws IOException; E parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException; + } diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java index 7da6ca381b3..307b4eb5099 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java @@ -6,9 +6,11 @@ package org.elasticsearch.xpack.watcher.trigger; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; import org.elasticsearch.xpack.core.watcher.trigger.Trigger; import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.core.watcher.watch.Watch; @@ -29,6 +31,7 @@ public class TriggerService extends AbstractComponent { private final GroupedConsumer consumer = new GroupedConsumer(); private final Map engines; + private final Map perWatchStats = new HashMap<>(); public TriggerService(Settings settings, Set engines) { super(settings); @@ -40,16 +43,18 @@ public class TriggerService extends AbstractComponent { this.engines = unmodifiableMap(builder); } - public synchronized void start(Collection watches) throws Exception { + public synchronized void start(Collection watches) { for (TriggerEngine engine : engines.values()) { engine.start(watches); } + watches.forEach(this::addToStats); } public synchronized void stop() { for (TriggerEngine engine : engines.values()) { engine.stop(); } + perWatchStats.clear(); } /** @@ -60,11 +65,77 @@ public class TriggerService extends AbstractComponent { } /** - * Count the total number of active jobs across all trigger engines - * @return The total count of active jobs + * create statistics for a single watch, and store it in a local map + * allowing for easy deletion in case the watch gets removed from the trigger service */ - public long count() { - return engines.values().stream().mapToInt(TriggerEngine::getJobCount).sum(); + private void addToStats(Watch watch) { + TriggerWatchStats watchStats = TriggerWatchStats.create(watch); + perWatchStats.put(watch.id(), watchStats); + } + + /** + * Returns some statistics about the watches loaded in the trigger service + * @return a set of counters containing statistics + */ + public Counters stats() { + Counters counters = new Counters(); + // for bwc reasons, active/total contain the same values + int watchCount = perWatchStats.size(); + counters.inc("count.active", watchCount); + counters.inc("count.total", watchCount); + counters.inc("watch.trigger._all.active", watchCount); + counters.inc("watch.trigger._all.total", watchCount); + counters.inc("watch.input._all.total", watchCount); + counters.inc("watch.input._all.active", watchCount); + perWatchStats.values().forEach(stats -> { + if (stats.metadata) { + counters.inc("watch.metadata.active"); + counters.inc("watch.metadata.total"); + } + counters.inc("watch.trigger." + stats.triggerType + ".total"); + counters.inc("watch.trigger." + stats.triggerType + ".active"); + if (Strings.isNullOrEmpty(stats.scheduleType) == false) { + counters.inc("watch.trigger.schedule." + stats.scheduleType + ".total"); + counters.inc("watch.trigger.schedule." + stats.scheduleType + ".active"); + counters.inc("watch.trigger.schedule._all.total"); + counters.inc("watch.trigger.schedule._all.active"); + } + counters.inc("watch.input." + stats.inputType + ".active"); + counters.inc("watch.input." + stats.inputType + ".total"); + + counters.inc("watch.condition." + stats.conditionType + ".active"); + counters.inc("watch.condition." + stats.conditionType + ".total"); + counters.inc("watch.condition._all.total"); + counters.inc("watch.condition._all.active"); + + if (Strings.isNullOrEmpty(stats.transformType) == false) { + counters.inc("watch.transform." + stats.transformType + ".active"); + counters.inc("watch.transform." + stats.transformType + ".total"); + counters.inc("watch.transform._all.active"); + counters.inc("watch.transform._all.total"); + } + + for (TriggerWatchStats.ActionStats action : stats.actions) { + counters.inc("watch.action." + action.actionType + ".active"); + counters.inc("watch.action." + action.actionType + ".total"); + counters.inc("watch.action._all.active"); + counters.inc("watch.action._all.total"); + + if (Strings.isNullOrEmpty(action.conditionType) == false) { + counters.inc("watch.action.condition." + action.conditionType + ".active"); + counters.inc("watch.action.condition." + action.conditionType + ".total"); + counters.inc("watch.action.condition._all.active"); + counters.inc("watch.action.condition._all.total"); + } + if (Strings.isNullOrEmpty(action.transformType) == false) { + counters.inc("watch.action.transform." + action.transformType + ".active"); + counters.inc("watch.action.transform." + action.transformType + ".total"); + counters.inc("watch.action.transform._all.active"); + counters.inc("watch.action.transform._all.total"); + } + } + }); + return counters; } /** @@ -75,6 +146,7 @@ public class TriggerService extends AbstractComponent { */ public void add(Watch watch) { engines.get(watch.trigger().type()).add(watch); + addToStats(watch); } /** @@ -84,6 +156,7 @@ public class TriggerService extends AbstractComponent { * @return {@code true} if the job existed and removed, {@code false} otherwise. */ public boolean remove(String jobName) { + perWatchStats.remove(jobName); for (TriggerEngine engine : engines.values()) { if (engine.remove(jobName)) { return true; @@ -166,6 +239,10 @@ public class TriggerService extends AbstractComponent { return engine.parseTriggerEvent(this, watchId, context, parser); } + public long count() { + return perWatchStats.size(); + } + static class GroupedConsumer implements java.util.function.Consumer> { private List>> consumers = new CopyOnWriteArrayList<>(); diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerWatchStats.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerWatchStats.java new file mode 100644 index 00000000000..fb4e8b3f784 --- /dev/null +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerWatchStats.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.trigger; + +import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; +import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; + +public class TriggerWatchStats { + + public final boolean metadata; + public final String triggerType; + public final String scheduleType; + public final String inputType; + public final String conditionType; + public final String transformType; + public final ActionStats[] actions; + + private TriggerWatchStats(boolean metadata, String triggerType, String scheduleType, String inputType, + String conditionType, String transformType, ActionStats[] actions) { + this.metadata = metadata; + this.triggerType = triggerType; + this.scheduleType = scheduleType; + this.inputType = inputType; + this.conditionType = conditionType; + this.transformType = transformType; + this.actions = actions; + } + + public static final class ActionStats { + public final String actionType; + public final String transformType; + public final String conditionType; + + public ActionStats(String actionType, String transformType, String conditionType) { + this.actionType = actionType; + this.transformType = transformType; + this.conditionType = conditionType; + } + } + + public static TriggerWatchStats create(Watch watch) { + final boolean metadata = watch.metadata() != null && watch.metadata().isEmpty() == false; + final String triggerType = watch.trigger().type(); + String scheduleTriggerType = null; + if (ScheduleTrigger.TYPE.equals(watch.trigger().type())) { + ScheduleTrigger scheduleTrigger = (ScheduleTrigger) watch.trigger(); + scheduleTriggerType = scheduleTrigger.getSchedule().type(); + } + final String inputType = watch.input().type(); + final String conditionType = watch.condition().type(); + final String transformType = watch.transform() != null ? watch.transform().type() : null; + + final ActionStats[] actionStats = new ActionStats[watch.actions().size()]; + int i = 0; + for (ActionWrapper actionWrapper : watch.actions()) { + String transform = actionWrapper.transform() != null ? actionWrapper.transform().type() : null; + String condition = actionWrapper.condition() != null ? actionWrapper.condition().type() : null; + String type = actionWrapper.action().type(); + actionStats[i++] = new ActionStats(type, transform, condition); + } + + return new TriggerWatchStats(metadata, triggerType, scheduleTriggerType, inputType, + conditionType, transformType, actionStats); + } +} diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java index b52ef09cdf0..7e08f140daf 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java @@ -65,10 +65,10 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { } @Override - public void add(Watch job) { - assert job.trigger() instanceof ScheduleTrigger; - ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); - schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), clock.millis())); + public void add(Watch watch) { + assert watch.trigger() instanceof ScheduleTrigger; + ScheduleTrigger trigger = (ScheduleTrigger) watch.trigger(); + schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis())); } @Override diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherFeatureSetTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherFeatureSetTests.java index f9faf64c18f..e1e8b5b2ddd 100644 --- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherFeatureSetTests.java +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherFeatureSetTests.java @@ -5,48 +5,68 @@ */ package org.elasticsearch.xpack.watcher; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.watcher.WatcherFeatureSetUsage; +import org.elasticsearch.xpack.core.watcher.WatcherMetaData; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; +import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; import org.junit.Before; +import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.Collections; +import java.util.List; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.Is.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class WatcherFeatureSetTests extends ESTestCase { private XPackLicenseState licenseState; - private WatcherService watcherService; + private Client client; @Before public void init() throws Exception { licenseState = mock(XPackLicenseState.class); - watcherService = mock(WatcherService.class); + client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); } - public void testAvailable() throws Exception { - WatcherFeatureSet featureSet = new WatcherFeatureSet(Settings.EMPTY, licenseState, watcherService); + public void testAvailable() { + WatcherFeatureSet featureSet = new WatcherFeatureSet(Settings.EMPTY, licenseState, client); boolean available = randomBoolean(); when(licenseState.isWatcherAllowed()).thenReturn(available); assertThat(featureSet.available(), is(available)); } - public void testEnabled() throws Exception { + public void testEnabled() { boolean enabled = randomBoolean(); Settings.Builder settings = Settings.builder(); if (enabled) { @@ -56,18 +76,47 @@ public class WatcherFeatureSetTests extends ESTestCase { } else { settings.put("xpack.watcher.enabled", enabled); } - WatcherFeatureSet featureSet = new WatcherFeatureSet(settings.build(), licenseState, watcherService); + WatcherFeatureSet featureSet = new WatcherFeatureSet(settings.build(), licenseState, client); assertThat(featureSet.enabled(), is(enabled)); } public void testUsageStats() throws Exception { - Map statsMap = new HashMap<>(); - statsMap.put("foo", "bar"); - when(watcherService.usageStats()).thenReturn(statsMap); + doAnswer(mock -> { + ActionListener listener = + (ActionListener) mock.getArguments()[2]; - PlainActionFuture future = new PlainActionFuture<>(); - new WatcherFeatureSet(Settings.EMPTY, licenseState, watcherService).usage(future); - XPackFeatureSet.Usage watcherUsage = future.get(); + List nodes = new ArrayList<>(); + DiscoveryNode first = new DiscoveryNode("first", buildNewFakeTransportAddress(), Version.CURRENT); + WatcherStatsResponse.Node firstNode = new WatcherStatsResponse.Node(first); + Counters firstCounters = new Counters(); + firstCounters.inc("foo.foo", 1); + firstCounters.inc("foo.bar.baz", 1); + firstNode.setStats(firstCounters); + nodes.add(firstNode); + + DiscoveryNode second = new DiscoveryNode("second", buildNewFakeTransportAddress(), Version.CURRENT); + WatcherStatsResponse.Node secondNode = new WatcherStatsResponse.Node(second); + Counters secondCounters = new Counters(); + secondCounters.inc("spam", 1); + secondCounters.inc("foo.bar.baz", 4); + secondNode.setStats(secondCounters); + nodes.add(secondNode); + + listener.onResponse(new WatcherStatsResponse(new ClusterName("whatever"), new WatcherMetaData(false), + nodes, Collections.emptyList())); + return null; + }).when(client).execute(eq(WatcherStatsAction.INSTANCE), any(), any()); + + PlainActionFuture future = new PlainActionFuture<>(); + new WatcherFeatureSet(Settings.EMPTY, licenseState, client).usage(future); + WatcherFeatureSetUsage watcherUsage = (WatcherFeatureSetUsage) future.get(); + assertThat(watcherUsage.stats().keySet(), containsInAnyOrder("foo", "spam")); + long fooBarBaz = ObjectPath.eval("foo.bar.baz", watcherUsage.stats()); + assertThat(fooBarBaz, is(5L)); + long fooFoo = ObjectPath.eval("foo.foo", watcherUsage.stats()); + assertThat(fooFoo, is(1L)); + long spam = ObjectPath.eval("spam", watcherUsage.stats()); + assertThat(spam, is(1L)); BytesStreamOutput out = new BytesStreamOutput(); watcherUsage.writeTo(out); XPackFeatureSet.Usage serializedUsage = new WatcherFeatureSetUsage(out.bytes().streamInput()); @@ -77,11 +126,13 @@ public class WatcherFeatureSetTests extends ESTestCase { usage.toXContent(builder, ToXContent.EMPTY_PARAMS); XContentSource source = new XContentSource(builder); - assertThat(source.getValue("foo"), is("bar")); + assertThat(source.getValue("foo.bar.baz"), is(5)); + assertThat(source.getValue("spam"), is(1)); + assertThat(source.getValue("foo.foo"), is(1)); assertThat(usage, instanceOf(WatcherFeatureSetUsage.class)); WatcherFeatureSetUsage featureSetUsage = (WatcherFeatureSetUsage) usage; - assertThat(featureSetUsage.stats(), hasEntry("foo", "bar")); + assertThat(featureSetUsage.stats().keySet(), containsInAnyOrder("foo", "spam")); } } } diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherXpackUsageStatsTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherXpackUsageStatsTests.java new file mode 100644 index 00000000000..3a314640d74 --- /dev/null +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherXpackUsageStatsTests.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher; + +import org.elasticsearch.xpack.core.XPackFeatureSet; +import org.elasticsearch.xpack.core.action.XPackUsageAction; +import org.elasticsearch.xpack.core.action.XPackUsageRequest; +import org.elasticsearch.xpack.core.action.XPackUsageResponse; +import org.elasticsearch.xpack.core.watcher.WatcherFeatureSetUsage; +import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; + +import java.util.Map; +import java.util.Optional; + +import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron; +import static org.hamcrest.Matchers.is; + +public class WatcherXpackUsageStatsTests extends AbstractWatcherIntegrationTestCase { + + // as these tests use three data nodes, those watches will be across two of those + // nodes due to having two watcher shards, so that we can be sure that the count + // was merged + public void testWatcherUsageStatsTests() { + long watchCount = randomLongBetween(5, 20); + for (int i = 0; i < watchCount; i++) { + watcherClient().preparePutWatch("_id" + i).setSource(watchBuilder() + .trigger(schedule(cron("0/5 * * * * ? 2050"))) + .input(simpleInput()) + .addAction("_id", loggingAction("whatever " + i))) + .get(); + } + + XPackUsageRequest request = new XPackUsageRequest(); + XPackUsageResponse usageResponse = client().execute(XPackUsageAction.INSTANCE, request).actionGet(); + Optional usage = usageResponse.getUsages().stream() + .filter(u -> u instanceof WatcherFeatureSetUsage) + .findFirst(); + assertThat(usage.isPresent(), is(true)); + WatcherFeatureSetUsage featureSetUsage = (WatcherFeatureSetUsage) usage.get(); + + long activeWatchCount = (long) ((Map) featureSetUsage.stats().get("count")).get("active"); + assertThat(activeWatchCount, is(watchCount)); + } + +} diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 553f0fe311a..ace5065762a 100644 --- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult; import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction; import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler; import org.elasticsearch.xpack.core.watcher.actions.throttler.Throttler; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; import org.elasticsearch.xpack.core.watcher.condition.Condition; import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition; import org.elasticsearch.xpack.core.watcher.execution.ExecutionPhase; @@ -49,7 +50,6 @@ import org.elasticsearch.xpack.core.watcher.history.WatchRecord; import org.elasticsearch.xpack.core.watcher.input.ExecutableInput; import org.elasticsearch.xpack.core.watcher.input.Input; import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath; -import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.core.watcher.transform.ExecutableTransform; import org.elasticsearch.xpack.core.watcher.transform.Transform; import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; @@ -84,7 +84,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Arrays.asList; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; @@ -249,11 +248,11 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(watchRecord.result().executionTime(), is(notNullValue())); // test stats - XContentSource source = new XContentSource(jsonBuilder().map(executionService.usageStats())); - assertThat(source.getValue("execution.actions._all.total_time_in_ms"), is(notNullValue())); - assertThat(source.getValue("execution.actions._all.total"), is(1)); - assertThat(source.getValue("execution.actions.MY_AWESOME_TYPE.total_time_in_ms"), is(notNullValue())); - assertThat(source.getValue("execution.actions.MY_AWESOME_TYPE.total"), is(1)); + Counters counters = executionService.executionTimes(); + assertThat(counters.get("execution.actions._all.total_time_in_ms"), is(notNullValue())); + assertThat(counters.get("execution.actions._all.total"), is(1L)); + assertThat(counters.get("execution.actions.MY_AWESOME_TYPE.total_time_in_ms"), is(notNullValue())); + assertThat(counters.get("execution.actions.MY_AWESOME_TYPE.total"), is(1L)); } public void testExecuteFailedInput() throws Exception { diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java new file mode 100644 index 00000000000..94b356286da --- /dev/null +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.transport.actions.stats; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.watcher.WatcherState; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; +import org.elasticsearch.xpack.watcher.WatcherService; +import org.elasticsearch.xpack.watcher.execution.ExecutionService; +import org.elasticsearch.xpack.watcher.trigger.TriggerService; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportWatcherStatsActionTests extends ESTestCase { + + private TransportWatcherStatsAction action; + + @Before + public void setupTransportAction() { + TransportService transportService = mock(TransportService.class); + ThreadPool threadPool = mock(ThreadPool.class); + + ClusterService clusterService = mock(ClusterService.class); + DiscoveryNode discoveryNode = new DiscoveryNode("nodeId", buildNewFakeTransportAddress(), Version.CURRENT); + when(clusterService.localNode()).thenReturn(discoveryNode); + + ClusterName clusterName = new ClusterName("cluster_name"); + when(clusterService.getClusterName()).thenReturn(clusterName); + + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA); + when(clusterService.state()).thenReturn(clusterState); + + WatcherService watcherService = mock(WatcherService.class); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + + ExecutionService executionService = mock(ExecutionService.class); + when(executionService.executionThreadPoolQueueSize()).thenReturn(100L); + when(executionService.executionThreadPoolMaxSize()).thenReturn(5L); + Counters firstExecutionCounters = new Counters(); + firstExecutionCounters.inc("spam.eggs", 1); + Counters secondExecutionCounters = new Counters(); + secondExecutionCounters.inc("whatever", 1); + secondExecutionCounters.inc("foo.bar.baz", 123); + when(executionService.executionTimes()).thenReturn(firstExecutionCounters, secondExecutionCounters); + + TriggerService triggerService = mock(TriggerService.class); + when(triggerService.count()).thenReturn(10L, 30L); + Counters firstTriggerServiceStats = new Counters(); + firstTriggerServiceStats.inc("foo.bar.baz", 1024); + Counters secondTriggerServiceStats = new Counters(); + secondTriggerServiceStats.inc("foo.bar.baz", 1024); + when(triggerService.stats()).thenReturn(firstTriggerServiceStats, secondTriggerServiceStats); + + action = new TransportWatcherStatsAction(Settings.EMPTY, transportService, + clusterService, threadPool, new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(Settings.EMPTY), watcherService, executionService, triggerService); + } + + public void testWatcherStats() throws Exception { + WatcherStatsRequest request = new WatcherStatsRequest(); + request.includeStats(true); + WatcherStatsResponse.Node nodeResponse1 = action.nodeOperation(new WatcherStatsRequest.Node(request, "nodeId")); + WatcherStatsResponse.Node nodeResponse2 = action.nodeOperation(new WatcherStatsRequest.Node(request, "nodeId2")); + + WatcherStatsResponse response = action.newResponse(request, + Arrays.asList(nodeResponse1, nodeResponse2), Collections.emptyList()); + assertThat(response.getWatchesCount(), is(40L)); + + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + response.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + ObjectPath objectPath = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + assertThat(objectPath.evaluate("stats.0.stats.foo.bar.baz"), is(1024)); + assertThat(objectPath.evaluate("stats.1.stats.foo.bar.baz"), is(1147)); + assertThat(objectPath.evaluate("stats.0.stats.spam.eggs"), is(1)); + assertThat(objectPath.evaluate("stats.1.stats.whatever"), is(1)); + } + } +} \ No newline at end of file diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java index 9bfd6a14a3b..57fe40f67b4 100644 --- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/TriggerServiceTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/TriggerServiceTests.java new file mode 100644 index 00000000000..61222b5b297 --- /dev/null +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/TriggerServiceTests.java @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.trigger; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; +import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction; +import org.elasticsearch.xpack.core.watcher.common.stats.Counters; +import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition; +import org.elasticsearch.xpack.core.watcher.transform.ExecutableTransform; +import org.elasticsearch.xpack.core.watcher.trigger.Trigger; +import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; +import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TriggerServiceTests extends ESTestCase { + + private static final String ENGINE_TYPE = "foo"; + + public void testStats() { + TriggerEngine triggerEngine = mock(TriggerEngine.class); + when(triggerEngine.type()).thenReturn(ENGINE_TYPE); + TriggerService service = new TriggerService(Settings.EMPTY, Collections.singleton(triggerEngine)); + + // simple watch, input and simple action + Watch watch1 = createWatch("1"); + setMetadata(watch1); + setInput(watch1); + addAction(watch1, "my_action", null, null); + service.add(watch1); + + Counters stats = service.stats(); + assertThat(stats.size(), is(20L)); + assertThat(stats.get("watch.input.none.active"), is(1L)); + assertThat(stats.get("watch.input._all.active"), is(1L)); + assertThat(stats.get("watch.condition.always.active"), is(1L)); + assertThat(stats.get("watch.condition._all.active"), is(1L)); + assertThat(stats.get("watch.action.my_action.active"), is(1L)); + assertThat(stats.get("watch.action._all.active"), is(1L)); + assertThat(stats.get("watch.metadata.active"), is(1L)); + assertThat(stats.get("watch.metadata.total"), is(1L)); + assertThat(stats.get("count.active"), is(1L)); + assertThat(stats.get("count.total"), is(1L)); + + Watch watch2 = createWatch("2"); + setInput(watch2); + setCondition(watch2, "script"); + addAction(watch2, "my_action", "script", null); + service.add(watch2); + + stats = service.stats(); + assertThat(stats.size(), is(26L)); + assertThat(stats.get("watch.input.none.active"), is(2L)); + assertThat(stats.get("watch.input._all.active"), is(2L)); + assertThat(stats.get("watch.condition.script.active"), is(1L)); + assertThat(stats.get("watch.condition.always.active"), is(1L)); + assertThat(stats.get("watch.condition._all.active"), is(2L)); + assertThat(stats.get("watch.action.my_action.active"), is(2L)); + assertThat(stats.get("watch.action._all.active"), is(2L)); + assertThat(stats.get("watch.action.condition.script.active"), is(1L)); + assertThat(stats.get("watch.action.condition._all.active"), is(1L)); + assertThat(stats.get("watch.metadata.active"), is(1L)); + assertThat(stats.get("count.active"), is(2L)); + + service.remove("1"); + stats = service.stats(); + assertThat(stats.size(), is(22L)); + assertThat(stats.get("count.active"), is(1L)); + assertThat(stats.get("watch.input.none.active"), is(1L)); + assertThat(stats.get("watch.input._all.active"), is(1L)); + assertThat(stats.get("watch.condition.script.active"), is(1L)); + assertThat(stats.get("watch.condition._all.active"), is(1L)); + assertThat(stats.get("watch.action.my_action.active"), is(1L)); + assertThat(stats.get("watch.action._all.active"), is(1L)); + assertThat(stats.get("watch.action.condition.script.active"), is(1L)); + assertThat(stats.get("watch.action.condition._all.active"), is(1L)); + + service.remove("2"); + stats = service.stats(); + assertThat(stats.size(), is(6L)); + assertThat(stats.get("count.active"), is(0L)); + assertThat(stats.get("count.total"), is(0L)); + } + + private Watch createWatch(String id) { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn(id); + Trigger trigger = mock(Trigger.class); + when(trigger.type()).thenReturn(ENGINE_TYPE); + when(watch.trigger()).thenReturn(trigger); + when(watch.condition()).thenReturn(InternalAlwaysCondition.INSTANCE); + return watch; + } + + private void setInput(Watch watch) { + ExecutableNoneInput noneInput = new ExecutableNoneInput(logger); + when(watch.input()).thenReturn(noneInput); + } + + private void setMetadata(Watch watch) { + Map metadata = Collections.singletonMap("foo", "bar"); + when(watch.metadata()).thenReturn(metadata); + } + + private void setCondition(Watch watch, String type) { + ExecutableCondition condition = mock(ExecutableCondition.class); + when(condition.type()).thenReturn(type); + when(watch.condition()).thenReturn(condition); + } + + private void addAction(Watch watch, String type, String condition, String transform) { + List actions = watch.actions(); + ArrayList newActions = new ArrayList<>(actions); + ActionWrapper actionWrapper = mock(ActionWrapper.class); + ExecutableAction executableAction = mock(ExecutableAction.class); + when(executableAction.type()).thenReturn(type); + if (condition != null) { + ExecutableCondition executableCondition = mock(ExecutableCondition.class); + when(executableCondition.type()).thenReturn(condition); + when(actionWrapper.condition()).thenReturn(executableCondition); + } + if (transform != null) { + ExecutableTransform executableTransform = mock(ExecutableTransform.class); + when(executableTransform.type()).thenReturn(transform); + when(actionWrapper.transform()).thenReturn(executableTransform); + } + when(actionWrapper.action()).thenReturn(executableAction); + newActions.add(actionWrapper); + when(watch.actions()).thenReturn(newActions); + } + + private void setTransform(Watch watch, String type) { + ExecutableTransform transform = mock(ExecutableTransform.class); + when(transform.type()).thenReturn(type); + when(watch.transform()).thenReturn(transform); + } +} \ No newline at end of file