Watcher: Replace System.currentTimeMillis() with nanotime() (elastic/x-pack-elasticsearch#2393)
Enjoy the luxury of monotonically increasing clocks. So that the duration will never be zero. Original commit: elastic/x-pack-elasticsearch@c934ff0adb
This commit is contained in:
parent
1121a15eb7
commit
4cada797d7
|
@ -56,6 +56,7 @@ import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import static org.joda.time.DateTimeZone.UTC;
|
import static org.joda.time.DateTimeZone.UTC;
|
||||||
|
@ -460,9 +461,9 @@ public class ExecutionService extends AbstractComponent {
|
||||||
// actions
|
// actions
|
||||||
ctx.beforeActions();
|
ctx.beforeActions();
|
||||||
for (ActionWrapper action : watch.actions()) {
|
for (ActionWrapper action : watch.actions()) {
|
||||||
long now = System.currentTimeMillis();
|
long start = System.nanoTime();
|
||||||
ActionWrapper.Result actionResult = action.execute(ctx);
|
ActionWrapper.Result actionResult = action.execute(ctx);
|
||||||
long executionTime = System.currentTimeMillis() - now;
|
long executionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||||
String type = action.action().type();
|
String type = action.action().type();
|
||||||
actionByTypeExecutionTime.putIfAbsent(type, new MeanMetric());
|
actionByTypeExecutionTime.putIfAbsent(type, new MeanMetric());
|
||||||
actionByTypeExecutionTime.get(type).inc(executionTime);
|
actionByTypeExecutionTime.get(type).inc(executionTime);
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public abstract class WatchExecutionContext {
|
public abstract class WatchExecutionContext {
|
||||||
|
|
||||||
|
@ -31,7 +32,7 @@ public abstract class WatchExecutionContext {
|
||||||
private final TimeValue defaultThrottlePeriod;
|
private final TimeValue defaultThrottlePeriod;
|
||||||
|
|
||||||
private ExecutionPhase phase = ExecutionPhase.AWAITS_EXECUTION;
|
private ExecutionPhase phase = ExecutionPhase.AWAITS_EXECUTION;
|
||||||
private long startTimestamp;
|
private long relativeStartTime;
|
||||||
private Watch watch;
|
private Watch watch;
|
||||||
|
|
||||||
private Payload payload;
|
private Payload payload;
|
||||||
|
@ -133,7 +134,7 @@ public abstract class WatchExecutionContext {
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
assert phase == ExecutionPhase.AWAITS_EXECUTION;
|
assert phase == ExecutionPhase.AWAITS_EXECUTION;
|
||||||
startTimestamp = System.currentTimeMillis();
|
relativeStartTime = System.nanoTime();
|
||||||
phase = ExecutionPhase.STARTED;
|
phase = ExecutionPhase.STARTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,24 +211,24 @@ public abstract class WatchExecutionContext {
|
||||||
public WatchRecord abortFailedExecution(String message) {
|
public WatchRecord abortFailedExecution(String message) {
|
||||||
assert !phase.sealed();
|
assert !phase.sealed();
|
||||||
phase = ExecutionPhase.ABORTED;
|
phase = ExecutionPhase.ABORTED;
|
||||||
long executionFinishMs = System.currentTimeMillis();
|
long executionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - relativeStartTime);
|
||||||
WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - startTimestamp);
|
WatchExecutionResult result = new WatchExecutionResult(this, executionTime);
|
||||||
return new WatchRecord.MessageWatchRecord(this, result, message);
|
return new WatchRecord.MessageWatchRecord(this, result, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public WatchRecord abortFailedExecution(Exception e) {
|
public WatchRecord abortFailedExecution(Exception e) {
|
||||||
assert !phase.sealed();
|
assert !phase.sealed();
|
||||||
phase = ExecutionPhase.ABORTED;
|
phase = ExecutionPhase.ABORTED;
|
||||||
long executionFinishMs = System.currentTimeMillis();
|
long executionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - relativeStartTime);
|
||||||
WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - startTimestamp);
|
WatchExecutionResult result = new WatchExecutionResult(this, executionTime);
|
||||||
return new WatchRecord.ExceptionWatchRecord(this, result, e);
|
return new WatchRecord.ExceptionWatchRecord(this, result, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
public WatchRecord finish() {
|
public WatchRecord finish() {
|
||||||
assert !phase.sealed();
|
assert !phase.sealed();
|
||||||
phase = ExecutionPhase.FINISHED;
|
phase = ExecutionPhase.FINISHED;
|
||||||
long executionFinishMs = System.currentTimeMillis();
|
long executionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - relativeStartTime);
|
||||||
WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - startTimestamp);
|
WatchExecutionResult result = new WatchExecutionResult(this, executionTime);
|
||||||
return new WatchRecord.MessageWatchRecord(this, result);
|
return new WatchRecord.MessageWatchRecord(this, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -72,7 +72,7 @@ import static java.util.Arrays.asList;
|
||||||
import static java.util.Collections.singletonMap;
|
import static java.util.Collections.singletonMap;
|
||||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.hamcrest.Matchers.greaterThan;
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.hamcrest.Matchers.hasSize;
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
@ -151,7 +151,12 @@ public class ExecutionServiceTests extends ESTestCase {
|
||||||
|
|
||||||
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
|
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
|
||||||
Condition condition = mock(Condition.class);
|
Condition condition = mock(Condition.class);
|
||||||
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
|
// introduce a very short sleep time which we can use to check if the duration in milliseconds is correctly created
|
||||||
|
long randomConditionDurationMs = randomIntBetween(1, 10);
|
||||||
|
when(condition.execute(any(WatchExecutionContext.class))).then(invocationOnMock -> {
|
||||||
|
Thread.sleep(randomConditionDurationMs);
|
||||||
|
return conditionResult;
|
||||||
|
});
|
||||||
|
|
||||||
// watch level transform
|
// watch level transform
|
||||||
Transform.Result watchTransformResult = mock(Transform.Result.class);
|
Transform.Result watchTransformResult = mock(Transform.Result.class);
|
||||||
|
@ -222,6 +227,10 @@ public class ExecutionServiceTests extends ESTestCase {
|
||||||
verify(watchTransform, times(1)).execute(context, payload);
|
verify(watchTransform, times(1)).execute(context, payload);
|
||||||
verify(action, times(1)).execute("_action", context, payload);
|
verify(action, times(1)).execute("_action", context, payload);
|
||||||
|
|
||||||
|
// test execution duration
|
||||||
|
assertThat(watchRecord.result().executionDurationMs(), is(greaterThanOrEqualTo(randomConditionDurationMs)));
|
||||||
|
assertThat(watchRecord.result().executionTime(), is(notNullValue()));
|
||||||
|
|
||||||
// test stats
|
// test stats
|
||||||
XContentSource source = new XContentSource(jsonBuilder().map(executionService.usageStats()));
|
XContentSource source = new XContentSource(jsonBuilder().map(executionService.usageStats()));
|
||||||
assertThat(source.getValue("execution.actions._all.total_time_in_ms"), is(notNullValue()));
|
assertThat(source.getValue("execution.actions._all.total_time_in_ms"), is(notNullValue()));
|
||||||
|
|
|
@ -61,6 +61,7 @@ teardown:
|
||||||
- match: { watch_record.status.state.active: true }
|
- match: { watch_record.status.state.active: true }
|
||||||
- is_true: watch_record.node
|
- is_true: watch_record.node
|
||||||
- match: { watch_record.status.actions.indexme.ack.state: "ackable" }
|
- match: { watch_record.status.actions.indexme.ack.state: "ackable" }
|
||||||
|
- gt: { watch_record.result.execution_duration: 0 }
|
||||||
|
|
||||||
---
|
---
|
||||||
"Test execute watch API with user supplied watch":
|
"Test execute watch API with user supplied watch":
|
||||||
|
|
Loading…
Reference in New Issue