From c4cb85104ede2506cd514c1dd521b4268c786b69 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 18 Jun 2015 13:45:55 +0200 Subject: [PATCH] lifecycle: Verify whether Watcher was stopped manually under a lock Previously, if timing permitted, a start and a stop could be executed very close to each other in time. The manually-stopped check could then appear to pass, because the stop had not yet been performed. Original commit: elastic/x-pack-elasticsearch@c1865c10697539f71c1039e037eca639e57afc8a --- .../watcher/WatcherLifeCycleService.java | 100 ++++++++++-------- .../watcher/WatcherLifeCycleServiceTests.java | 44 +++++++- .../history/HistoryStoreSettingsTests.java | 2 +- 3 files changed, 98 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java b/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java index b8aef15dd6a..93787246925 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java @@ -24,7 +24,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste private final WatcherService watcherService; private final ClusterService clusterService; - // Maybe this should be a setting in the cluster settings? 
+ // TODO: If Watcher was stopped via api and the master is changed then Watcher will start regardless of the previous + // stop command, so at some point this needs to be a cluster setting private volatile boolean manuallyStopped; @Inject @@ -46,24 +47,65 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste } public void start() { - start(clusterService.state()); + start(clusterService.state(), true); } public void stop() { stop(true); } - private synchronized void start(ClusterState state) { - watcherService.start(state); - } - private synchronized void stop(boolean manual) { manuallyStopped = manual; watcherService.stop(); } + private synchronized void start(ClusterState state, boolean manual) { + WatcherState watcherState = watcherService.state(); + if (watcherState != WatcherState.STOPPED) { + logger.debug("Not starting, because state [{}] while [{}] is expected", watcherState, WatcherState.STOPPED); + return; + } + + // If we start from a cluster state update we need to check if previously we stopped manually + // otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run + if (!manual && manuallyStopped) { + logger.debug("Not starting, because watcher has been stopped manually, so watcher can't be started automatically"); + return; + } + + if (!watcherService.validate(state)) { + logger.debug("Not starting, because the cluster state isn't valid"); + return; + } + + int attempts = 0; + while(true) { + try { + logger.debug("Start attempt [{}], based on cluster state version [{}]", attempts, state.getVersion()); + watcherService.start(state); + return; + } catch (Exception e) { + if (++attempts < 3) { + logger.warn("error occurred while starting, retrying...", e); + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + if (!clusterService.localNode().masterNode()) { + logger.error("abort retry, we are no longer master"); + 
return; + } + } else { + logger.error("attempted to start Watcher [{}] times, aborting now, please try to start Watcher manually", attempts); + return; + } + } + } + } + @Override - public void clusterChanged(ClusterChangedEvent event) { + public void clusterChanged(final ClusterChangedEvent event) { if (!event.localNodeMaster()) { // We're no longer the master so we need to stop the watcher. // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, @@ -77,46 +119,18 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste }); } else { if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - // wait until the gateway has recovered from disk, otherwise we think may not have .watchs and - // a .watch_history index, but they may not have been restored from the cluster state on disk + // wait until the gateway has recovered from disk, otherwise we think may not have .watches and + // a .triggered_watches index, but they may not have been restored from the cluster state on disk return; } final ClusterState state = event.state(); - if (!watcherService.validate(state)) { - return; - } - - if (watcherService.state() == WatcherState.STOPPED && !manuallyStopped) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - int attempts = 0; - while(true) { - try { - start(state); - return; - } catch (Exception e) { - if (++attempts < 3) { - logger.warn("error occurred while starting, retrying...", e); - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - if (!clusterService.localNode().masterNode()) { - logger.error("abort retry, we are no longer master"); - return; - } - } else { - logger.error("attempted to start Watcher [{}] times, aborting now, please try to start Watcher manually", attempts); - return; - } - } - } - } - }); - } + 
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + start(state, false); + } + }); } } } diff --git a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTests.java b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTests.java index 4b935f40482..87068f5b376 100644 --- a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTests.java +++ b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTests.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.*; */ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase { + private ClusterService clusterService; private WatcherService watcherService; private WatcherLifeCycleService lifeCycleService; @@ -34,7 +35,7 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase { ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService()); watcherService = mock(WatcherService.class); - ClusterService clusterService = mock(ClusterService.class); + clusterService = mock(ClusterService.class); lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, clusterService, threadPool, watcherService); } @@ -78,6 +79,14 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase { @Test public void testManualStartStop() { + DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); + ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(nodes).build(); + when(clusterService.state()).thenReturn(clusterState); + when(watcherService.state()).thenReturn(WatcherState.STOPPED); + when(watcherService.validate(clusterState)).thenReturn(true); + + lifeCycleService.start(); verify(watcherService, times(1)).start(any(ClusterState.class)); verify(watcherService, never()).stop(); @@ -87,9 +96,6 @@ public class WatcherLifeCycleServiceTests extends 
ElasticsearchTestCase { verify(watcherService, times(1)).stop(); // Starting via cluster state update, we shouldn't start because we have been stopped manually. - DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); - ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) - .nodes(nodes).build(); when(watcherService.state()).thenReturn(WatcherState.STOPPED); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); verify(watcherService, times(1)).start(any(ClusterState.class)); @@ -120,4 +126,34 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase { verify(watcherService, times(2)).stop(); } + @Test + public void testManualStartStop_clusterStateNotValid() { + DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); + ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(nodes).build(); + when(clusterService.state()).thenReturn(clusterState); + when(watcherService.state()).thenReturn(WatcherState.STOPPED); + when(watcherService.validate(clusterState)).thenReturn(false); + + + lifeCycleService.start(); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + } + + @Test + public void testManualStartStop_watcherNotStopped() { + DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); + ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(nodes).build(); + when(clusterService.state()).thenReturn(clusterState); + when(watcherService.state()).thenReturn(WatcherState.STOPPING); + + + lifeCycleService.start(); + verify(watcherService, never()).validate(any(ClusterState.class)); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + } + } diff --git 
a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java index 90ccbdf5c65..cf3fe906a78 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java @@ -20,7 +20,7 @@ import static org.hamcrest.core.Is.is; /** */ -@TestLogging("cluster:DEBUG,action.admin.cluster.settings:DEBUG") +@TestLogging("cluster:DEBUG,action.admin.cluster.settings:DEBUG,watcher:DEBUG") @ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests {