Watcher: Ensure usage stats work properly in distributed environment (elastic/x-pack-elasticsearch#4094)

This adds back usage stats by pickybacking on the watcher stats, which
are already running distributed in order to collect and merge watcher
statistics.

In order to be able to track statistics, we need to add information for
each watch in an in-memory data structure that is processed whenever a
usage request is coming in. This processing creates a number of counters
for each node, which then are merged together in the usage stats.

relates elastic/x-pack-elasticsearch#4071

Original commit: elastic/x-pack-elasticsearch@c8bfed288f
This commit is contained in:
Alexander Reelsen 2018-03-15 10:28:03 -07:00 committed by GitHub
parent 783cabbd2f
commit 92379ca9af
19 changed files with 747 additions and 62 deletions

View File

@ -3,21 +3,26 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.common.stats;
package org.elasticsearch.xpack.core.watcher.common.stats;
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Helper class to create simple usage stat counters based on longs
* Internally this is a map mapping from String to a long, which is the counter
* Calling toMap() will create a nested map, where each dot of the key name will nest deeper
* Calling toNestedMap() will create a nested map, where each dot of the key name will nest deeper
* The main reason for this class is that the stats producer should not be worried about how the map is actually nested
*/
public class Counters {
public class Counters implements Streamable {
private ObjectLongHashMap<String> counters = new ObjectLongHashMap<>();
@ -52,12 +57,24 @@ public class Counters {
counters.addTo(name, count);
}
public long get(String name) {
return counters.get(name);
}
public long size() {
return counters.size();
}
public boolean hasCounters() {
return size() > 0;
}
/**
* Convert the counters to a nested map, using the "." as a splitter to create deeper maps
* @return A nested map with all the current configured counters
*/
@SuppressWarnings("unchecked")
public Map<String, Object> toMap() {
public Map<String, Object> toNestedMap() {
Map<String, Object> map = new HashMap<>();
for (ObjectLongCursor<String> counter : counters) {
if (counter.key.contains(".")) {
@ -84,4 +101,38 @@ public class Counters {
return map;
}
@Override
public void readFrom(StreamInput in) throws IOException {
int counters = in.readVInt();
for (int i = 0; i < counters; i++) {
inc(in.readString(), in.readVLong());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(counters.size());
for (ObjectLongCursor<String> cursor : counters) {
out.writeString(cursor.key);
out.writeVLong(cursor.value);
}
}
public static Counters read(StreamInput in) throws IOException {
Counters counters = new Counters();
counters.readFrom(in);
return counters;
}
public static Counters merge(List<Counters> counters) {
Counters result = new Counters();
for (Counters c : counters) {
for (ObjectLongCursor<String> cursor : c.counters) {
result.inc(cursor.key, cursor.value);
}
}
return result;
}
}

View File

@ -20,6 +20,7 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
private boolean includeCurrentWatches;
private boolean includeQueuedWatches;
private boolean includeStats;
public WatcherStatsRequest() {
}
@ -40,6 +41,14 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
this.includeQueuedWatches = includeQueuedWatches;
}
public boolean includeStats() {
return includeStats;
}
public void includeStats(boolean includeStats) {
this.includeStats = includeStats;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -50,6 +59,7 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
super.readFrom(in);
includeCurrentWatches = in.readBoolean();
includeQueuedWatches = in.readBoolean();
includeStats = in.readBoolean();
}
@Override
@ -57,6 +67,7 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
super.writeTo(out);
out.writeBoolean(includeCurrentWatches);
out.writeBoolean(includeQueuedWatches);
out.writeBoolean(includeStats);
}
@Override
@ -68,6 +79,7 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
private boolean includeCurrentWatches;
private boolean includeQueuedWatches;
private boolean includeStats;
public Node() {}
@ -75,6 +87,7 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
super(nodeId);
includeCurrentWatches = request.includeCurrentWatches();
includeQueuedWatches = request.includeQueuedWatches();
includeStats = request.includeStats();
}
public boolean includeCurrentWatches() {
@ -85,11 +98,16 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
return includeQueuedWatches;
}
public boolean includeStats() {
return includeStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
includeCurrentWatches = in.readBoolean();
includeQueuedWatches = in.readBoolean();
includeStats = in.readBoolean();
}
@Override
@ -97,6 +115,7 @@ public class WatcherStatsRequest extends BaseNodesRequest<WatcherStatsRequest> {
super.writeTo(out);
out.writeBoolean(includeCurrentWatches);
out.writeBoolean(includeQueuedWatches);
out.writeBoolean(includeStats);
}
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch;
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
@ -93,6 +94,7 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
private long threadPoolMaxSize;
private List<WatchExecutionSnapshot> snapshots;
private List<QueuedWatch> queuedWatches;
private Counters stats;
public Node() {
}
@ -163,6 +165,14 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
this.queuedWatches = queuedWatches;
}
public Counters getStats() {
return stats;
}
public void setStats(Counters stats) {
this.stats = stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -177,6 +187,9 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
if (in.readBoolean()) {
queuedWatches = in.readStreamableList(QueuedWatch::new);
}
if (in.readBoolean()) {
stats = Counters.read(in);
}
}
@Override
@ -187,17 +200,17 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
out.writeLong(threadPoolMaxSize);
out.writeByte(watcherState.getId());
out.writeBoolean(snapshots != null);
if (snapshots != null) {
out.writeBoolean(true);
out.writeStreamableList(snapshots);
} else {
out.writeBoolean(false);
}
out.writeBoolean(queuedWatches != null);
if (queuedWatches != null) {
out.writeBoolean(true);
out.writeStreamableList(queuedWatches);
} else {
out.writeBoolean(false);
}
out.writeBoolean(stats != null);
if (stats != null) {
stats.writeTo(out);
}
}
@ -228,11 +241,14 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
}
builder.endArray();
}
if (stats != null && stats.hasCounters()) {
builder.field("stats", stats.toNestedMap());
}
builder.endObject();
return builder;
}
public static WatcherStatsResponse.Node readNodeResponse(StreamInput in)
static WatcherStatsResponse.Node readNodeResponse(StreamInput in)
throws IOException {
WatcherStatsResponse.Node node = new WatcherStatsResponse.Node();
node.readFrom(in);

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.common.stats;
package org.elasticsearch.xpack.core.watcher.common.stats;
import org.elasticsearch.test.ESTestCase;
@ -21,7 +21,7 @@ public class CountersTests extends ESTestCase {
counters.inc("foo.bar");
counters.inc("foo.baz");
counters.inc("foo.baz");
Map<String, Object> map = counters.toMap();
Map<String, Object> map = counters.toNestedMap();
assertThat(map, hasEntry("f", 200L));
assertThat(map, hasKey("foo"));
assertThat(map.get("foo"), instanceOf(Map.class));

View File

@ -0,0 +1,61 @@
---
"Test watcher usage stats output":
- do:
catch: missing
xpack.watcher.delete_watch:
id: "usage_stats_watch"
- do: {xpack.usage: {}}
- set: { "watcher.count.active": watch_count_active }
- set: { "watcher.count.total": watch_count_total }
- do:
xpack.watcher.put_watch:
id: "usage_stats_watch"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"search" : {
"request" : {
"indices" : [ "my_test_index" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"gte" : 1
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Successfully ran my_watch to test for search input"
}
}
}
}
- match: { _id: "usage_stats_watch" }
- do: {xpack.usage: {}}
- gt: { "watcher.count.active": $watch_count_active }
- gt: { "watcher.count.total": $watch_count_total }
- gte: { "watcher.watch.action._all.active": 1 }
- gte: { "watcher.watch.action.logging.active": 1 }
- gte: { "watcher.watch.condition._all.active": 1 }
- gte: { "watcher.watch.condition.compare.active": 1 }
- gte: { "watcher.watch.input._all.active": 1 }
- gte: { "watcher.watch.input.search.active": 1 }
- gte: { "watcher.watch.trigger._all.active": 1 }
- gte: { "watcher.watch.trigger.schedule.active": 1 }
- gte: { "watcher.watch.trigger.schedule.cron.active": 1 }
- gte: { "watcher.watch.trigger.schedule._all.active": 1 }

View File

@ -6,29 +6,42 @@
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.watcher.WatcherFeatureSetUsage;
import org.elasticsearch.xpack.core.watcher.client.WatcherClient;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
public class WatcherFeatureSet implements XPackFeatureSet {
private final boolean enabled;
private final XPackLicenseState licenseState;
private final WatcherService watcherService;
private Client client;
@Inject
public WatcherFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState, @Nullable WatcherService watcherService) {
this.watcherService = watcherService;
public WatcherFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState, Client client) {
this.enabled = XPackSettings.WATCHER_ENABLED.get(settings);
this.licenseState = licenseState;
this.client = client;
}
@Override
@ -58,9 +71,24 @@ public class WatcherFeatureSet implements XPackFeatureSet {
@Override
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
listener.onResponse(
new WatcherFeatureSetUsage(available(), enabled(),
watcherService != null ? watcherService.usageStats() : Collections.emptyMap()));
if (enabled) {
try (ThreadContext.StoredContext ignore =
stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
WatcherClient watcherClient = new WatcherClient(client);
WatcherStatsRequest request = new WatcherStatsRequest();
request.includeStats(true);
watcherClient.watcherStats(request, ActionListener.wrap(r -> {
List<Counters> countersPerNode = r.getNodes()
.stream()
.map(WatcherStatsResponse.Node::getStats)
.filter(Objects::nonNull)
.collect(Collectors.toList());
Counters mergedCounters = Counters.merge(countersPerNode);
listener.onResponse(new WatcherFeatureSetUsage(available(), enabled(), mergedCounters.toNestedMap()));
}, listener::onFailure));
}
} else {
listener.onResponse(new WatcherFeatureSetUsage(available(), enabled(), Collections.emptyMap()));
}
}
}

View File

@ -319,9 +319,4 @@ public class WatcherService extends AbstractComponent {
public WatcherState state() {
return state.get();
}
public Map<String, Object> usageStats() {
Map<String, Object> innerMap = executionService.usageStats();
return innerMap;
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.condition.Condition;
import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch;
@ -49,7 +50,6 @@ import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.watcher.watch.WatchField;
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.common.stats.Counters;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.joda.time.DateTime;
@ -524,7 +524,7 @@ public class ExecutionService extends AbstractComponent {
}
}
public Map<String, Object> usageStats() {
public Counters executionTimes() {
Counters counters = new Counters();
counters.inc("execution.actions._all.total", totalExecutionsTime.count());
counters.inc("execution.actions._all.total_time_in_ms", totalExecutionsTime.sum());
@ -534,7 +534,7 @@ public class ExecutionService extends AbstractComponent {
counters.inc("execution.actions." + entry.getKey() + ".total_time_in_ms", entry.getValue().sum());
}
return counters.toMap();
return counters;
}
/**

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
@ -22,6 +23,7 @@ import org.elasticsearch.xpack.watcher.WatcherService;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import java.util.Arrays;
import java.util.List;
/**
@ -75,7 +77,10 @@ public class TransportWatcherStatsAction extends TransportNodesAction<WatcherSta
if (request.includeQueuedWatches()) {
statsResponse.setQueuedWatches(executionService.queuedWatches());
}
if (request.includeStats()) {
Counters stats = Counters.merge(Arrays.asList(triggerService.stats(), executionService.executionTimes()));
statsResponse.setStats(stats);
}
statsResponse.setWatchesCount(triggerService.count());
return statsResponse;
}

View File

@ -55,4 +55,5 @@ public interface TriggerEngine<T extends Trigger, E extends TriggerEvent> {
T parseTrigger(String context, XContentParser parser) throws IOException;
E parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException;
}

View File

@ -6,9 +6,11 @@
package org.elasticsearch.xpack.watcher.trigger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.trigger.Trigger;
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
@ -29,6 +31,7 @@ public class TriggerService extends AbstractComponent {
private final GroupedConsumer consumer = new GroupedConsumer();
private final Map<String, TriggerEngine> engines;
private final Map<String, TriggerWatchStats> perWatchStats = new HashMap<>();
public TriggerService(Settings settings, Set<TriggerEngine> engines) {
super(settings);
@ -40,16 +43,18 @@ public class TriggerService extends AbstractComponent {
this.engines = unmodifiableMap(builder);
}
public synchronized void start(Collection<Watch> watches) throws Exception {
public synchronized void start(Collection<Watch> watches) {
for (TriggerEngine engine : engines.values()) {
engine.start(watches);
}
watches.forEach(this::addToStats);
}
public synchronized void stop() {
for (TriggerEngine engine : engines.values()) {
engine.stop();
}
perWatchStats.clear();
}
/**
@ -60,11 +65,77 @@ public class TriggerService extends AbstractComponent {
}
/**
* Count the total number of active jobs across all trigger engines
* @return The total count of active jobs
* create statistics for a single watch, and store it in a local map
* allowing for easy deletion in case the watch gets removed from the trigger service
*/
public long count() {
return engines.values().stream().mapToInt(TriggerEngine::getJobCount).sum();
private void addToStats(Watch watch) {
TriggerWatchStats watchStats = TriggerWatchStats.create(watch);
perWatchStats.put(watch.id(), watchStats);
}
/**
* Returns some statistics about the watches loaded in the trigger service
* @return a set of counters containing statistics
*/
public Counters stats() {
Counters counters = new Counters();
// for bwc reasons, active/total contain the same values
int watchCount = perWatchStats.size();
counters.inc("count.active", watchCount);
counters.inc("count.total", watchCount);
counters.inc("watch.trigger._all.active", watchCount);
counters.inc("watch.trigger._all.total", watchCount);
counters.inc("watch.input._all.total", watchCount);
counters.inc("watch.input._all.active", watchCount);
perWatchStats.values().forEach(stats -> {
if (stats.metadata) {
counters.inc("watch.metadata.active");
counters.inc("watch.metadata.total");
}
counters.inc("watch.trigger." + stats.triggerType + ".total");
counters.inc("watch.trigger." + stats.triggerType + ".active");
if (Strings.isNullOrEmpty(stats.scheduleType) == false) {
counters.inc("watch.trigger.schedule." + stats.scheduleType + ".total");
counters.inc("watch.trigger.schedule." + stats.scheduleType + ".active");
counters.inc("watch.trigger.schedule._all.total");
counters.inc("watch.trigger.schedule._all.active");
}
counters.inc("watch.input." + stats.inputType + ".active");
counters.inc("watch.input." + stats.inputType + ".total");
counters.inc("watch.condition." + stats.conditionType + ".active");
counters.inc("watch.condition." + stats.conditionType + ".total");
counters.inc("watch.condition._all.total");
counters.inc("watch.condition._all.active");
if (Strings.isNullOrEmpty(stats.transformType) == false) {
counters.inc("watch.transform." + stats.transformType + ".active");
counters.inc("watch.transform." + stats.transformType + ".total");
counters.inc("watch.transform._all.active");
counters.inc("watch.transform._all.total");
}
for (TriggerWatchStats.ActionStats action : stats.actions) {
counters.inc("watch.action." + action.actionType + ".active");
counters.inc("watch.action." + action.actionType + ".total");
counters.inc("watch.action._all.active");
counters.inc("watch.action._all.total");
if (Strings.isNullOrEmpty(action.conditionType) == false) {
counters.inc("watch.action.condition." + action.conditionType + ".active");
counters.inc("watch.action.condition." + action.conditionType + ".total");
counters.inc("watch.action.condition._all.active");
counters.inc("watch.action.condition._all.total");
}
if (Strings.isNullOrEmpty(action.transformType) == false) {
counters.inc("watch.action.transform." + action.transformType + ".active");
counters.inc("watch.action.transform." + action.transformType + ".total");
counters.inc("watch.action.transform._all.active");
counters.inc("watch.action.transform._all.total");
}
}
});
return counters;
}
/**
@ -75,6 +146,7 @@ public class TriggerService extends AbstractComponent {
*/
public void add(Watch watch) {
engines.get(watch.trigger().type()).add(watch);
addToStats(watch);
}
/**
@ -84,6 +156,7 @@ public class TriggerService extends AbstractComponent {
* @return {@code true} if the job existed and removed, {@code false} otherwise.
*/
public boolean remove(String jobName) {
perWatchStats.remove(jobName);
for (TriggerEngine engine : engines.values()) {
if (engine.remove(jobName)) {
return true;
@ -166,6 +239,10 @@ public class TriggerService extends AbstractComponent {
return engine.parseTriggerEvent(this, watchId, context, parser);
}
public long count() {
return perWatchStats.size();
}
static class GroupedConsumer implements java.util.function.Consumer<Iterable<TriggerEvent>> {
private List<Consumer<Iterable<TriggerEvent>>> consumers = new CopyOnWriteArrayList<>();

View File

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.trigger;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
public class TriggerWatchStats {
public final boolean metadata;
public final String triggerType;
public final String scheduleType;
public final String inputType;
public final String conditionType;
public final String transformType;
public final ActionStats[] actions;
private TriggerWatchStats(boolean metadata, String triggerType, String scheduleType, String inputType,
String conditionType, String transformType, ActionStats[] actions) {
this.metadata = metadata;
this.triggerType = triggerType;
this.scheduleType = scheduleType;
this.inputType = inputType;
this.conditionType = conditionType;
this.transformType = transformType;
this.actions = actions;
}
public static final class ActionStats {
public final String actionType;
public final String transformType;
public final String conditionType;
public ActionStats(String actionType, String transformType, String conditionType) {
this.actionType = actionType;
this.transformType = transformType;
this.conditionType = conditionType;
}
}
public static TriggerWatchStats create(Watch watch) {
final boolean metadata = watch.metadata() != null && watch.metadata().isEmpty() == false;
final String triggerType = watch.trigger().type();
String scheduleTriggerType = null;
if (ScheduleTrigger.TYPE.equals(watch.trigger().type())) {
ScheduleTrigger scheduleTrigger = (ScheduleTrigger) watch.trigger();
scheduleTriggerType = scheduleTrigger.getSchedule().type();
}
final String inputType = watch.input().type();
final String conditionType = watch.condition().type();
final String transformType = watch.transform() != null ? watch.transform().type() : null;
final ActionStats[] actionStats = new ActionStats[watch.actions().size()];
int i = 0;
for (ActionWrapper actionWrapper : watch.actions()) {
String transform = actionWrapper.transform() != null ? actionWrapper.transform().type() : null;
String condition = actionWrapper.condition() != null ? actionWrapper.condition().type() : null;
String type = actionWrapper.action().type();
actionStats[i++] = new ActionStats(type, transform, condition);
}
return new TriggerWatchStats(metadata, triggerType, scheduleTriggerType, inputType,
conditionType, transformType, actionStats);
}
}

View File

@ -65,10 +65,10 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
@Override
public void add(Watch job) {
assert job.trigger() instanceof ScheduleTrigger;
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), clock.millis()));
public void add(Watch watch) {
assert watch.trigger() instanceof ScheduleTrigger;
ScheduleTrigger trigger = (ScheduleTrigger) watch.trigger();
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
}
@Override

View File

@ -5,48 +5,68 @@
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.watcher.WatcherFeatureSetUsage;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class WatcherFeatureSetTests extends ESTestCase {
private XPackLicenseState licenseState;
private WatcherService watcherService;
private Client client;
@Before
public void init() throws Exception {
licenseState = mock(XPackLicenseState.class);
watcherService = mock(WatcherService.class);
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(client.threadPool()).thenReturn(threadPool);
}
public void testAvailable() throws Exception {
WatcherFeatureSet featureSet = new WatcherFeatureSet(Settings.EMPTY, licenseState, watcherService);
public void testAvailable() {
WatcherFeatureSet featureSet = new WatcherFeatureSet(Settings.EMPTY, licenseState, client);
boolean available = randomBoolean();
when(licenseState.isWatcherAllowed()).thenReturn(available);
assertThat(featureSet.available(), is(available));
}
public void testEnabled() throws Exception {
public void testEnabled() {
boolean enabled = randomBoolean();
Settings.Builder settings = Settings.builder();
if (enabled) {
@ -56,18 +76,47 @@ public class WatcherFeatureSetTests extends ESTestCase {
} else {
settings.put("xpack.watcher.enabled", enabled);
}
WatcherFeatureSet featureSet = new WatcherFeatureSet(settings.build(), licenseState, watcherService);
WatcherFeatureSet featureSet = new WatcherFeatureSet(settings.build(), licenseState, client);
assertThat(featureSet.enabled(), is(enabled));
}
public void testUsageStats() throws Exception {
Map<String, Object> statsMap = new HashMap<>();
statsMap.put("foo", "bar");
when(watcherService.usageStats()).thenReturn(statsMap);
doAnswer(mock -> {
ActionListener<WatcherStatsResponse> listener =
(ActionListener<WatcherStatsResponse>) mock.getArguments()[2];
PlainActionFuture<XPackFeatureSet.Usage> future = new PlainActionFuture<>();
new WatcherFeatureSet(Settings.EMPTY, licenseState, watcherService).usage(future);
XPackFeatureSet.Usage watcherUsage = future.get();
List<WatcherStatsResponse.Node> nodes = new ArrayList<>();
DiscoveryNode first = new DiscoveryNode("first", buildNewFakeTransportAddress(), Version.CURRENT);
WatcherStatsResponse.Node firstNode = new WatcherStatsResponse.Node(first);
Counters firstCounters = new Counters();
firstCounters.inc("foo.foo", 1);
firstCounters.inc("foo.bar.baz", 1);
firstNode.setStats(firstCounters);
nodes.add(firstNode);
DiscoveryNode second = new DiscoveryNode("second", buildNewFakeTransportAddress(), Version.CURRENT);
WatcherStatsResponse.Node secondNode = new WatcherStatsResponse.Node(second);
Counters secondCounters = new Counters();
secondCounters.inc("spam", 1);
secondCounters.inc("foo.bar.baz", 4);
secondNode.setStats(secondCounters);
nodes.add(secondNode);
listener.onResponse(new WatcherStatsResponse(new ClusterName("whatever"), new WatcherMetaData(false),
nodes, Collections.emptyList()));
return null;
}).when(client).execute(eq(WatcherStatsAction.INSTANCE), any(), any());
PlainActionFuture<WatcherFeatureSet.Usage> future = new PlainActionFuture<>();
new WatcherFeatureSet(Settings.EMPTY, licenseState, client).usage(future);
WatcherFeatureSetUsage watcherUsage = (WatcherFeatureSetUsage) future.get();
assertThat(watcherUsage.stats().keySet(), containsInAnyOrder("foo", "spam"));
long fooBarBaz = ObjectPath.eval("foo.bar.baz", watcherUsage.stats());
assertThat(fooBarBaz, is(5L));
long fooFoo = ObjectPath.eval("foo.foo", watcherUsage.stats());
assertThat(fooFoo, is(1L));
long spam = ObjectPath.eval("spam", watcherUsage.stats());
assertThat(spam, is(1L));
BytesStreamOutput out = new BytesStreamOutput();
watcherUsage.writeTo(out);
XPackFeatureSet.Usage serializedUsage = new WatcherFeatureSetUsage(out.bytes().streamInput());
@ -77,11 +126,13 @@ public class WatcherFeatureSetTests extends ESTestCase {
usage.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentSource source = new XContentSource(builder);
assertThat(source.getValue("foo"), is("bar"));
assertThat(source.getValue("foo.bar.baz"), is(5));
assertThat(source.getValue("spam"), is(1));
assertThat(source.getValue("foo.foo"), is(1));
assertThat(usage, instanceOf(WatcherFeatureSetUsage.class));
WatcherFeatureSetUsage featureSetUsage = (WatcherFeatureSetUsage) usage;
assertThat(featureSetUsage.stats(), hasEntry("foo", "bar"));
assertThat(featureSetUsage.stats().keySet(), containsInAnyOrder("foo", "spam"));
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.action.XPackUsageAction;
import org.elasticsearch.xpack.core.action.XPackUsageRequest;
import org.elasticsearch.xpack.core.action.XPackUsageResponse;
import org.elasticsearch.xpack.core.watcher.WatcherFeatureSetUsage;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import java.util.Map;
import java.util.Optional;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
import static org.hamcrest.Matchers.is;
public class WatcherXpackUsageStatsTests extends AbstractWatcherIntegrationTestCase {
// as these tests use three data nodes, those watches will be across two of those
// nodes due to having two watcher shards, so that we can be sure that the count
// was merged
public void testWatcherUsageStatsTests() {
long watchCount = randomLongBetween(5, 20);
for (int i = 0; i < watchCount; i++) {
watcherClient().preparePutWatch("_id" + i).setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? 2050")))
.input(simpleInput())
.addAction("_id", loggingAction("whatever " + i)))
.get();
}
XPackUsageRequest request = new XPackUsageRequest();
XPackUsageResponse usageResponse = client().execute(XPackUsageAction.INSTANCE, request).actionGet();
Optional<XPackFeatureSet.Usage> usage = usageResponse.getUsages().stream()
.filter(u -> u instanceof WatcherFeatureSetUsage)
.findFirst();
assertThat(usage.isPresent(), is(true));
WatcherFeatureSetUsage featureSetUsage = (WatcherFeatureSetUsage) usage.get();
long activeWatchCount = (long) ((Map) featureSetUsage.stats().get("count")).get("active");
assertThat(activeWatchCount, is(watchCount));
}
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction;
import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.xpack.core.watcher.actions.throttler.Throttler;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.condition.Condition;
import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition;
import org.elasticsearch.xpack.core.watcher.execution.ExecutionPhase;
@ -49,7 +50,6 @@ import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
import org.elasticsearch.xpack.core.watcher.input.ExecutableInput;
import org.elasticsearch.xpack.core.watcher.input.Input;
import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.core.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.core.watcher.transform.Transform;
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
@ -84,7 +84,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
@ -249,11 +248,11 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().executionTime(), is(notNullValue()));
// test stats
XContentSource source = new XContentSource(jsonBuilder().map(executionService.usageStats()));
assertThat(source.getValue("execution.actions._all.total_time_in_ms"), is(notNullValue()));
assertThat(source.getValue("execution.actions._all.total"), is(1));
assertThat(source.getValue("execution.actions.MY_AWESOME_TYPE.total_time_in_ms"), is(notNullValue()));
assertThat(source.getValue("execution.actions.MY_AWESOME_TYPE.total"), is(1));
Counters counters = executionService.executionTimes();
assertThat(counters.get("execution.actions._all.total_time_in_ms"), is(notNullValue()));
assertThat(counters.get("execution.actions._all.total"), is(1L));
assertThat(counters.get("execution.actions.MY_AWESOME_TYPE.total_time_in_ms"), is(notNullValue()));
assertThat(counters.get("execution.actions.MY_AWESOME_TYPE.total"), is(1L));
}
public void testExecuteFailedInput() throws Exception {

View File

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions.stats;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.xpack.watcher.WatcherService;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TransportWatcherStatsActionTests extends ESTestCase {
private TransportWatcherStatsAction action;
@Before
public void setupTransportAction() {
TransportService transportService = mock(TransportService.class);
ThreadPool threadPool = mock(ThreadPool.class);
ClusterService clusterService = mock(ClusterService.class);
DiscoveryNode discoveryNode = new DiscoveryNode("nodeId", buildNewFakeTransportAddress(), Version.CURRENT);
when(clusterService.localNode()).thenReturn(discoveryNode);
ClusterName clusterName = new ClusterName("cluster_name");
when(clusterService.getClusterName()).thenReturn(clusterName);
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(clusterState);
WatcherService watcherService = mock(WatcherService.class);
when(watcherService.state()).thenReturn(WatcherState.STARTED);
ExecutionService executionService = mock(ExecutionService.class);
when(executionService.executionThreadPoolQueueSize()).thenReturn(100L);
when(executionService.executionThreadPoolMaxSize()).thenReturn(5L);
Counters firstExecutionCounters = new Counters();
firstExecutionCounters.inc("spam.eggs", 1);
Counters secondExecutionCounters = new Counters();
secondExecutionCounters.inc("whatever", 1);
secondExecutionCounters.inc("foo.bar.baz", 123);
when(executionService.executionTimes()).thenReturn(firstExecutionCounters, secondExecutionCounters);
TriggerService triggerService = mock(TriggerService.class);
when(triggerService.count()).thenReturn(10L, 30L);
Counters firstTriggerServiceStats = new Counters();
firstTriggerServiceStats.inc("foo.bar.baz", 1024);
Counters secondTriggerServiceStats = new Counters();
secondTriggerServiceStats.inc("foo.bar.baz", 1024);
when(triggerService.stats()).thenReturn(firstTriggerServiceStats, secondTriggerServiceStats);
action = new TransportWatcherStatsAction(Settings.EMPTY, transportService,
clusterService, threadPool, new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver(Settings.EMPTY), watcherService, executionService, triggerService);
}
public void testWatcherStats() throws Exception {
WatcherStatsRequest request = new WatcherStatsRequest();
request.includeStats(true);
WatcherStatsResponse.Node nodeResponse1 = action.nodeOperation(new WatcherStatsRequest.Node(request, "nodeId"));
WatcherStatsResponse.Node nodeResponse2 = action.nodeOperation(new WatcherStatsRequest.Node(request, "nodeId2"));
WatcherStatsResponse response = action.newResponse(request,
Arrays.asList(nodeResponse1, nodeResponse2), Collections.emptyList());
assertThat(response.getWatchesCount(), is(40L));
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
ObjectPath objectPath = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
assertThat(objectPath.evaluate("stats.0.stats.foo.bar.baz"), is(1024));
assertThat(objectPath.evaluate("stats.1.stats.foo.bar.baz"), is(1147));
assertThat(objectPath.evaluate("stats.0.stats.spam.eggs"), is(1));
assertThat(objectPath.evaluate("stats.1.stats.whatever"), is(1));
}
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;

View File

@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.trigger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition;
import org.elasticsearch.xpack.core.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.core.watcher.trigger.Trigger;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TriggerServiceTests extends ESTestCase {
private static final String ENGINE_TYPE = "foo";
public void testStats() {
TriggerEngine triggerEngine = mock(TriggerEngine.class);
when(triggerEngine.type()).thenReturn(ENGINE_TYPE);
TriggerService service = new TriggerService(Settings.EMPTY, Collections.singleton(triggerEngine));
// simple watch, input and simple action
Watch watch1 = createWatch("1");
setMetadata(watch1);
setInput(watch1);
addAction(watch1, "my_action", null, null);
service.add(watch1);
Counters stats = service.stats();
assertThat(stats.size(), is(20L));
assertThat(stats.get("watch.input.none.active"), is(1L));
assertThat(stats.get("watch.input._all.active"), is(1L));
assertThat(stats.get("watch.condition.always.active"), is(1L));
assertThat(stats.get("watch.condition._all.active"), is(1L));
assertThat(stats.get("watch.action.my_action.active"), is(1L));
assertThat(stats.get("watch.action._all.active"), is(1L));
assertThat(stats.get("watch.metadata.active"), is(1L));
assertThat(stats.get("watch.metadata.total"), is(1L));
assertThat(stats.get("count.active"), is(1L));
assertThat(stats.get("count.total"), is(1L));
Watch watch2 = createWatch("2");
setInput(watch2);
setCondition(watch2, "script");
addAction(watch2, "my_action", "script", null);
service.add(watch2);
stats = service.stats();
assertThat(stats.size(), is(26L));
assertThat(stats.get("watch.input.none.active"), is(2L));
assertThat(stats.get("watch.input._all.active"), is(2L));
assertThat(stats.get("watch.condition.script.active"), is(1L));
assertThat(stats.get("watch.condition.always.active"), is(1L));
assertThat(stats.get("watch.condition._all.active"), is(2L));
assertThat(stats.get("watch.action.my_action.active"), is(2L));
assertThat(stats.get("watch.action._all.active"), is(2L));
assertThat(stats.get("watch.action.condition.script.active"), is(1L));
assertThat(stats.get("watch.action.condition._all.active"), is(1L));
assertThat(stats.get("watch.metadata.active"), is(1L));
assertThat(stats.get("count.active"), is(2L));
service.remove("1");
stats = service.stats();
assertThat(stats.size(), is(22L));
assertThat(stats.get("count.active"), is(1L));
assertThat(stats.get("watch.input.none.active"), is(1L));
assertThat(stats.get("watch.input._all.active"), is(1L));
assertThat(stats.get("watch.condition.script.active"), is(1L));
assertThat(stats.get("watch.condition._all.active"), is(1L));
assertThat(stats.get("watch.action.my_action.active"), is(1L));
assertThat(stats.get("watch.action._all.active"), is(1L));
assertThat(stats.get("watch.action.condition.script.active"), is(1L));
assertThat(stats.get("watch.action.condition._all.active"), is(1L));
service.remove("2");
stats = service.stats();
assertThat(stats.size(), is(6L));
assertThat(stats.get("count.active"), is(0L));
assertThat(stats.get("count.total"), is(0L));
}
private Watch createWatch(String id) {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn(id);
Trigger trigger = mock(Trigger.class);
when(trigger.type()).thenReturn(ENGINE_TYPE);
when(watch.trigger()).thenReturn(trigger);
when(watch.condition()).thenReturn(InternalAlwaysCondition.INSTANCE);
return watch;
}
private void setInput(Watch watch) {
ExecutableNoneInput noneInput = new ExecutableNoneInput(logger);
when(watch.input()).thenReturn(noneInput);
}
private void setMetadata(Watch watch) {
Map<String, Object> metadata = Collections.singletonMap("foo", "bar");
when(watch.metadata()).thenReturn(metadata);
}
private void setCondition(Watch watch, String type) {
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.type()).thenReturn(type);
when(watch.condition()).thenReturn(condition);
}
private void addAction(Watch watch, String type, String condition, String transform) {
List<ActionWrapper> actions = watch.actions();
ArrayList<ActionWrapper> newActions = new ArrayList<>(actions);
ActionWrapper actionWrapper = mock(ActionWrapper.class);
ExecutableAction executableAction = mock(ExecutableAction.class);
when(executableAction.type()).thenReturn(type);
if (condition != null) {
ExecutableCondition executableCondition = mock(ExecutableCondition.class);
when(executableCondition.type()).thenReturn(condition);
when(actionWrapper.condition()).thenReturn(executableCondition);
}
if (transform != null) {
ExecutableTransform executableTransform = mock(ExecutableTransform.class);
when(executableTransform.type()).thenReturn(transform);
when(actionWrapper.transform()).thenReturn(executableTransform);
}
when(actionWrapper.action()).thenReturn(executableAction);
newActions.add(actionWrapper);
when(watch.actions()).thenReturn(newActions);
}
private void setTransform(Watch watch, String type) {
ExecutableTransform transform = mock(ExecutableTransform.class);
when(transform.type()).thenReturn(type);
when(watch.transform()).thenReturn(transform);
}
}