Watcher: Add thread pool rejection to execution state (elastic/x-pack-elasticsearch#2805)
The execution state of a watch did not differentiate between failures of the execution like a broken painless script and a thread pool rejection. This adds an own state, which allows to aggregate on such data in the watch history, which should ease debugging issues a bit. Original commit: elastic/x-pack-elasticsearch@351e64e14d
This commit is contained in:
parent
96b0b4e96d
commit
940eabd5f3
|
@ -399,7 +399,7 @@ public class ExecutionService extends AbstractComponent {
|
||||||
executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx)));
|
executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx)));
|
||||||
} catch (EsRejectedExecutionException e) {
|
} catch (EsRejectedExecutionException e) {
|
||||||
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
|
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
|
||||||
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message);
|
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message);
|
||||||
try {
|
try {
|
||||||
if (ctx.overrideRecordOnConflict()) {
|
if (ctx.overrideRecordOnConflict()) {
|
||||||
historyStore.forcePut(record);
|
historyStore.forcePut(record);
|
||||||
|
|
|
@ -24,6 +24,9 @@ public enum ExecutionState {
|
||||||
// an error in the condition or the execution of the input
|
// an error in the condition or the execution of the input
|
||||||
FAILED,
|
FAILED,
|
||||||
|
|
||||||
|
// a rejection due to a filled up threadpool
|
||||||
|
THREADPOOL_REJECTION,
|
||||||
|
|
||||||
// the execution was scheduled, but in between the watch was deleted
|
// the execution was scheduled, but in between the watch was deleted
|
||||||
NOT_EXECUTED_WATCH_MISSING,
|
NOT_EXECUTED_WATCH_MISSING,
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.elasticsearch.xpack.watcher.watch.Watch;
|
||||||
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
|
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
|
@ -831,7 +832,9 @@ public class ExecutionServiceTests extends ESTestCase {
|
||||||
executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
|
executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
|
||||||
|
|
||||||
verify(triggeredWatchStore, times(1)).delete(wid);
|
verify(triggeredWatchStore, times(1)).delete(wid);
|
||||||
verify(historyStore, times(1)).forcePut(any(WatchRecord.class));
|
ArgumentCaptor<WatchRecord> captor = ArgumentCaptor.forClass(WatchRecord.class);
|
||||||
|
verify(historyStore, times(1)).forcePut(captor.capture());
|
||||||
|
assertThat(captor.getValue().state(), is(ExecutionState.THREADPOOL_REJECTION));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception {
|
public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue