stats: Add queued watches metric

The queued watches metric gives insight into the watches that are queued for execution.
Per watch that is queued, executing information is shared, like the `watch_id`,
when the watch was triggered and when execution started.

Original commit: elastic/x-pack-elasticsearch@deb5ddfde2
This commit is contained in:
Martijn van Groningen 2015-05-21 14:30:24 +02:00
parent 4335669635
commit 0890001470
11 changed files with 296 additions and 71 deletions

View File

@ -4,10 +4,20 @@
"methods": [ "GET" ],
"url": {
"path": "/_watcher/stats",
"paths": [ "/_watcher/stats" ],
"paths": [ "/_watcher/stats", "/_watcher/stats/{metric}" ],
"parts": {
"metric": {
"type" : "enum",
"options" : ["_all", "queued_watches", "pending_watches"],
"description" : "Controls what additional stat metrics should be include in the response"
}
},
"params": {
"metric": {
"type" : "enum",
"options" : ["_all", "queued_watches", "pending_watches"],
"description" : "Controls what additional stat metrics should be include in the response"
}
}
},
"body": null

View File

@ -73,12 +73,12 @@ public class WatcherService extends AbstractComponent {
if (state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING)) {
logger.info("stopping watch service...");
triggerService.stop();
executionService.stop();
try {
watchLockService.stop();
} catch (WatchLockService.TimeoutException we) {
logger.warn("error stopping WatchLockService", we);
}
executionService.stop();
watchStore.stop();
state.set(WatcherState.STOPPED);
logger.info("watch service has stopped");

View File

@ -127,12 +127,33 @@ public class ExecutionService extends AbstractComponent {
Collections.sort(currentExecutions, new Comparator<WatchExecutionSnapshot>() {
@Override
public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) {
return -e1.executionTime().compareTo(e2.executionTime());
return e1.executionTime().compareTo(e2.executionTime());
}
});
return currentExecutions;
}
public List<QueuedWatch> queuedWatches() {
List<Runnable> snapshot = new ArrayList<>(executor.queue());
if (snapshot.isEmpty()) {
return Collections.emptyList();
}
List<QueuedWatch> queuedWatches = new ArrayList<>(snapshot.size());
for (Runnable task : snapshot) {
WatchExecutionTask executionTask = (WatchExecutionTask) task;
queuedWatches.add(new QueuedWatch(executionTask.ctx));
}
// Lets show the execution that pending the longest first:
Collections.sort(queuedWatches, new Comparator<QueuedWatch>() {
@Override
public int compare(QueuedWatch e1, QueuedWatch e2) {
return e1.executionTime().compareTo(e2.executionTime());
}
});
return queuedWatches;
}
void processEventsAsync(Iterable<TriggerEvent> events) throws WatcherException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");

View File

@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.execution;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class QueuedWatch implements Streamable, ToXContent {
private String watchId;
private String watchRecordId;
private DateTime triggeredTime;
private DateTime executionTime;
public QueuedWatch(WatchExecutionContext ctx) {
this.watchId = ctx.watch().id();
this.watchRecordId = ctx.id().value();
this.triggeredTime = ctx.triggerEvent().triggeredTime();
this.executionTime = ctx.executionTime();
}
public QueuedWatch(StreamInput in) throws IOException {
readFrom(in);
}
public String watchId() {
return watchId;
}
public void WatchId(String watchId) {
this.watchId = watchId;
}
public String watchRecordId() {
return watchRecordId;
}
public void watchRecordId(String watchRecordId) {
this.watchRecordId = watchRecordId;
}
public DateTime triggeredTime() {
return triggeredTime;
}
public void triggeredTime(DateTime triggeredTime) {
this.triggeredTime = triggeredTime;
}
public DateTime executionTime() {
return executionTime;
}
public void executionTime(DateTime executionTime) {
this.executionTime = executionTime;
}
@Override
public void readFrom(StreamInput in) throws IOException {
watchId = in.readString();
watchRecordId = in.readString();
triggeredTime = new DateTime(in.readVLong(), DateTimeZone.UTC);
executionTime = new DateTime(in.readVLong(), DateTimeZone.UTC);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(watchId);
out.writeString(watchRecordId);
out.writeVLong(triggeredTime.getMillis());
out.writeVLong(executionTime.getMillis());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("watch_id", watchId);
builder.field("watch_record_id", watchRecordId);
builder.field("triggered_time", triggeredTime);
builder.field("execution_time", executionTime);
builder.endObject();
return builder;
}
}

View File

@ -12,13 +12,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import org.elasticsearch.rest.action.support.RestToXContentListener;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.rest.WatcherRestHandler;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsRequest;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import java.util.Locale;
import java.util.Set;
import static org.elasticsearch.rest.RestRequest.Method.GET;
@ -38,10 +36,12 @@ public class RestWatcherStatsAction extends WatcherRestHandler {
Set<String> metrics = Strings.splitStringByCommaToSet(restRequest.param("metric", ""));
WatcherStatsRequest request = new WatcherStatsRequest();
if (metrics.size() == 1 && metrics.contains("_all")) {
if (metrics.contains("_all")) {
request.includeCurrentWatches(true);
request.includePendingWatches(true);
} else {
request.includeCurrentWatches(metrics.contains("executing_watches"));
request.includeCurrentWatches(metrics.contains("queued_watches"));
request.includePendingWatches(metrics.contains("pending_watches"));
}
client.watcherStats(request, new RestBuilderListener<WatcherStatsResponse>(restChannel) {

View File

@ -69,6 +69,9 @@ public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherS
if (request.includeCurrentWatches()) {
statsResponse.setSnapshots(executionService.currentExecutions());
}
if (request.includePendingWatches()) {
statsResponse.setQueuedWatches(executionService.queuedWatches());
}
listener.onResponse(statsResponse);
}

View File

@ -18,6 +18,7 @@ import java.io.IOException;
public class WatcherStatsRequest extends MasterNodeOperationRequest<WatcherStatsRequest> {
private boolean includeCurrentWatches;
private boolean includePendingWatches;
public WatcherStatsRequest() {
}
@ -30,6 +31,14 @@ public class WatcherStatsRequest extends MasterNodeOperationRequest<WatcherStats
this.includeCurrentWatches = currentWatches;
}
public boolean includePendingWatches() {
return includePendingWatches;
}
public void includePendingWatches(boolean includePendingWatches) {
this.includePendingWatches = includePendingWatches;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -39,12 +48,14 @@ public class WatcherStatsRequest extends MasterNodeOperationRequest<WatcherStats
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
includeCurrentWatches = in.readBoolean();
includePendingWatches = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(includeCurrentWatches);
out.writeBoolean(includeCurrentWatches);
}
@Override

View File

@ -23,6 +23,10 @@ public class WatcherStatsRequestBuilder extends MasterNodeOperationRequestBuilde
request().includeCurrentWatches(includeCurrentWatches);
return this;
}
public WatcherStatsRequestBuilder setIncludePendingWatches(boolean includePendingWatches) {
request().includePendingWatches(includePendingWatches);
return this;
}
@Override
protected void doExecute(final ActionListener<WatcherStatsResponse> listener) {

View File

@ -14,6 +14,7 @@ import org.elasticsearch.watcher.WatcherState;
import org.elasticsearch.watcher.WatcherVersion;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.watcher.execution.QueuedWatch;
import org.elasticsearch.watcher.execution.WatchExecutionSnapshot;
import java.io.IOException;
@ -31,6 +32,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
private long watchExecutionQueueMaxSize;
private List<WatchExecutionSnapshot> snapshots;
private List<QueuedWatch> queuedWatches;
WatcherStatsResponse() {
}
@ -110,6 +112,15 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
this.snapshots = snapshots;
}
@Nullable
public List<QueuedWatch> getQueuedWatches() {
return queuedWatches;
}
public void setQueuedWatches(List<QueuedWatch> queuedWatches) {
this.queuedWatches = queuedWatches;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -127,6 +138,13 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
snapshots.add(new WatchExecutionSnapshot(in));
}
}
if (in.readBoolean()) {
int size = in.readVInt();
queuedWatches = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
queuedWatches.add(new QueuedWatch(in));
}
}
}
@Override
@ -148,6 +166,15 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
} else {
out.writeBoolean(false);
}
if (queuedWatches != null) {
out.writeBoolean(true);
out.writeVInt(queuedWatches.size());
for (QueuedWatch pending : this.queuedWatches) {
pending.writeTo(out);
}
} else {
out.writeBoolean(false);
}
}
@Override
@ -167,6 +194,13 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
}
builder.endArray();
}
if (queuedWatches != null) {
builder.startArray("queued_watches");
for (QueuedWatch queuedWatch : queuedWatches) {
queuedWatch.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();
return builder;

View File

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.WatcherState;
import org.elasticsearch.watcher.actions.ActionBuilders;
import org.elasticsearch.watcher.condition.ConditionBuilders;
import org.elasticsearch.watcher.execution.ExecutionPhase;
import org.elasticsearch.watcher.execution.QueuedWatch;
import org.elasticsearch.watcher.input.InputBuilders;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.*;
/**
*/
@LuceneTestCase.Slow
@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class SlowWatchStatsTests extends AbstractWatcherIntegrationTests {
@Override
protected boolean timeWarped() {
return false;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
// So it is predictable how many slow watches we need to add to accumulate pending watches
.put(EsExecutors.PROCESSORS, "1")
.build();
}
@Test
public void testCurrentWatches() throws Exception {
watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(InputBuilders.simpleInput("key", "value"))
.condition(ConditionBuilders.scriptCondition("sleep 10000; return true"))
.addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!"))
).get();
assertBusy(new Runnable() {
@Override
public void run() {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get();
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getWatchesCount(), equalTo(1l));
assertThat(response.getQueuedWatches(), nullValue());
assertThat(response.getSnapshots(), notNullValue());
assertThat(response.getSnapshots().size(), equalTo(1));
assertThat(response.getSnapshots().get(0).watchId(), equalTo("_id"));
assertThat(response.getSnapshots().get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION));
}
});
}
@Test
public void testPendingWatches() throws Exception {
// Add 5 slow watches and we should almost immediately see pending watches in the stats api
for (int i = 0; i < 5; i++) {
watcherClient().preparePutWatch("_id" + i).setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(InputBuilders.simpleInput("key", "value"))
.condition(ConditionBuilders.scriptCondition("sleep 10000; return true"))
.addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!"))
).get();
}
assertBusy(new Runnable() {
@Override
public void run() {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludePendingWatches(true).get();
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getWatchesCount(), equalTo(5l));
assertThat(response.getSnapshots(), nullValue());
assertThat(response.getQueuedWatches(), notNullValue());
assertThat(response.getQueuedWatches().size(), greaterThanOrEqualTo(5));
DateTime previous = null;
for (QueuedWatch queuedWatch : response.getQueuedWatches()) {
assertThat(queuedWatch.watchId(), anyOf(equalTo("_id0"), equalTo("_id1"), equalTo("_id2"), equalTo("_id3"), equalTo("_id4")));
if (previous != null) {
// older pending watch should be on top:
assertThat(previous.getMillis(), lessThanOrEqualTo(queuedWatch.executionTime().getMillis()));
}
previous = queuedWatch.executionTime();
}
}
}, 60, TimeUnit.SECONDS);
}
}

View File

@ -1,63 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.WatcherState;
import org.elasticsearch.watcher.actions.ActionBuilders;
import org.elasticsearch.watcher.condition.ConditionBuilders;
import org.elasticsearch.watcher.execution.ExecutionPhase;
import org.elasticsearch.watcher.input.InputBuilders;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class WatchStatsCurrentWatchesTests extends AbstractWatcherIntegrationTests {
@Override
protected boolean timeWarped() {
return false;
}
@Test
@LuceneTestCase.Slow
public void testCurrentWatches() throws Exception {
watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(InputBuilders.simpleInput("key", "value"))
.condition(ConditionBuilders.scriptCondition("sleep 10000; return true"))
.addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!"))
).get();
assertBusy(new Runnable() {
@Override
public void run() {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get();
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getWatchesCount(), equalTo(1l));
assertThat(response.getSnapshots(), notNullValue());
assertThat(response.getSnapshots().size(), equalTo(1));
assertThat(response.getSnapshots().get(0).watchId(), equalTo("_id"));
assertThat(response.getSnapshots().get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION));
}
});
}
}