Add execution_duration to watch history records.
This change add the actual length of time a watch spends executing. This is useful to find watches that take long to execute to pinpoint those watches that may be candidates for throttling. Add the execution_duration as a number of milliseconds rather than a timevalue so it can be aggregated from the watch_history index. Original commit: elastic/x-pack-elasticsearch@0036468f55
This commit is contained in:
parent
2f48d980b3
commit
46c111b016
|
@ -272,6 +272,7 @@ public class ExecutionService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
WatchExecutionResult executeInner(WatchExecutionContext ctx) throws IOException {
|
WatchExecutionResult executeInner(WatchExecutionContext ctx) throws IOException {
|
||||||
|
ctx.start();
|
||||||
Watch watch = ctx.watch();
|
Watch watch = ctx.watch();
|
||||||
ctx.beforeInput();
|
ctx.beforeInput();
|
||||||
Input.Result inputResult = ctx.inputResult();
|
Input.Result inputResult = ctx.inputResult();
|
||||||
|
|
|
@ -42,6 +42,8 @@ public abstract class WatchExecutionContext {
|
||||||
|
|
||||||
private volatile ExecutionPhase executionPhase = ExecutionPhase.AWAITS_EXECUTION;
|
private volatile ExecutionPhase executionPhase = ExecutionPhase.AWAITS_EXECUTION;
|
||||||
|
|
||||||
|
private long actualExecutionStartMs;
|
||||||
|
|
||||||
public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
|
public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
|
||||||
this.id = new Wid(watch.id(), watch.nonce(), executionTime);
|
this.id = new Wid(watch.id(), watch.nonce(), executionTime);
|
||||||
this.watch = watch;
|
this.watch = watch;
|
||||||
|
@ -171,12 +173,18 @@ public abstract class WatchExecutionContext {
|
||||||
return new ExecutableActions.Results(actionsResults);
|
return new ExecutableActions.Results(actionsResults);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
actualExecutionStartMs = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
public WatchExecutionResult finish() {
|
public WatchExecutionResult finish() {
|
||||||
executionPhase = ExecutionPhase.FINISHED;
|
executionPhase = ExecutionPhase.FINISHED;
|
||||||
return new WatchExecutionResult(this);
|
long executionFinishMs = System.currentTimeMillis();
|
||||||
|
return new WatchExecutionResult(this, executionFinishMs - actualExecutionStartMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
public WatchExecutionSnapshot createSnapshot(Thread executionThread) {
|
public WatchExecutionSnapshot createSnapshot(Thread executionThread) {
|
||||||
return new WatchExecutionSnapshot(this, executionThread.getStackTrace());
|
return new WatchExecutionSnapshot(this, executionThread.getStackTrace());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,27 +32,33 @@ import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
|
||||||
public class WatchExecutionResult implements ToXContent {
|
public class WatchExecutionResult implements ToXContent {
|
||||||
|
|
||||||
private final DateTime executionTime;
|
private final DateTime executionTime;
|
||||||
|
private final long executionDurationMs;
|
||||||
private final Input.Result inputResult;
|
private final Input.Result inputResult;
|
||||||
private final Condition.Result conditionResult;
|
private final Condition.Result conditionResult;
|
||||||
private final @Nullable Transform.Result transformResult;
|
private final @Nullable Transform.Result transformResult;
|
||||||
private final ExecutableActions.Results actionsResults;
|
private final ExecutableActions.Results actionsResults;
|
||||||
|
|
||||||
public WatchExecutionResult(WatchExecutionContext context) {
|
public WatchExecutionResult(WatchExecutionContext context, long executionDurationMs) {
|
||||||
this(context.executionTime(), context.inputResult(), context.conditionResult(), context.transformResult(), context.actionsResults());
|
this(context.executionTime(), executionDurationMs, context.inputResult(), context.conditionResult(), context.transformResult(), context.actionsResults());
|
||||||
}
|
}
|
||||||
|
|
||||||
WatchExecutionResult(DateTime executionTime, Input.Result inputResult, Condition.Result conditionResult, @Nullable Transform.Result transformResult, ExecutableActions.Results actionsResults) {
|
WatchExecutionResult(DateTime executionTime, long executionDurationMs, Input.Result inputResult, Condition.Result conditionResult, @Nullable Transform.Result transformResult, ExecutableActions.Results actionsResults) {
|
||||||
this.executionTime = executionTime;
|
this.executionTime = executionTime;
|
||||||
this.inputResult = inputResult;
|
this.inputResult = inputResult;
|
||||||
this.conditionResult = conditionResult;
|
this.conditionResult = conditionResult;
|
||||||
this.transformResult = transformResult;
|
this.transformResult = transformResult;
|
||||||
this.actionsResults = actionsResults;
|
this.actionsResults = actionsResults;
|
||||||
|
this.executionDurationMs = executionDurationMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DateTime executionTime() {
|
public DateTime executionTime() {
|
||||||
return executionTime;
|
return executionTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long executionDurationMs() {
|
||||||
|
return executionDurationMs;
|
||||||
|
}
|
||||||
|
|
||||||
public Input.Result inputResult() {
|
public Input.Result inputResult() {
|
||||||
return inputResult;
|
return inputResult;
|
||||||
}
|
}
|
||||||
|
@ -72,7 +78,10 @@ public class WatchExecutionResult implements ToXContent {
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
|
|
||||||
WatcherDateTimeUtils.writeDate(Field.EXECUTION_TIME.getPreferredName(), builder, executionTime);
|
WatcherDateTimeUtils.writeDate(Field.EXECUTION_TIME.getPreferredName(), builder, executionTime);
|
||||||
|
builder.field(Field.EXECUTION_DURATION.getPreferredName(), executionDurationMs);
|
||||||
|
|
||||||
if (inputResult != null) {
|
if (inputResult != null) {
|
||||||
builder.startObject(Field.INPUT.getPreferredName())
|
builder.startObject(Field.INPUT.getPreferredName())
|
||||||
.field(inputResult.type(), inputResult, params)
|
.field(inputResult.type(), inputResult, params)
|
||||||
|
@ -97,6 +106,9 @@ public class WatchExecutionResult implements ToXContent {
|
||||||
public static WatchExecutionResult parse(Wid wid, XContentParser parser, ConditionRegistry conditionRegistry, ActionRegistry actionRegistry,
|
public static WatchExecutionResult parse(Wid wid, XContentParser parser, ConditionRegistry conditionRegistry, ActionRegistry actionRegistry,
|
||||||
InputRegistry inputRegistry, TransformRegistry transformRegistry) throws IOException {
|
InputRegistry inputRegistry, TransformRegistry transformRegistry) throws IOException {
|
||||||
DateTime executionTime = null;
|
DateTime executionTime = null;
|
||||||
|
long executionDurationMs = 0;
|
||||||
|
boolean throttled = false;
|
||||||
|
String throttleReason = null;
|
||||||
ExecutableActions.Results actionResults = null;
|
ExecutableActions.Results actionResults = null;
|
||||||
Input.Result inputResult = null;
|
Input.Result inputResult = null;
|
||||||
Condition.Result conditionResult = null;
|
Condition.Result conditionResult = null;
|
||||||
|
@ -121,6 +133,8 @@ public class WatchExecutionResult implements ToXContent {
|
||||||
conditionResult = conditionRegistry.parseResult(wid.watchId(), parser);
|
conditionResult = conditionRegistry.parseResult(wid.watchId(), parser);
|
||||||
} else if (Transform.Field.TRANSFORM.match(currentFieldName)) {
|
} else if (Transform.Field.TRANSFORM.match(currentFieldName)) {
|
||||||
transformResult = transformRegistry.parseResult(wid.watchId(), parser);
|
transformResult = transformRegistry.parseResult(wid.watchId(), parser);
|
||||||
|
} else if (Field.EXECUTION_DURATION.match(currentFieldName) && token.isValue()) {
|
||||||
|
executionDurationMs = parser.longValue();
|
||||||
} else {
|
} else {
|
||||||
throw new WatcherException("could not parse watch execution [{}]. unexpected field [{}]", wid, currentFieldName);
|
throw new WatcherException("could not parse watch execution [{}]. unexpected field [{}]", wid, currentFieldName);
|
||||||
}
|
}
|
||||||
|
@ -128,7 +142,8 @@ public class WatchExecutionResult implements ToXContent {
|
||||||
if (executionTime == null) {
|
if (executionTime == null) {
|
||||||
throw new WatcherException("could not parse watch execution [{}]. missing required date field [{}]", wid, Field.EXECUTION_TIME.getPreferredName());
|
throw new WatcherException("could not parse watch execution [{}]. missing required date field [{}]", wid, Field.EXECUTION_TIME.getPreferredName());
|
||||||
}
|
}
|
||||||
return new WatchExecutionResult(executionTime, inputResult, conditionResult, transformResult, actionResults);
|
|
||||||
|
return new WatchExecutionResult(executionTime, executionDurationMs, inputResult, conditionResult, transformResult, actionResults);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,5 +152,6 @@ public class WatchExecutionResult implements ToXContent {
|
||||||
ParseField INPUT = new ParseField("input");
|
ParseField INPUT = new ParseField("input");
|
||||||
ParseField CONDITION = new ParseField("condition");
|
ParseField CONDITION = new ParseField("condition");
|
||||||
ParseField ACTIONS = new ParseField("actions");
|
ParseField ACTIONS = new ParseField("actions");
|
||||||
|
ParseField EXECUTION_DURATION = new ParseField("execution_duration");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,6 +78,9 @@
|
||||||
"execution_time": {
|
"execution_time": {
|
||||||
"type": "date"
|
"type": "date"
|
||||||
},
|
},
|
||||||
|
"execution_duration": {
|
||||||
|
"type": "long"
|
||||||
|
},
|
||||||
"input": {
|
"input": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"dynamic": true,
|
"dynamic": true,
|
||||||
|
|
|
@ -239,6 +239,19 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
|
||||||
assertThat(watchRecord.state(), equalTo(WatchRecord.State.THROTTLED));
|
assertThat(watchRecord.state(), equalTo(WatchRecord.State.THROTTLED));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWatchExecutionDuration() throws Exception {
|
||||||
|
WatchSourceBuilder watchBuilder = watchBuilder()
|
||||||
|
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
|
||||||
|
.input(simpleInput("foo", "bar"))
|
||||||
|
.condition(new ScriptCondition((new Script.Builder.Inline("sleep 100; return true")).build()))
|
||||||
|
.addAction("log", loggingAction("foobar"));
|
||||||
|
|
||||||
|
Watch watch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON));
|
||||||
|
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, new ManualTriggerEvent("_id", new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))), new TimeValue(1, TimeUnit.HOURS));
|
||||||
|
WatchRecord record = executionService().execute(ctxBuilder.build());
|
||||||
|
assertThat(record.execution().executionDurationMs(), greaterThanOrEqualTo(100L));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Slow
|
@Slow
|
||||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.watcher.condition.Condition;
|
||||||
import org.elasticsearch.watcher.condition.always.AlwaysCondition;
|
import org.elasticsearch.watcher.condition.always.AlwaysCondition;
|
||||||
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
|
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
|
||||||
import org.elasticsearch.watcher.execution.WatchExecutionContext;
|
import org.elasticsearch.watcher.execution.WatchExecutionContext;
|
||||||
|
import org.elasticsearch.watcher.execution.WatchExecutionResult;
|
||||||
import org.elasticsearch.watcher.execution.Wid;
|
import org.elasticsearch.watcher.execution.Wid;
|
||||||
import org.elasticsearch.watcher.input.simple.SimpleInput;
|
import org.elasticsearch.watcher.input.simple.SimpleInput;
|
||||||
import org.elasticsearch.watcher.support.http.HttpRequest;
|
import org.elasticsearch.watcher.support.http.HttpRequest;
|
||||||
|
@ -26,7 +27,6 @@ import org.elasticsearch.watcher.test.WatcherTestUtils;
|
||||||
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
|
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
|
||||||
import org.elasticsearch.watcher.watch.Payload;
|
import org.elasticsearch.watcher.watch.Payload;
|
||||||
import org.elasticsearch.watcher.watch.Watch;
|
import org.elasticsearch.watcher.watch.Watch;
|
||||||
import org.elasticsearch.watcher.execution.WatchExecutionResult;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
|
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
|
||||||
|
@ -70,8 +70,9 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests {
|
||||||
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
|
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
|
||||||
ctx.onInputResult(inputResult);
|
ctx.onInputResult(inputResult);
|
||||||
ctx.onConditionResult(conditionResult);
|
ctx.onConditionResult(conditionResult);
|
||||||
watchRecord.seal(new WatchExecutionResult(ctx));
|
long watchExecutionDuration = randomIntBetween(30, 100000);
|
||||||
|
watchRecord.seal(new WatchExecutionResult(ctx, watchExecutionDuration));
|
||||||
|
assertThat(watchRecord.execution().executionDurationMs(), equalTo(watchExecutionDuration));
|
||||||
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
|
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
|
||||||
watchRecord.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
|
watchRecord.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
|
||||||
WatchRecord parsedWatchRecord = watchRecordParser().parse(watchRecord.id().value(), 0, jsonBuilder.bytes());
|
WatchRecord parsedWatchRecord = watchRecordParser().parse(watchRecord.id().value(), 0, jsonBuilder.bytes());
|
||||||
|
@ -98,8 +99,9 @@ public class WatchRecordTests extends AbstractWatcherIntegrationTests {
|
||||||
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
|
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
|
||||||
ctx.onInputResult(inputResult);
|
ctx.onInputResult(inputResult);
|
||||||
ctx.onConditionResult(conditionResult);
|
ctx.onConditionResult(conditionResult);
|
||||||
watchRecord.seal(new WatchExecutionResult(ctx));
|
long watchExecutionDuration = randomIntBetween(30, 100000);
|
||||||
|
watchRecord.seal(new WatchExecutionResult(ctx, watchExecutionDuration));
|
||||||
|
assertThat(watchRecord.execution().executionDurationMs(), equalTo(watchExecutionDuration));
|
||||||
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
|
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
|
||||||
watchRecord.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
|
watchRecord.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
|
||||||
WatchRecord parsedWatchRecord = watchRecordParser().parse(watchRecord.id().value(), 0, jsonBuilder.bytes());
|
WatchRecord parsedWatchRecord = watchRecordParser().parse(watchRecord.id().value(), 0, jsonBuilder.bytes());
|
||||||
|
|
Loading…
Reference in New Issue