diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index fca29821bfa..f1dd7be1965 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.watcher; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -25,6 +27,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import java.util.Collections; import java.util.Comparator; +import java.util.EnumSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -35,10 +38,12 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; public class WatcherLifeCycleService implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class); private final AtomicReference state = new AtomicReference<>(WatcherState.STARTED); private final AtomicReference> previousShardRoutings = new AtomicReference<>(Collections.emptyList()); private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this. private volatile WatcherService watcherService; + private final EnumSet stopStates = EnumSet.of(WatcherState.STOPPED, WatcherState.STOPPING); WatcherLifeCycleService(ClusterService clusterService, WatcherService watcherService) { this.watcherService = watcherService; @@ -57,8 +62,10 @@ public class WatcherLifeCycleService implements ClusterStateListener { this.state.set(WatcherState.STOPPING); shutDown = true; clearAllocationIds(); - watcherService.shutDown(); - this.state.set(WatcherState.STOPPED); + watcherService.shutDown(() -> { + this.state.set(WatcherState.STOPPED); + logger.info("watcher has stopped and shutdown"); + }); } /** @@ -88,9 +95,10 @@ public class WatcherLifeCycleService implements ClusterStateListener { } boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state()); + boolean isStoppedOrStopping = stopStates.contains(this.state.get()); // if this is not a data node, we need to start it ourselves possibly if (event.state().nodes().getLocalNode().isDataNode() == false && - isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) { + isWatcherStoppedManually == false && isStoppedOrStopping) { this.state.set(WatcherState.STARTING); watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); return; @@ -99,8 +107,20 @@ public class WatcherLifeCycleService implements ClusterStateListener { if (isWatcherStoppedManually) { if (this.state.get() == WatcherState.STARTED) { clearAllocationIds(); - watcherService.stop("watcher manually marked to shutdown by cluster state update"); - this.state.set(WatcherState.STOPPED); + boolean stopping = this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING); + if (stopping) { + //waiting to set state to stopped until after all currently running watches are finished + watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> { + //only transition from stopping -> stopped (which may not be the case if restarted quickly) + boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED); + if (stopped) { + logger.info("watcher has stopped"); + } else { + logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get()); + } + + }); + } } return; } @@ -142,7 +162,7 @@ public class WatcherLifeCycleService implements ClusterStateListener { previousShardRoutings.set(localAffectedShardRoutings); if (state.get() == WatcherState.STARTED) { watcherService.reload(event.state(), "new local watcher shard allocation ids"); - } else if (state.get() == WatcherState.STOPPED) { + } else if (isStoppedOrStopping) { this.state.set(WatcherState.STARTING); watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index c96203bd642..32031e78f5e 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -35,6 +35,7 @@ import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.upgrade.UpgradeField; +import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.execution.ExecutionService; @@ -144,24 +145,29 @@ public class WatcherService { } /** - * Stops the watcher service and marks its services as paused + * Stops the watcher service and marks its services as paused. Callers should set the Watcher state to {@link WatcherState#STOPPING} + * prior to calling this method. + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null} */ - public void stop(String reason) { + public void stop(String reason, Runnable stoppedListener) { + assert stoppedListener != null; logger.info("stopping watch service, reason [{}]", reason); - executionService.pause(); + executionService.pause(stoppedListener); triggerService.pauseExecution(); } /** * shuts down the trigger service as well to make sure there are no lingering threads - * also no need to check anything, as this is final, we just can go to status STOPPED + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null} */ - void shutDown() { + void shutDown(Runnable stoppedListener) { + assert stoppedListener != null; logger.info("stopping watch service, reason [shutdown initiated]"); - executionService.pause(); + executionService.pause(stoppedListener); triggerService.stop(); stopExecutor(); - logger.debug("watch service has stopped"); } void stopExecutor() { @@ -185,7 +191,7 @@ public class WatcherService { processedClusterStateVersion.set(state.getVersion()); triggerService.pauseExecution(); - int cancelledTaskCount = executionService.clearExecutionsAndQueue(); + int cancelledTaskCount = executionService.clearExecutionsAndQueue(() -> {}); logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), @@ -256,7 +262,7 @@ public class WatcherService { */ public void pauseExecution(String reason) { triggerService.pauseExecution(); - int cancelledTaskCount = executionService.pause(); + int cancelledTaskCount = executionService.pause(() -> {}); logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java index 95ac8030036..9e76cbcffca 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java @@ -5,8 +5,11 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.watcher.WatcherState; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -19,6 +22,7 @@ import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalSta public final class CurrentExecutions implements Iterable { + private static final Logger logger = LogManager.getLogger(CurrentExecutions.class); private final ConcurrentMap currentExecutions = new ConcurrentHashMap<>(); // the condition of the lock is used to wait and signal the finishing of all executions on shutdown private final ReentrantLock lock = new ReentrantLock(); @@ -63,9 +67,12 @@ public final class CurrentExecutions implements Iterable()); - this.clearExecutions(); + this.clearExecutions(stoppedListener); return cancelledTaskCount; } @@ -280,8 +289,10 @@ public class ExecutionService { ctx.setNodeId(clusterService.localNode().getId()); WatchRecord record = null; final String watchId = ctx.id().watchId(); + //pull this to a local reference since the class reference can be swapped, and need to ensure same object is used for put/remove + final CurrentExecutions currentExecutions = this.currentExecutions.get(); try { - boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread())); + boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread())); if (executionAlreadyExists) { logger.trace("not executing watch [{}] because it is already queued", watchId); record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool"); @@ -336,7 +347,7 @@ public class ExecutionService { triggeredWatchStore.delete(ctx.id()); } - currentExecutions.get().remove(watchId); + currentExecutions.remove(watchId); logger.debug("finished [{}]/[{}]", watchId, ctx.id()); } return record; @@ -580,11 +591,15 @@ public class ExecutionService { /** * This clears out the current executions and sets new empty current executions * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea + * + * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the + * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set. */ - private void clearExecutions() { + private void clearExecutions(Runnable stoppedListener) { + assert stoppedListener != null; final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions()); // clear old executions in background, no need to wait - genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout)); + genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout, stoppedListener)); } // the watch execution task takes another runnable as parameter diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 548583ac14b..cf6c2c5ac66 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.junit.Before; +import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import java.util.Collections; @@ -133,8 +134,8 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { when(watcherService.validate(clusterState)).thenReturn(true); lifeCycleService.shutDown(); - verify(watcherService, never()).stop(anyString()); - verify(watcherService, times(1)).shutDown(); + verify(watcherService, never()).stop(anyString(), any()); + verify(watcherService, times(1)).shutDown(any()); reset(watcherService); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); @@ -175,7 +176,12 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { .build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState)); - verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update")); + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(watcherService, times(1)) + .stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture()); + assertEquals(WatcherState.STOPPING, lifeCycleService.getState()); + captor.getValue().run(); + assertEquals(WatcherState.STOPPED, lifeCycleService.getState()); // Starting via cluster state update, as the watcher metadata block is removed/set to true reset(watcherService); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index f4ee831266b..e67512ee694 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -269,8 +269,8 @@ public class WatcherServiceTests extends ESTestCase { csBuilder.metaData(MetaData.builder()); service.reload(csBuilder.build(), "whatever"); - verify(executionService).clearExecutionsAndQueue(); - verify(executionService, never()).pause(); + verify(executionService).clearExecutionsAndQueue(any()); + verify(executionService, never()).pause(any()); verify(triggerService).pauseExecution(); } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 5a59ba24762..65d7589ff8b 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -538,10 +538,12 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase WatcherStatsResponse watcherStatsResponse = watcherClient().prepareWatcherStats().get(); assertThat(watcherStatsResponse.hasFailures(), is(false)); List> currentStatesFromStatsRequest = watcherStatsResponse.getNodes().stream() - .map(response -> Tuple.tuple(response.getNode().getName(), response.getWatcherState())) - .collect(Collectors.toList()); + .map(response -> Tuple.tuple(response.getNode().getName() + " (" + response.getThreadPoolQueueSize() + ")", + response.getWatcherState())).collect(Collectors.toList()); List states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList()); + + logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest); boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED); @@ -566,7 +568,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase } throw new AssertionError("unexpected state, retrying with next run"); - }); + }, 30, TimeUnit.SECONDS); } public static class NoopEmailService extends EmailService { diff --git a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java index 679bc08f01f..9ec458067dc 100644 --- a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java +++ b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java @@ -19,6 +19,7 @@ import org.junit.After; import org.junit.Before; import java.util.Collections; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -108,7 +109,7 @@ public class SmokeTestWatcherWithSecurityClientYamlTestSuiteIT extends ESClientY default: throw new AssertionError("unknown state[" + state + "]"); } - }); + }, 30, TimeUnit.SECONDS); } @Override diff --git a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java index e0da00f29d4..3bad41a1393 100644 --- a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java +++ b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java @@ -22,6 +22,7 @@ import org.junit.Before; import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -91,8 +92,7 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { @After public void stopWatcher() throws Exception { - adminClient().performRequest(new Request("DELETE", "/my_test_index")); - + assertBusy(() -> { try { Response statsResponse = adminClient().performRequest(new Request("GET", "/_watcher/stats")); @@ -118,7 +118,9 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); + + adminClient().performRequest(new Request("DELETE", "/my_test_index")); } @Override diff --git a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java index 3df9512298e..b720f0620de 100644 --- a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java +++ b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java @@ -20,6 +20,7 @@ import org.junit.Before; import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -92,7 +93,7 @@ public class SmokeTestWatcherTestSuiteIT extends ESRestTestCase { default: throw new AssertionError("unknown state[" + state + "]"); } - }); + }, 30, TimeUnit.SECONDS); } @Override diff --git a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java index 2dd5cc86a89..3a1155d562d 100644 --- a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java +++ b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/WatcherRestIT.java @@ -14,6 +14,8 @@ import org.elasticsearch.xpack.test.rest.XPackRestTestConstants; import org.junit.After; import org.junit.Before; +import java.util.concurrent.TimeUnit; + import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; @@ -90,6 +92,6 @@ public class WatcherRestIT extends ESClientYamlSuiteTestCase { default: throw new AssertionError("unknown state[" + state + "]"); } - }); + }, 30, TimeUnit.SECONDS); } } diff --git a/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java b/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java index 8f8792f2697..c95c89a7ba9 100644 --- a/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java +++ b/x-pack/qa/third-party/jira/src/test/java/org/elasticsearch/smoketest/WatcherJiraYamlTestSuiteIT.java @@ -17,6 +17,7 @@ import org.junit.Before; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -70,6 +71,6 @@ public class WatcherJiraYamlTestSuiteIT extends ESClientYamlSuiteTestCase { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); } } diff --git a/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java b/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java index b9a628f71f9..64de13f8375 100644 --- a/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java +++ b/x-pack/qa/third-party/pagerduty/src/test/java/org/elasticsearch/smoketest/WatcherPagerDutyYamlTestSuiteIT.java @@ -17,6 +17,7 @@ import org.junit.Before; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -70,6 +71,6 @@ public class WatcherPagerDutyYamlTestSuiteIT extends ESClientYamlSuiteTestCase { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); } } diff --git a/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java b/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java index 01eeae442b2..a1e2938817b 100644 --- a/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java +++ b/x-pack/qa/third-party/slack/src/test/java/org/elasticsearch/smoketest/WatcherSlackYamlTestSuiteIT.java @@ -17,6 +17,7 @@ import org.junit.Before; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -70,6 +71,6 @@ public class WatcherSlackYamlTestSuiteIT extends ESClientYamlSuiteTestCase { } catch (IOException e) { throw new AssertionError(e); } - }); + }, 30, TimeUnit.SECONDS); } }