Watcher: Prevent NPE if watcher indices are closed (elastic/elasticsearch#4763)

The way we check for the triggered watches on start-up did not take into account
that an index could be closed and thus resulted in an NPE.

This commit adds a check to ensure that the watch index and triggered watches index
are open, before trying to check if all primary shards are active.

Original commit: elastic/x-pack-elasticsearch@ee05779963
This commit is contained in:
Alexander Reelsen 2017-02-03 11:44:48 +01:00 committed by GitHub
parent 1f32ef21a2
commit 44618b5b87
5 changed files with 59 additions and 11 deletions

View File

@ -109,15 +109,28 @@ public final class ExecutionService extends AbstractComponent {
public boolean validate(ClusterState state) {
boolean triggeredWatchStoreReady = triggeredWatchStore.validate(state);
try {
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData());
if (indexMetaData != null) {
return triggeredWatchStoreReady && state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive();
}
} catch (Exception e) {
if (triggeredWatchStoreReady == false) {
return false;
}
try {
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData());
// no watch index yet means we are good to go
if (indexMetaData == null) {
return true;
} else {
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
logger.debug("watch index [{}] is marked as closed, watcher cannot be started",
indexMetaData.getIndex().getName());
return false;
} else {
return state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive();
}
}
} catch (IllegalStateException e) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("error getting index meta data [{}]: ", Watch.INDEX), e);
return false;
}
return triggeredWatchStoreReady;
}
public void stop() {

View File

@ -74,8 +74,14 @@ public class TriggeredWatchStore extends AbstractComponent {
try {
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX_NAME, state.metaData());
if (indexMetaData != null) {
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
logger.debug("triggered watch index [{}] is marked as closed, watcher cannot be started",
indexMetaData.getIndex().getName());
return false;
} else {
return state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive();
}
}
} catch (IllegalStateException e) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("error getting index meta data [{}]: ", INDEX_NAME), e);
return false;

View File

@ -6,8 +6,12 @@
package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -58,6 +62,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.joda.time.DateTime.now;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@ -838,6 +843,20 @@ public class ExecutionServiceTests extends ESTestCase {
}
}
public void testValidateStartWithClosedIndex() throws Exception {
when(triggeredWatchStore.validate(anyObject())).thenReturn(true);
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDataBuilder = MetaData.builder();
Settings indexSettings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDataBuilder.put(IndexMetaData.builder(Watch.INDEX).state(IndexMetaData.State.CLOSE).settings(indexSettings));
csBuilder.metaData(metaDataBuilder);
assertThat(executionService.validate(csBuilder.build()), is(false));
}
private Tuple<Condition, Condition.Result> whenCondition(final WatchExecutionContext context) {
Condition.Result conditionResult = mock(Condition.Result.class);
when(conditionResult.met()).thenReturn(true);

View File

@ -357,6 +357,19 @@ public class TriggeredWatchStoreTests extends ESTestCase {
verifyZeroInteractions(clientProxy);
}
// this is a special condition that could lead to an NPE in earlier versions
public void testTriggeredWatchesIndexIsClosed() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDataBuilder = MetaData.builder();
metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME)
.settings(indexSettings)
.state(IndexMetaData.State.CLOSE));
csBuilder.metaData(metaDataBuilder);
assertThat(triggeredWatchStore.validate(csBuilder.build()), is(false));
}
private RefreshResponse mockRefreshResponse(int total, int successful) {
RefreshResponse refreshResponse = mock(RefreshResponse.class);
when(refreshResponse.getTotalShards()).thenReturn(total);

View File

@ -22,8 +22,6 @@ import static org.hamcrest.Matchers.notNullValue;
public class PutWatchTests extends AbstractWatcherIntegrationTestCase {
public void testPut() throws Exception {
ensureWatcherStarted();
WatchSourceBuilder source = watchBuilder()
.trigger(schedule(interval("5m")));
@ -45,7 +43,6 @@ public class PutWatchTests extends AbstractWatcherIntegrationTestCase {
}
public void testPutNoTrigger() throws Exception {
ensureWatcherStarted();
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> watcherClient().preparePutWatch("_name").setSource(watchBuilder()
.input(simpleInput())