diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 5e4f97e4102..4091f291652 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.common.stats.Counters; import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.Watcher; @@ -60,13 +61,14 @@ public class ExecutionService extends AbstractComponent { private final Clock clock; private final TimeValue defaultThrottlePeriod; private final TimeValue maxStopTimeout; + private final ThreadPool threadPool; private volatile CurrentExecutions currentExecutions = null; private final AtomicBoolean started = new AtomicBoolean(false); @Inject public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, - WatchStore watchStore, WatchLockService watchLockService, Clock clock) { + WatchStore watchStore, WatchLockService watchLockService, Clock clock, ThreadPool threadPool) { super(settings); this.historyStore = historyStore; this.triggeredWatchStore = triggeredWatchStore; @@ -76,6 +78,7 @@ public class ExecutionService extends AbstractComponent { this.clock = clock; this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings); this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings); + this.threadPool = threadPool; } public void start(ClusterState state) throws Exception { @@ -323,20 +326,36 @@ public class ExecutionService extends AbstractComponent { thread pool that executes the watches is completely busy, we don't lose the fact that the watch was triggered (it'll have its history record) */ - - private void executeAsync(WatchExecutionContext ctx, TriggeredWatch triggeredWatch) throws Exception { + private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch triggeredWatch) { try { executor.execute(new WatchExecutionTask(ctx)); } catch (EsRejectedExecutionException e) { - String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; - logger.debug("{}", message); - WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message); - if (ctx.overrideRecordOnConflict()) { - historyStore.forcePut(record); - } else { - historyStore.put(record); - } - triggeredWatchStore.delete(triggeredWatch.id()); + // we are still in the transport thread here most likely, so we cannot run heavy operations + // this means some offloading needs to be done for indexing into the history and delete the triggered watches entry + threadPool.generic().execute(() -> { + String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; + logger.debug("{}", message); + WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message); + try { + if (ctx.overrideRecordOnConflict()) { + historyStore.forcePut(record); + } else { + historyStore.put(record); + } + } catch (Exception exc) { + logger.error((Supplier) () -> + new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection", + triggeredWatch.id()), exc); + } + + try { + triggeredWatchStore.delete(triggeredWatch.id()); + } catch (Exception exc) { + logger.error((Supplier) () -> + new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " + + "rejection", triggeredWatch.id()), exc); + } + }); } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java index c1b64a9a630..b964d0b74b2 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java @@ -222,7 +222,7 @@ public class TriggeredWatchStore extends AbstractComponent { } } - public void delete(Wid wid) throws Exception { + public void delete(Wid wid) { ensureStarted(); accessLock.lock(); try { diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 97846e9ba02..4ff96fe3597 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -5,10 +5,13 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.support.clock.ClockMock; import org.elasticsearch.xpack.watcher.actions.Action; @@ -41,7 +44,9 @@ import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; @@ -51,7 +56,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.joda.time.DateTime.now; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -68,15 +75,16 @@ public class ExecutionServiceTests extends ESTestCase { private Input.Result inputResult; private WatchStore watchStore; + private TriggeredWatchStore triggeredWatchStore; + private WatchExecutor executor; private HistoryStore historyStore; private WatchLockService watchLockService; private ExecutionService executionService; private Clock clock; + private ThreadPool threadPool; @Before public void init() throws Exception { - TriggeredWatchStore triggeredWatchStore; - payload = mock(Payload.class); input = mock(ExecutableInput.class); inputResult = mock(Input.Result.class); @@ -88,13 +96,14 @@ public class ExecutionServiceTests extends ESTestCase { triggeredWatchStore = mock(TriggeredWatchStore.class); historyStore = mock(HistoryStore.class); - WatchExecutor executor = mock(WatchExecutor.class); + executor = mock(WatchExecutor.class); when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1)); watchLockService = mock(WatchLockService.class); clock = new ClockMock(); + threadPool = mock(ThreadPool.class); executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore, - watchLockService, clock); + watchLockService, clock, threadPool); ClusterState clusterState = mock(ClusterState.class); when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>()); @@ -483,7 +492,7 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteInner() throws Exception { - DateTime now = DateTime.now(DateTimeZone.UTC); + DateTime now = now(DateTimeZone.UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); @@ -560,7 +569,7 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteInnerThrottled() throws Exception { - DateTime now = DateTime.now(DateTimeZone.UTC); + DateTime now = now(DateTimeZone.UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); @@ -613,7 +622,7 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteInnerConditionNotMet() throws Exception { - DateTime now = DateTime.now(DateTimeZone.UTC); + DateTime now = now(DateTimeZone.UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); @@ -774,6 +783,28 @@ public class ExecutionServiceTests extends ESTestCase { verify(action, never()).execute("_action", context, payload); } + public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exception { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("foo"); + when(watch.nonce()).thenReturn(1L); + when(watchStore.get(any())).thenReturn(watch); + + // execute needs to fail as well as storing the history + doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); + doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any()); + + Wid wid = new Wid(watch.id(), watch.nonce(), now()); + + Executor currentThreadExecutor = command -> command.run(); + when(threadPool.generic()).thenReturn(currentThreadExecutor); + + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, new ScheduleTriggerEvent(now() ,now())); + executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); + + verify(triggeredWatchStore, times(1)).delete(wid); + verify(historyStore, times(1)).forcePut(any(WatchRecord.class)); + } + private Tuple whenCondition(final WatchExecutionContext context) { Condition.Result conditionResult = mock(Condition.Result.class); when(conditionResult.met()).thenReturn(true); @@ -791,5 +822,4 @@ public class ExecutionServiceTests extends ESTestCase { return new Tuple<>(transform, transformResult); } - }