Expose the current executing watches as part of the Watcher stats API.
The following additional information will be shown per watch that is executing: `watch_id`, `watch_execution_id`, `triggered_time`, `execution_time`, `execution_phase` (whether it is execution an input, condition or an action) and `stack_trace` (useful for us when a customer reports an issue :) ). The stats api will by default include the executing watches in the response. In order to control this, a `metric` option has been added, which can be specified as query string argument or as last path element in the stats api url. By default the watcher stats API will only return the basic statistics that are already there. The `metric` option has the following values: * `current_watches` - Include the current executing watches in the response. * `_all` - Include all metrics in the stats response. Not very useful now, but when we expose more metrics in this api it will be more useful. Original commit: elastic/x-pack-elasticsearch@093bef9bb3
This commit is contained in:
parent
81d19d3468
commit
c3dd74df7f
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher.execution;
|
||||
|
||||
import java.util.Locale;
|
||||
|
||||
/**
|
||||
*/
|
||||
public enum ExecutionPhase {
|
||||
|
||||
AWAITS_EXECUTION,
|
||||
INPUT,
|
||||
CONDITION,
|
||||
WATCH_TRANSFORM,
|
||||
ACTIONS,
|
||||
FINISHED;
|
||||
|
||||
public static ExecutionPhase parse(String value) {
|
||||
return valueOf(value.toUpperCase(Locale.ROOT));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name().toLowerCase(Locale.ROOT);
|
||||
}
|
||||
}
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.watcher.WatcherException;
|
||||
|
@ -31,10 +32,8 @@ import org.elasticsearch.watcher.watch.WatchLockService;
|
|||
import org.elasticsearch.watcher.watch.WatchStore;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
|
||||
|
@ -48,6 +47,7 @@ public class ExecutionService extends AbstractComponent {
|
|||
private final WatchStore watchStore;
|
||||
private final WatchLockService watchLockService;
|
||||
private final Clock clock;
|
||||
private final ConcurrentMap<String, WatchExecution> currentExecutions = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
|
||||
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
|
@ -106,6 +106,21 @@ public class ExecutionService extends AbstractComponent {
|
|||
return executor.largestPoolSize();
|
||||
}
|
||||
|
||||
public List<WatchExecutionSnapshot> currentExecutions() {
|
||||
List<WatchExecutionSnapshot> currentExecutions = new ArrayList<>();
|
||||
for (WatchExecution watchExecution : this.currentExecutions.values()) {
|
||||
currentExecutions.add(watchExecution.createSnapshot());
|
||||
}
|
||||
// Lets show the longest running watch first:
|
||||
Collections.sort(currentExecutions, new Comparator<WatchExecutionSnapshot>() {
|
||||
@Override
|
||||
public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) {
|
||||
return -e1.executionTime().compareTo(e2.executionTime());
|
||||
}
|
||||
});
|
||||
return currentExecutions;
|
||||
}
|
||||
|
||||
void processEventsAsync(Iterable<TriggerEvent> events) throws WatcherException {
|
||||
if (!started.get()) {
|
||||
throw new ElasticsearchIllegalStateException("not started");
|
||||
|
@ -212,6 +227,7 @@ public class ExecutionService extends AbstractComponent {
|
|||
WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent());
|
||||
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id());
|
||||
try {
|
||||
currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
|
||||
WatchExecutionResult result = executeInner(ctx);
|
||||
watchRecord.seal(result);
|
||||
if (ctx.recordExecution()) {
|
||||
|
@ -220,6 +236,7 @@ public class ExecutionService extends AbstractComponent {
|
|||
} catch (VersionConflictEngineException vcee) {
|
||||
throw new WatcherException("Failed to update the watch [{}] on execute perhaps it was force deleted", vcee, ctx.watch().id());
|
||||
} finally {
|
||||
currentExecutions.remove(ctx.watch().id());
|
||||
lock.release();
|
||||
}
|
||||
if (ctx.recordExecution()) {
|
||||
|
@ -250,11 +267,13 @@ public class ExecutionService extends AbstractComponent {
|
|||
|
||||
WatchExecutionResult executeInner(WatchExecutionContext ctx) throws IOException {
|
||||
Watch watch = ctx.watch();
|
||||
ctx.beforeInput();
|
||||
Input.Result inputResult = ctx.inputResult();
|
||||
if (inputResult == null) {
|
||||
inputResult = watch.input().execute(ctx);
|
||||
ctx.onInputResult(inputResult);
|
||||
}
|
||||
ctx.beforeCondition();
|
||||
Condition.Result conditionResult = ctx.conditionResult();
|
||||
if (conditionResult == null) {
|
||||
conditionResult = watch.condition().execute(ctx);
|
||||
|
@ -262,7 +281,6 @@ public class ExecutionService extends AbstractComponent {
|
|||
}
|
||||
|
||||
if (conditionResult.met()) {
|
||||
|
||||
Throttler.Result throttleResult = ctx.throttleResult();
|
||||
if (throttleResult == null) {
|
||||
throttleResult = watch.throttler().throttle(ctx);
|
||||
|
@ -272,9 +290,11 @@ public class ExecutionService extends AbstractComponent {
|
|||
if (!throttleResult.throttle()) {
|
||||
ExecutableTransform transform = watch.transform();
|
||||
if (transform != null) {
|
||||
ctx.beforeWatchTransform();
|
||||
Transform.Result result = watch.transform().execute(ctx, inputResult.payload());
|
||||
ctx.onTransformResult(result);
|
||||
}
|
||||
ctx.beforeAction();
|
||||
for (ActionWrapper action : watch.actions()) {
|
||||
ActionWrapper.Result actionResult = action.execute(ctx);
|
||||
ctx.onActionResult(actionResult);
|
||||
|
@ -323,6 +343,7 @@ public class ExecutionService extends AbstractComponent {
|
|||
|
||||
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id());
|
||||
try {
|
||||
currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
|
||||
if (watchStore.get(ctx.watch().id()) == null) {
|
||||
//Fail fast if we are trying to execute a deleted watch
|
||||
String message = "unable to find watch for record [" + watchRecord.id() + "], perhaps it has been deleted, ignoring...";
|
||||
|
@ -345,6 +366,7 @@ public class ExecutionService extends AbstractComponent {
|
|||
logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord);
|
||||
}
|
||||
} finally {
|
||||
currentExecutions.remove(ctx.watch().id());
|
||||
lock.release();
|
||||
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
|
||||
}
|
||||
|
@ -358,4 +380,20 @@ public class ExecutionService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class WatchExecution {
|
||||
|
||||
private final WatchExecutionContext context;
|
||||
private final Thread executionThread;
|
||||
|
||||
public WatchExecution(WatchExecutionContext context, Thread executionThread) {
|
||||
this.context = context;
|
||||
this.executionThread = executionThread;
|
||||
}
|
||||
|
||||
public WatchExecutionSnapshot createSnapshot() {
|
||||
return context.createSnapshot(executionThread);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.watcher.execution;
|
||||
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.watcher.actions.ActionWrapper;
|
||||
import org.elasticsearch.watcher.actions.ExecutableActions;
|
||||
import org.elasticsearch.watcher.condition.Condition;
|
||||
|
@ -16,8 +17,7 @@ import org.elasticsearch.watcher.trigger.TriggerEvent;
|
|||
import org.elasticsearch.watcher.watch.Payload;
|
||||
import org.elasticsearch.watcher.watch.Watch;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -33,9 +33,10 @@ public abstract class WatchExecutionContext {
|
|||
private Condition.Result conditionResult;
|
||||
private Throttler.Result throttleResult;
|
||||
private Transform.Result transformResult;
|
||||
private Map<String, ActionWrapper.Result> actionsResults = new HashMap<>();
|
||||
private ConcurrentMap<String, ActionWrapper.Result> actionsResults = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
private Payload payload;
|
||||
private volatile ExecutionPhase executionPhase = ExecutionPhase.AWAITS_EXECUTION;
|
||||
|
||||
public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent) {
|
||||
this.watch = watch;
|
||||
|
@ -74,6 +75,14 @@ public abstract class WatchExecutionContext {
|
|||
return payload;
|
||||
}
|
||||
|
||||
public ExecutionPhase executionPhase() {
|
||||
return executionPhase;
|
||||
}
|
||||
|
||||
public void beforeInput() {
|
||||
executionPhase = ExecutionPhase.INPUT;
|
||||
}
|
||||
|
||||
public void onInputResult(Input.Result inputResult) {
|
||||
this.inputResult = inputResult;
|
||||
this.payload = inputResult.payload();
|
||||
|
@ -83,6 +92,10 @@ public abstract class WatchExecutionContext {
|
|||
return inputResult;
|
||||
}
|
||||
|
||||
public void beforeCondition() {
|
||||
executionPhase = ExecutionPhase.CONDITION;
|
||||
}
|
||||
|
||||
public void onConditionResult(Condition.Result conditionResult) {
|
||||
this.conditionResult = conditionResult;
|
||||
if (recordExecution()) {
|
||||
|
@ -106,6 +119,10 @@ public abstract class WatchExecutionContext {
|
|||
}
|
||||
}
|
||||
|
||||
public void beforeWatchTransform() {
|
||||
this.executionPhase = ExecutionPhase.WATCH_TRANSFORM;
|
||||
}
|
||||
|
||||
public Throttler.Result throttleResult() {
|
||||
return throttleResult;
|
||||
}
|
||||
|
@ -119,6 +136,10 @@ public abstract class WatchExecutionContext {
|
|||
return transformResult;
|
||||
}
|
||||
|
||||
public void beforeAction() {
|
||||
executionPhase = ExecutionPhase.ACTIONS;
|
||||
}
|
||||
|
||||
public void onActionResult(ActionWrapper.Result result) {
|
||||
actionsResults.put(result.id(), result);
|
||||
}
|
||||
|
@ -128,6 +149,11 @@ public abstract class WatchExecutionContext {
|
|||
}
|
||||
|
||||
public WatchExecutionResult finish() {
|
||||
executionPhase = ExecutionPhase.FINISHED;
|
||||
return new WatchExecutionResult(this);
|
||||
}
|
||||
|
||||
public WatchExecutionSnapshot createSnapshot(Thread executionThread) {
|
||||
return new WatchExecutionSnapshot(this, executionThread.getStackTrace());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher.execution;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.joda.time.DateTimeZone;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.watcher.actions.ActionWrapper;
|
||||
import org.elasticsearch.watcher.actions.ExecutableActions;
|
||||
import org.elasticsearch.watcher.execution.ExecutionPhase;
|
||||
import org.elasticsearch.watcher.execution.ExecutionService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class WatchExecutionSnapshot implements Streamable, ToXContent {
|
||||
|
||||
private String watchId;
|
||||
private String watchRecordId;
|
||||
private DateTime triggeredTime;
|
||||
private DateTime executionTime;
|
||||
private ExecutionPhase phase;
|
||||
private String[] executedActions;
|
||||
private StackTraceElement[] executionStackTrace;
|
||||
|
||||
public WatchExecutionSnapshot(StreamInput in) throws IOException {
|
||||
readFrom(in);
|
||||
}
|
||||
|
||||
WatchExecutionSnapshot(WatchExecutionContext context, StackTraceElement[] executionStackTrace) {
|
||||
watchId = context.watch().id();
|
||||
watchRecordId = context.id().value();
|
||||
triggeredTime = context.triggerEvent().triggeredTime();
|
||||
executionTime = context.executionTime();
|
||||
phase = context.executionPhase();
|
||||
if (phase == ExecutionPhase.ACTIONS) {
|
||||
ExecutableActions.Results actionResults = context.actionsResults();
|
||||
executedActions = new String[actionResults.count()];
|
||||
int i = 0;
|
||||
for (ActionWrapper.Result actionResult : actionResults) {
|
||||
executedActions[i++] = actionResult.id();
|
||||
}
|
||||
}
|
||||
this.executionStackTrace = executionStackTrace;
|
||||
}
|
||||
|
||||
public String watchId() {
|
||||
return watchId;
|
||||
}
|
||||
|
||||
public String watchRecordId() {
|
||||
return watchRecordId;
|
||||
}
|
||||
|
||||
public DateTime triggeredTime() {
|
||||
return triggeredTime;
|
||||
}
|
||||
|
||||
public DateTime executionTime() {
|
||||
return executionTime;
|
||||
}
|
||||
|
||||
public ExecutionPhase executionPhase() {
|
||||
return phase;
|
||||
}
|
||||
|
||||
public StackTraceElement[] executionStackTrace() {
|
||||
return executionStackTrace;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
watchId = in.readString();
|
||||
watchRecordId = in.readString();
|
||||
triggeredTime = new DateTime(in.readVLong(), DateTimeZone.UTC);
|
||||
executionTime = new DateTime(in.readVLong(), DateTimeZone.UTC);
|
||||
int size = in.readVInt();
|
||||
phase = ExecutionPhase.parse(in.readString());
|
||||
executionStackTrace = new StackTraceElement[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
String declaringClass = in.readString();
|
||||
String methodName = in.readString();
|
||||
String fileName = in.readString();
|
||||
int lineNumber = in.readInt();
|
||||
executionStackTrace[i] = new StackTraceElement(declaringClass, methodName, fileName, lineNumber);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(watchId);
|
||||
out.writeString(watchRecordId);
|
||||
out.writeVLong(triggeredTime.getMillis());
|
||||
out.writeVLong(executionTime.getMillis());
|
||||
out.writeString(phase.toString());
|
||||
out.writeVInt(executionStackTrace.length);
|
||||
for (StackTraceElement element : executionStackTrace) {
|
||||
out.writeString(element.getClassName());
|
||||
out.writeString(element.getMethodName());
|
||||
out.writeString(element.getFileName());
|
||||
out.writeInt(element.getLineNumber());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field("watch_id", watchId);
|
||||
builder.field("watch_record_id", watchRecordId);
|
||||
builder.field("triggered_time", triggeredTime);
|
||||
builder.field("execution_time", executionTime);
|
||||
builder.field("execution_phase", phase);
|
||||
if (executedActions != null) {
|
||||
builder.startArray("executed_actions");
|
||||
for (String executedAction : executedActions) {
|
||||
builder.value(executedAction);
|
||||
}
|
||||
builder.endArray();
|
||||
}
|
||||
if (params.paramAsBoolean("emit_stacktraces", false)) {
|
||||
builder.startArray("stack_trace");
|
||||
for (StackTraceElement element : executionStackTrace) {
|
||||
builder.value(element.toString());
|
||||
}
|
||||
builder.endArray();
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
|
@ -6,17 +6,20 @@
|
|||
package org.elasticsearch.watcher.rest.action;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestBuilderListener;
|
||||
import org.elasticsearch.rest.action.support.RestToXContentListener;
|
||||
import org.elasticsearch.watcher.client.WatcherClient;
|
||||
import org.elasticsearch.watcher.rest.WatcherRestHandler;
|
||||
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
|
||||
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsRequest;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.rest.RestRequest.Method.GET;
|
||||
import static org.elasticsearch.rest.RestStatus.OK;
|
||||
|
@ -27,24 +30,25 @@ public class RestWatcherStatsAction extends WatcherRestHandler {
|
|||
protected RestWatcherStatsAction(Settings settings, RestController controller, Client client) {
|
||||
super(settings, controller, client);
|
||||
controller.registerHandler(GET, URI_BASE + "/stats", this);
|
||||
controller.registerHandler(GET, URI_BASE + "/stats/{metric}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleRequest(RestRequest request, RestChannel restChannel, WatcherClient client) throws Exception {
|
||||
client.watcherStats(new WatcherStatsRequest(), new RestBuilderListener<WatcherStatsResponse>(restChannel) {
|
||||
protected void handleRequest(final RestRequest restRequest, RestChannel restChannel, WatcherClient client) throws Exception {
|
||||
Set<String> metrics = Strings.splitStringByCommaToSet(restRequest.param("metric"));
|
||||
|
||||
WatcherStatsRequest request = new WatcherStatsRequest();
|
||||
if (metrics.size() == 1 && metrics.contains("_all")) {
|
||||
request.includeCurrentWatches(true);
|
||||
} else {
|
||||
request.includeCurrentWatches(metrics.contains("executing_watches"));
|
||||
}
|
||||
|
||||
client.watcherStats(request, new RestBuilderListener<WatcherStatsResponse>(restChannel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(WatcherStatsResponse watcherStatsResponse, XContentBuilder builder) throws Exception {
|
||||
builder.startObject();
|
||||
builder.field("watcher_state", watcherStatsResponse.getWatcherState().toString().toLowerCase(Locale.ROOT))
|
||||
.field("watch_count", watcherStatsResponse.getWatchesCount());
|
||||
|
||||
builder.startObject("execution_queue")
|
||||
.field("size", watcherStatsResponse.getExecutionQueueSize())
|
||||
.field("max_size", watcherStatsResponse.getWatchExecutionQueueMaxSize())
|
||||
.endObject();
|
||||
builder.endObject();
|
||||
watcherStatsResponse.toXContent(builder, restRequest);
|
||||
return new BytesRestResponse(OK, builder);
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -65,6 +65,11 @@ public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherS
|
|||
statsResponse.setWatchExecutionQueueMaxSize(executionService.largestQueueSize());
|
||||
statsResponse.setVersion(WatcherVersion.CURRENT);
|
||||
statsResponse.setBuild(WatcherBuild.CURRENT);
|
||||
|
||||
if (request.includeCurrentWatches()) {
|
||||
statsResponse.setSnapshots(executionService.currentExecutions());
|
||||
}
|
||||
|
||||
listener.onResponse(statsResponse);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,10 +17,19 @@ import java.io.IOException;
|
|||
*/
|
||||
public class WatcherStatsRequest extends MasterNodeOperationRequest<WatcherStatsRequest> {
|
||||
|
||||
private boolean includeCurrentWatches;
|
||||
|
||||
public WatcherStatsRequest() {
|
||||
}
|
||||
|
||||
public boolean includeCurrentWatches() {
|
||||
return includeCurrentWatches;
|
||||
}
|
||||
|
||||
public void includeCurrentWatches(boolean currentWatches) {
|
||||
this.includeCurrentWatches = currentWatches;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
|
@ -29,11 +38,13 @@ public class WatcherStatsRequest extends MasterNodeOperationRequest<WatcherStats
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
includeCurrentWatches = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeBoolean(includeCurrentWatches);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,10 @@ public class WatcherStatsRequestBuilder extends MasterNodeOperationRequestBuilde
|
|||
super(client, new WatcherStatsRequest());
|
||||
}
|
||||
|
||||
public WatcherStatsRequestBuilder setIncludeCurrentWatches(boolean includeCurrentWatches) {
|
||||
request().includeCurrentWatches(includeCurrentWatches);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(final ActionListener<WatcherStatsResponse> listener) {
|
||||
|
|
|
@ -6,15 +6,21 @@
|
|||
package org.elasticsearch.watcher.transport.actions.stats;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.watcher.WatcherBuild;
|
||||
import org.elasticsearch.watcher.WatcherState;
|
||||
import org.elasticsearch.watcher.WatcherVersion;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.watcher.execution.WatchExecutionSnapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
||||
public class WatcherStatsResponse extends ActionResponse {
|
||||
public class WatcherStatsResponse extends ActionResponse implements ToXContent {
|
||||
|
||||
private WatcherVersion version;
|
||||
private WatcherBuild build;
|
||||
|
@ -23,6 +29,8 @@ public class WatcherStatsResponse extends ActionResponse {
|
|||
private long watchExecutionQueueSize;
|
||||
private long watchExecutionQueueMaxSize;
|
||||
|
||||
private List<WatchExecutionSnapshot> snapshots;
|
||||
|
||||
WatcherStatsResponse() {
|
||||
}
|
||||
|
||||
|
@ -92,6 +100,15 @@ public class WatcherStatsResponse extends ActionResponse {
|
|||
this.build = build;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public List<WatchExecutionSnapshot> getSnapshots() {
|
||||
return snapshots;
|
||||
}
|
||||
|
||||
void setSnapshots(List<WatchExecutionSnapshot> snapshots) {
|
||||
this.snapshots = snapshots;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
|
@ -101,6 +118,13 @@ public class WatcherStatsResponse extends ActionResponse {
|
|||
watcherState = WatcherState.fromId(in.readByte());
|
||||
version = WatcherVersion.readVersion(in);
|
||||
build = WatcherBuild.readBuild(in);
|
||||
|
||||
if (in.readBoolean()) {
|
||||
int size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
snapshots.add(new WatchExecutionSnapshot(in));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -112,5 +136,37 @@ public class WatcherStatsResponse extends ActionResponse {
|
|||
out.writeByte(watcherState.getId());
|
||||
WatcherVersion.writeVersion(version, out);
|
||||
WatcherBuild.writeBuild(build, out);
|
||||
|
||||
if (snapshots != null) {
|
||||
out.writeBoolean(true);
|
||||
out.writeVInt(snapshots.size());
|
||||
for (WatchExecutionSnapshot snapshot : snapshots) {
|
||||
snapshot.writeTo(out);
|
||||
}
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field("watcher_state", watcherState.toString().toLowerCase(Locale.ROOT));
|
||||
builder.field("watch_count", watchesCount);
|
||||
builder.startObject("execution_queue");
|
||||
builder.field("size", watchExecutionQueueSize);
|
||||
builder.field("max_size", watchExecutionQueueMaxSize);
|
||||
builder.endObject();
|
||||
|
||||
if (snapshots != null) {
|
||||
builder.startArray("current_watches");
|
||||
for (WatchExecutionSnapshot snapshot : snapshots) {
|
||||
snapshot.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
}
|
||||
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher.test.integration;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.watcher.WatcherState;
|
||||
import org.elasticsearch.watcher.actions.ActionBuilders;
|
||||
import org.elasticsearch.watcher.condition.ConditionBuilders;
|
||||
import org.elasticsearch.watcher.execution.ExecutionPhase;
|
||||
import org.elasticsearch.watcher.input.InputBuilders;
|
||||
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
|
||||
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
|
||||
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
|
||||
public class WatchStatsCurrentWatchesTests extends AbstractWatcherIntegrationTests {
|
||||
|
||||
@Override
|
||||
protected boolean timeWarped() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Test
|
||||
@LuceneTestCase.Slow
|
||||
public void testCurrentWatches() throws Exception {
|
||||
watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
||||
.trigger(schedule(interval("1s")))
|
||||
.input(InputBuilders.simpleInput("key", "value"))
|
||||
.condition(ConditionBuilders.scriptCondition("sleep 10000; return true"))
|
||||
.addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!"))
|
||||
).get();
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get();
|
||||
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
|
||||
assertThat(response.getWatchesCount(), equalTo(1l));
|
||||
assertThat(response.getSnapshots(), notNullValue());
|
||||
assertThat(response.getSnapshots().size(), equalTo(1));
|
||||
assertThat(response.getSnapshots().get(0).watchId(), equalTo("_id"));
|
||||
assertThat(response.getSnapshots().get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue