diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index a10af754c37..bb0ace19bb4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -109,15 +109,28 @@ public final class ExecutionService extends AbstractComponent { public boolean validate(ClusterState state) { boolean triggeredWatchStoreReady = triggeredWatchStore.validate(state); - try { - IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData()); - if (indexMetaData != null) { - return triggeredWatchStoreReady && state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive(); - } - } catch (Exception e) { + if (triggeredWatchStoreReady == false) { + return false; + } + + try { + IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData()); + // no watch index yet means we are good to go + if (indexMetaData == null) { + return true; + } else { + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + logger.debug("watch index [{}] is marked as closed, watcher cannot be started", + indexMetaData.getIndex().getName()); + return false; + } else { + return state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive(); + } + } + } catch (IllegalStateException e) { + logger.trace((Supplier) () -> new ParameterizedMessage("error getting index meta data [{}]: ", Watch.INDEX), e); return false; } - return triggeredWatchStoreReady; } public void stop() { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java index 9669c170bcd..34a85605172 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java @@ -74,7 +74,13 @@ public class TriggeredWatchStore extends AbstractComponent { try { IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX_NAME, state.metaData()); if (indexMetaData != null) { - return state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive(); + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + logger.debug("triggered watch index [{}] is marked as closed, watcher cannot be started", + indexMetaData.getIndex().getName()); + return false; + } else { + return state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive(); + } } } catch (IllegalStateException e) { logger.trace((Supplier) () -> new ParameterizedMessage("error getting index meta data [{}]: ", INDEX_NAME), e); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 125b06a2d55..3ac0229173e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -6,8 +6,12 @@ package org.elasticsearch.xpack.watcher.execution; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -58,6 +62,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.joda.time.DateTime.now; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -838,6 +843,20 @@ public class ExecutionServiceTests extends ESTestCase { } } + public void testValidateStartWithClosedIndex() throws Exception { + when(triggeredWatchStore.validate(anyObject())).thenReturn(true); + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDataBuilder = MetaData.builder(); + Settings indexSettings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDataBuilder.put(IndexMetaData.builder(Watch.INDEX).state(IndexMetaData.State.CLOSE).settings(indexSettings)); + csBuilder.metaData(metaDataBuilder); + + assertThat(executionService.validate(csBuilder.build()), is(false)); + } + private Tuple whenCondition(final WatchExecutionContext context) { Condition.Result conditionResult = mock(Condition.Result.class); when(conditionResult.met()).thenReturn(true); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index 36a938f37b8..c353e42ea18 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -357,6 +357,19 @@ public class TriggeredWatchStoreTests extends ESTestCase { verifyZeroInteractions(clientProxy); } + // this is a special condition that could lead to an NPE in earlier versions + public void testTriggeredWatchesIndexIsClosed() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME) + .settings(indexSettings) + .state(IndexMetaData.State.CLOSE)); + csBuilder.metaData(metaDataBuilder); + + assertThat(triggeredWatchStore.validate(csBuilder.build()), is(false)); + } + private RefreshResponse mockRefreshResponse(int total, int successful) { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getTotalShards()).thenReturn(total); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/put/PutWatchTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/put/PutWatchTests.java index ddc9c6c2220..8bc1482c7db 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/put/PutWatchTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/put/PutWatchTests.java @@ -22,8 +22,6 @@ import static org.hamcrest.Matchers.notNullValue; public class PutWatchTests extends AbstractWatcherIntegrationTestCase { public void testPut() throws Exception { - ensureWatcherStarted(); - WatchSourceBuilder source = watchBuilder() .trigger(schedule(interval("5m"))); @@ -45,7 +43,6 @@ public class PutWatchTests extends AbstractWatcherIntegrationTestCase { } public void testPutNoTrigger() throws Exception { - ensureWatcherStarted(); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> watcherClient().preparePutWatch("_name").setSource(watchBuilder() .input(simpleInput())