From 0e6cbbd8116b9d97e9f0c9a756e3de200e623d07 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 4 May 2018 09:02:04 +0200 Subject: [PATCH] Watcher: Ensure trigger service pauses execution (#30363) When the watcher service pauses execution due to a cluster state update, the trigger service and its engines also need to pause properly instead of keeping going. This is also important when the .watches index is deleted, so that watches don't stay in a triggered mode. --- .../xpack/watcher/WatcherService.java | 2 ++ .../xpack/watcher/WatcherServiceTests.java | 34 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index d280a150e8d..dcfb713a665 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -167,6 +167,7 @@ public class WatcherService extends AbstractComponent { void stopExecutor() { ThreadPool.terminate(executor, 10L, TimeUnit.SECONDS); } + /** * Reload the watcher service, does not switch the state from stopped to started, just keep going * @param state cluster state, which is needed to find out about local shards @@ -231,6 +232,7 @@ public class WatcherService extends AbstractComponent { * manual watch execution, i.e. via the execute watch API */ public void pauseExecution(String reason) { + triggerService.pauseExecution(); int cancelledTaskCount = executionService.pause(); logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 92726fb94cd..5f815170215 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -43,10 +43,14 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.watcher.trigger.Trigger; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; +import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore; +import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput; +import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.joda.time.DateTime; @@ -204,6 +208,36 @@ public class WatcherServiceTests extends ESTestCase { assertThat(watches, hasSize(activeWatchCount)); } + public void testPausingWatcherServiceAlsoPausesTriggerService() { + String engineType = "foo"; + TriggerEngine triggerEngine = mock(TriggerEngine.class); + when(triggerEngine.type()).thenReturn(engineType); + TriggerService triggerService = new TriggerService(Settings.EMPTY, Collections.singleton(triggerEngine)); + + Trigger trigger = mock(Trigger.class); + when(trigger.type()).thenReturn(engineType); + + Watch watch = mock(Watch.class); + when(watch.trigger()).thenReturn(trigger); + when(watch.condition()).thenReturn(InternalAlwaysCondition.INSTANCE); + ExecutableNoneInput noneInput = new ExecutableNoneInput(logger); + when(watch.input()).thenReturn(noneInput); + + triggerService.add(watch); + assertThat(triggerService.count(), is(1L)); + + WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class), + mock(ExecutionService.class), mock(WatchParser.class), mock(Client.class), executorService) { + @Override + void stopExecutor() { + } + }; + + service.pauseExecution("pausing"); + assertThat(triggerService.count(), is(0L)); + verify(triggerEngine).pauseExecution(); + } + private static DiscoveryNode newNode() { return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);