diff --git a/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java b/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java index aa91feebff7..da6a18c6f7c 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java @@ -55,6 +55,12 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste } private synchronized void stop(boolean manual) { + WatcherState watcherState = watcherService.state(); + if (watcherState != WatcherState.STARTED) { + logger.debug("not stopping watcher. watcher can only stop if its current state is [{}], but its current state now is [{}]", WatcherState.STARTED, watcherState); + return; + } + manuallyStopped = manual; watcherService.stop(); } @@ -62,7 +68,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste private synchronized void start(ClusterState state, boolean manual) { WatcherState watcherState = watcherService.state(); if (watcherState != WatcherState.STOPPED) { - logger.debug("not starting watcher. watcher can only start if its current state is [{}]", WatcherState.STOPPED); + logger.debug("not starting watcher. watcher can only start if its current state is [{}], but its current state now is [{}]", WatcherState.STOPPED, watcherState); return; } @@ -78,7 +84,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste return; } - logger.trace("starting... (based on cluster state version [{}])", state.getVersion()); + logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual); try { watcherService.start(state); } catch (Exception e) { @@ -89,6 +95,11 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste @Override public void clusterChanged(final ClusterChangedEvent event) { if (!event.localNodeMaster()) { + if (watcherService.state() != WatcherState.STARTED) { + // to avoid unnecessary forking of threads... + return; + } + // We're no longer the master so we need to stop the watcher. // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, // so we fork here so that we don't wait too long. Other events may need to be processed and @@ -105,6 +116,10 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste // a .triggered_watches index, but they may not have been restored from the cluster state on disk return; } + if (watcherService.state() != WatcherState.STOPPED) { + // to avoid unnecessary forking of threads... + return; + } final ClusterState state = event.state(); threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { diff --git a/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java b/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java index d921eefc8d3..a153f3b7ef5 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java @@ -260,7 +260,7 @@ public class TriggeredWatchStore extends AbstractComponent { public Collection loadTriggeredWatches(ClusterState state) { IndexMetaData indexMetaData = state.getMetaData().index(INDEX_NAME); if (indexMetaData == null) { - logger.debug("no .watch_history indices found. skipping loading awaiting triggered watches"); + logger.debug("no .triggered_watches indices found. skipping loading awaiting triggered watches"); return Collections.emptySet(); } diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java index 3758b15fc30..8007805a8e1 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java @@ -97,14 +97,16 @@ public class WatchStore extends AbstractComponent { public boolean validate(ClusterState state) { IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); if (watchesIndexMetaData == null) { - logger.debug("watches index [{}] doesn't exist, so we can start", INDEX); + logger.debug("index [{}] doesn't exist, so we can start", INDEX); return true; } if (state.routingTable().index(INDEX).allPrimaryShardsActive()) { - logger.debug("watches index [{}] exists and all primary shards are started, so we can start", INDEX); + logger.debug("index [{}] exists and all primary shards are started, so we can start", INDEX); return true; + } else { + logger.debug("not all primary shards active for index [{}], so we cannot start", INDEX); + return false; } - return false; } public boolean started() { diff --git a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTests.java b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTests.java index 87068f5b376..d5cea155f7c 100644 --- a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTests.java +++ b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTests.java @@ -83,14 +83,15 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase { ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) .nodes(nodes).build(); when(clusterService.state()).thenReturn(clusterState); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); when(watcherService.validate(clusterState)).thenReturn(true); + when(watcherService.state()).thenReturn(WatcherState.STOPPED); lifeCycleService.start(); verify(watcherService, times(1)).start(any(ClusterState.class)); verify(watcherService, never()).stop(); + when(watcherService.state()).thenReturn(WatcherState.STARTED); lifeCycleService.stop(); verify(watcherService, times(1)).start(any(ClusterState.class)); verify(watcherService, times(1)).stop(); @@ -110,7 +111,7 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase { nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id2"); clusterState = ClusterState.builder(new ClusterName("my-cluster")) .nodes(nodes).build(); - when(watcherService.state()).thenReturn(WatcherState.STOPPED); + when(watcherService.state()).thenReturn(WatcherState.STARTED); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); verify(watcherService, times(2)).start(any(ClusterState.class)); verify(watcherService, times(2)).stop();