diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index de9f2376352..c7afc31afc0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -399,7 +399,7 @@ public class ExecutionService extends AbstractComponent { executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx))); } catch (EsRejectedExecutionException e) { String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; - WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message); + WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message); try { if (ctx.overrideRecordOnConflict()) { historyStore.forcePut(record); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionState.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionState.java index 98de4874bc9..c188090e549 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionState.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionState.java @@ -24,6 +24,9 @@ public enum ExecutionState { // an error in the condition or the execution of the input FAILED, + // a rejection due to a filled up threadpool + THREADPOOL_REJECTION, + // the execution was scheduled, but in between the watch was deleted NOT_EXECUTED_WATCH_MISSING, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 4a9fbdbab93..7a89a28f1d0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -55,6 +55,7 @@ import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.WatchStatus; import org.joda.time.DateTime; import org.junit.Before; +import org.mockito.ArgumentCaptor; import java.io.IOException; import java.time.Clock; @@ -831,7 +832,9 @@ public class ExecutionServiceTests extends ESTestCase { executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); verify(triggeredWatchStore, times(1)).delete(wid); - verify(historyStore, times(1)).forcePut(any(WatchRecord.class)); + ArgumentCaptor captor = ArgumentCaptor.forClass(WatchRecord.class); + verify(historyStore, times(1)).forcePut(captor.capture()); + assertThat(captor.getValue().state(), is(ExecutionState.THREADPOOL_REJECTION)); } public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception {