diff --git a/rest-api-spec/api/watcher.stats.json b/rest-api-spec/api/watcher.stats.json index cd977803a71..3be7eb474c1 100644 --- a/rest-api-spec/api/watcher.stats.json +++ b/rest-api-spec/api/watcher.stats.json @@ -4,10 +4,20 @@ "methods": [ "GET" ], "url": { "path": "/_watcher/stats", - "paths": [ "/_watcher/stats" ], + "paths": [ "/_watcher/stats", "/_watcher/stats/{metric}" ], "parts": { + "metric": { + "type" : "enum", + "options" : ["_all", "queued_watches", "pending_watches"], + "description" : "Controls what additional stat metrics should be include in the response" + } }, "params": { + "metric": { + "type" : "enum", + "options" : ["_all", "queued_watches", "pending_watches"], + "description" : "Controls what additional stat metrics should be include in the response" + } } }, "body": null diff --git a/src/main/java/org/elasticsearch/watcher/WatcherService.java b/src/main/java/org/elasticsearch/watcher/WatcherService.java index 8a957bd8e7f..b2d2d5831c0 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherService.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherService.java @@ -73,12 +73,12 @@ public class WatcherService extends AbstractComponent { if (state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING)) { logger.info("stopping watch service..."); triggerService.stop(); + executionService.stop(); try { watchLockService.stop(); } catch (WatchLockService.TimeoutException we) { logger.warn("error stopping WatchLockService", we); } - executionService.stop(); watchStore.stop(); state.set(WatcherState.STOPPED); logger.info("watch service has stopped"); diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index 1b5fb165cf4..756ab5cc1ec 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -127,12 +127,33 @@ public class ExecutionService extends AbstractComponent { Collections.sort(currentExecutions, new Comparator() { @Override public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) { - return -e1.executionTime().compareTo(e2.executionTime()); + return e1.executionTime().compareTo(e2.executionTime()); } }); return currentExecutions; } + public List queuedWatches() { + List snapshot = new ArrayList<>(executor.queue()); + if (snapshot.isEmpty()) { + return Collections.emptyList(); + } + + List queuedWatches = new ArrayList<>(snapshot.size()); + for (Runnable task : snapshot) { + WatchExecutionTask executionTask = (WatchExecutionTask) task; + queuedWatches.add(new QueuedWatch(executionTask.ctx)); + } + // Lets show the execution that pending the longest first: + Collections.sort(queuedWatches, new Comparator() { + @Override + public int compare(QueuedWatch e1, QueuedWatch e2) { + return e1.executionTime().compareTo(e2.executionTime()); + } + }); + return queuedWatches; + } + void processEventsAsync(Iterable events) throws WatcherException { if (!started.get()) { throw new ElasticsearchIllegalStateException("not started"); diff --git a/src/main/java/org/elasticsearch/watcher/execution/QueuedWatch.java b/src/main/java/org/elasticsearch/watcher/execution/QueuedWatch.java new file mode 100644 index 00000000000..81c4da2d2f2 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/execution/QueuedWatch.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.execution; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.joda.time.DateTimeZone; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +public class QueuedWatch implements Streamable, ToXContent { + + private String watchId; + private String watchRecordId; + private DateTime triggeredTime; + private DateTime executionTime; + + public QueuedWatch(WatchExecutionContext ctx) { + this.watchId = ctx.watch().id(); + this.watchRecordId = ctx.id().value(); + this.triggeredTime = ctx.triggerEvent().triggeredTime(); + this.executionTime = ctx.executionTime(); + } + + public QueuedWatch(StreamInput in) throws IOException { + readFrom(in); + } + + public String watchId() { + return watchId; + } + + public void WatchId(String watchId) { + this.watchId = watchId; + } + + public String watchRecordId() { + return watchRecordId; + } + + public void watchRecordId(String watchRecordId) { + this.watchRecordId = watchRecordId; + } + + public DateTime triggeredTime() { + return triggeredTime; + } + + public void triggeredTime(DateTime triggeredTime) { + this.triggeredTime = triggeredTime; + } + + public DateTime executionTime() { + return executionTime; + } + + public void executionTime(DateTime executionTime) { + this.executionTime = executionTime; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + watchId = in.readString(); + watchRecordId = in.readString(); + triggeredTime = new DateTime(in.readVLong(), DateTimeZone.UTC); + executionTime = new DateTime(in.readVLong(), DateTimeZone.UTC); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(watchId); + out.writeString(watchRecordId); + out.writeVLong(triggeredTime.getMillis()); + out.writeVLong(executionTime.getMillis()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("watch_id", watchId); + builder.field("watch_record_id", watchRecordId); + builder.field("triggered_time", triggeredTime); + builder.field("execution_time", executionTime); + builder.endObject(); + return builder; + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/rest/action/RestWatcherStatsAction.java b/src/main/java/org/elasticsearch/watcher/rest/action/RestWatcherStatsAction.java index 83eb2a98609..e541f830c72 100644 --- a/src/main/java/org/elasticsearch/watcher/rest/action/RestWatcherStatsAction.java +++ b/src/main/java/org/elasticsearch/watcher/rest/action/RestWatcherStatsAction.java @@ -12,13 +12,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestBuilderListener; -import org.elasticsearch.rest.action.support.RestToXContentListener; import org.elasticsearch.watcher.client.WatcherClient; import org.elasticsearch.watcher.rest.WatcherRestHandler; -import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsRequest; +import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; -import java.util.Locale; import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.GET; @@ -38,10 +36,12 @@ public class RestWatcherStatsAction extends WatcherRestHandler { Set metrics = Strings.splitStringByCommaToSet(restRequest.param("metric", "")); WatcherStatsRequest request = new WatcherStatsRequest(); - if (metrics.size() == 1 && metrics.contains("_all")) { + if (metrics.contains("_all")) { request.includeCurrentWatches(true); + request.includePendingWatches(true); } else { - request.includeCurrentWatches(metrics.contains("executing_watches")); + request.includeCurrentWatches(metrics.contains("queued_watches")); + request.includePendingWatches(metrics.contains("pending_watches")); } client.watcherStats(request, new RestBuilderListener(restChannel) { diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/stats/TransportWatcherStatsAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/stats/TransportWatcherStatsAction.java index 5283b70d5a8..829b9f61f3e 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/stats/TransportWatcherStatsAction.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/stats/TransportWatcherStatsAction.java @@ -69,6 +69,9 @@ public class TransportWatcherStatsAction extends WatcherTransportAction { private boolean includeCurrentWatches; + private boolean includePendingWatches; public WatcherStatsRequest() { } @@ -30,6 +31,14 @@ public class WatcherStatsRequest extends MasterNodeOperationRequest listener) { diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/stats/WatcherStatsResponse.java b/src/main/java/org/elasticsearch/watcher/transport/actions/stats/WatcherStatsResponse.java index 2b3f3a0b2a0..14d5267d101 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/stats/WatcherStatsResponse.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/stats/WatcherStatsResponse.java @@ -14,6 +14,7 @@ import org.elasticsearch.watcher.WatcherState; import org.elasticsearch.watcher.WatcherVersion; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.watcher.execution.QueuedWatch; import org.elasticsearch.watcher.execution.WatchExecutionSnapshot; import java.io.IOException; @@ -31,6 +32,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent { private long watchExecutionQueueMaxSize; private List snapshots; + private List queuedWatches; WatcherStatsResponse() { } @@ -110,6 +112,15 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent { this.snapshots = snapshots; } + @Nullable + public List getQueuedWatches() { + return queuedWatches; + } + + public void setQueuedWatches(List queuedWatches) { + this.queuedWatches = queuedWatches; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -127,6 +138,13 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent { snapshots.add(new WatchExecutionSnapshot(in)); } } + if (in.readBoolean()) { + int size = in.readVInt(); + queuedWatches = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + queuedWatches.add(new QueuedWatch(in)); + } + } } @Override @@ -148,6 +166,15 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent { } else { out.writeBoolean(false); } + if (queuedWatches != null) { + out.writeBoolean(true); + out.writeVInt(queuedWatches.size()); + for (QueuedWatch pending : this.queuedWatches) { + pending.writeTo(out); + } + } else { + out.writeBoolean(false); + } } @Override @@ -167,6 +194,13 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent { } builder.endArray(); } + if (queuedWatches != null) { + builder.startArray("queued_watches"); + for (QueuedWatch queuedWatch : queuedWatches) { + queuedWatch.toXContent(builder, params); + } + builder.endArray(); + } builder.endObject(); return builder; diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/SlowWatchStatsTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/SlowWatchStatsTests.java new file mode 100644 index 00000000000..b22cd58716f --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/test/integration/SlowWatchStatsTests.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.test.integration; + +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.watcher.WatcherState; +import org.elasticsearch.watcher.actions.ActionBuilders; +import org.elasticsearch.watcher.condition.ConditionBuilders; +import org.elasticsearch.watcher.execution.ExecutionPhase; +import org.elasticsearch.watcher.execution.QueuedWatch; +import org.elasticsearch.watcher.input.InputBuilders; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.*; + +/** + */ +@LuceneTestCase.Slow +@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) +public class SlowWatchStatsTests extends AbstractWatcherIntegrationTests { + + @Override + protected boolean timeWarped() { + return false; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder() + .put(super.nodeSettings(nodeOrdinal)) + // So it is predictable how many slow watches we need to add to accumulate pending watches + .put(EsExecutors.PROCESSORS, "1") + .build(); + } + + @Test + public void testCurrentWatches() throws Exception { + watcherClient().preparePutWatch("_id").setSource(watchBuilder() + .trigger(schedule(interval("1s"))) + .input(InputBuilders.simpleInput("key", "value")) + .condition(ConditionBuilders.scriptCondition("sleep 10000; return true")) + .addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!")) + ).get(); + + assertBusy(new Runnable() { + @Override + public void run() { + WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get(); + assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); + assertThat(response.getWatchesCount(), equalTo(1l)); + assertThat(response.getQueuedWatches(), nullValue()); + assertThat(response.getSnapshots(), notNullValue()); + assertThat(response.getSnapshots().size(), equalTo(1)); + assertThat(response.getSnapshots().get(0).watchId(), equalTo("_id")); + assertThat(response.getSnapshots().get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION)); + } + }); + } + + @Test + public void testPendingWatches() throws Exception { + // Add 5 slow watches and we should almost immediately see pending watches in the stats api + for (int i = 0; i < 5; i++) { + watcherClient().preparePutWatch("_id" + i).setSource(watchBuilder() + .trigger(schedule(interval("1s"))) + .input(InputBuilders.simpleInput("key", "value")) + .condition(ConditionBuilders.scriptCondition("sleep 10000; return true")) + .addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!")) + ).get(); + } + + assertBusy(new Runnable() { + @Override + public void run() { + WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludePendingWatches(true).get(); + assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); + assertThat(response.getWatchesCount(), equalTo(5l)); + assertThat(response.getSnapshots(), nullValue()); + assertThat(response.getQueuedWatches(), notNullValue()); + assertThat(response.getQueuedWatches().size(), greaterThanOrEqualTo(5)); + DateTime previous = null; + for (QueuedWatch queuedWatch : response.getQueuedWatches()) { + assertThat(queuedWatch.watchId(), anyOf(equalTo("_id0"), equalTo("_id1"), equalTo("_id2"), equalTo("_id3"), equalTo("_id4"))); + if (previous != null) { + // older pending watch should be on top: + assertThat(previous.getMillis(), lessThanOrEqualTo(queuedWatch.executionTime().getMillis())); + } + previous = queuedWatch.executionTime(); + } + } + }, 60, TimeUnit.SECONDS); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchStatsCurrentWatchesTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchStatsCurrentWatchesTests.java deleted file mode 100644 index 638ce181f65..00000000000 --- a/src/test/java/org/elasticsearch/watcher/test/integration/WatchStatsCurrentWatchesTests.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.test.integration; - -import org.apache.lucene.util.LuceneTestCase; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.watcher.WatcherState; -import org.elasticsearch.watcher.actions.ActionBuilders; -import org.elasticsearch.watcher.condition.ConditionBuilders; -import org.elasticsearch.watcher.execution.ExecutionPhase; -import org.elasticsearch.watcher.input.InputBuilders; -import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; -import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; -import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; -import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; -import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.notNullValue; - -/** - */ -@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) -public class WatchStatsCurrentWatchesTests extends AbstractWatcherIntegrationTests { - - @Override - protected boolean timeWarped() { - return false; - } - - @Test - @LuceneTestCase.Slow - public void testCurrentWatches() throws Exception { - watcherClient().preparePutWatch("_id").setSource(watchBuilder() - .trigger(schedule(interval("1s"))) - .input(InputBuilders.simpleInput("key", "value")) - .condition(ConditionBuilders.scriptCondition("sleep 10000; return true")) - .addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!")) - ).get(); - - assertBusy(new Runnable() { - @Override - public void run() { - WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get(); - assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); - assertThat(response.getWatchesCount(), equalTo(1l)); - assertThat(response.getSnapshots(), notNullValue()); - assertThat(response.getSnapshots().size(), equalTo(1)); - assertThat(response.getSnapshots().get(0).watchId(), equalTo("_id")); - assertThat(response.getSnapshots().get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION)); - } - }); - } - -}