Watcher: Only try to load triggered watches index, if it exists (elastic/x-pack-elasticsearch#1569)
This is mainly a commit to reduce noise in test logfiles when going through them. When watcher shuts down and another node takes over, it might try to start watcher again and tries to load triggered watches. However the triggered watches index could be gone in the meantime due to further shutdown. This results in logging a stack trace that the index does not exist. This commit checks the cluster state before trying to load triggered watches to prevent an IndexNotFoundException in the logs. Original commit: elastic/x-pack-elasticsearch@9f26d557d0
This commit is contained in:
parent
d769ee0813
commit
709ed7d50e
|
@ -126,7 +126,7 @@ public class WatcherService extends AbstractComponent {
|
|||
Collection<Watch> watches = loadWatches(clusterState);
|
||||
triggerService.start(watches);
|
||||
|
||||
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(watches);
|
||||
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(watches, clusterState);
|
||||
executionService.executeTriggeredWatches(triggeredWatches);
|
||||
|
||||
state.set(WatcherState.STARTED);
|
||||
|
@ -175,7 +175,7 @@ public class WatcherService extends AbstractComponent {
|
|||
// then load triggered watches, which might have been in the queue that we just cleared,
|
||||
// maybe we dont need to execute those anymore however, i.e. due to shard shuffling
|
||||
// then someone else will
|
||||
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(watches);
|
||||
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(watches, clusterState);
|
||||
executionService.executeTriggeredWatches(triggeredWatches);
|
||||
}
|
||||
|
||||
|
|
|
@ -204,17 +204,23 @@ public class TriggeredWatchStore extends AbstractComponent {
|
|||
* Note: This is executing a blocking call over the network, thus a potential source of problems
|
||||
*
|
||||
* @param watches The list of watches that will be loaded here
|
||||
* @param clusterState The current cluster state
|
||||
* @return A list of triggered watches that have been started to execute somewhere else but not finished
|
||||
*/
|
||||
public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches) {
|
||||
public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches, ClusterState clusterState) {
|
||||
if (watches.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// non existing index, return immediately
|
||||
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStore.INDEX_NAME, clusterState.metaData());
|
||||
if (indexMetaData == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
try {
|
||||
client.admin().indices().refresh(new RefreshRequest(TriggeredWatchStore.INDEX_NAME)).actionGet(TimeValue.timeValueSeconds(5));
|
||||
} catch (IndexNotFoundException e) {
|
||||
// no index, no problems, we dont need to search further
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
|
@ -58,6 +59,7 @@ import static org.mockito.Mockito.doAnswer;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TriggeredWatchStoreTests extends ESTestCase {
|
||||
|
@ -80,7 +82,8 @@ public class TriggeredWatchStoreTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testFindTriggeredWatchesEmptyCollection() throws Exception {
|
||||
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(Collections.emptyList());
|
||||
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name"));
|
||||
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(Collections.emptyList(), csBuilder.build());
|
||||
assertThat(triggeredWatches, hasSize(0));
|
||||
}
|
||||
|
||||
|
@ -219,7 +222,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
|
|||
if (randomBoolean()) {
|
||||
watches.add(watch2);
|
||||
}
|
||||
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(watches);
|
||||
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(watches, cs);
|
||||
assertThat(triggeredWatches, notNullValue());
|
||||
assertThat(triggeredWatches, hasSize(watches.size()));
|
||||
|
||||
|
@ -297,6 +300,36 @@ public class TriggeredWatchStoreTests extends ESTestCase {
|
|||
assertThat(triggeredWatchStore.validate(csBuilder.build()), is(false));
|
||||
}
|
||||
|
||||
public void testTriggeredWatchesIndexDoesNotExistOnStartup() throws Exception {
|
||||
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
|
||||
ClusterState cs = csBuilder.build();
|
||||
assertThat(triggeredWatchStore.validate(cs), is(true));
|
||||
Watch watch = mock(Watch.class);
|
||||
triggeredWatchStore.findTriggeredWatches(Collections.singletonList(watch), cs);
|
||||
verifyZeroInteractions(client);
|
||||
}
|
||||
|
||||
public void testIndexNotFoundButInMetaData() throws Exception {
|
||||
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder()
|
||||
.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings));
|
||||
csBuilder.metaData(metaDataBuilder);
|
||||
|
||||
ClusterState cs = csBuilder.build();
|
||||
Watch watch = mock(Watch.class);
|
||||
|
||||
AdminClient adminClient = mock(AdminClient.class);
|
||||
when(client.admin()).thenReturn(adminClient);
|
||||
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
|
||||
when(adminClient.indices()).thenReturn(indicesAdminClient);
|
||||
PlainActionFuture<RefreshResponse> future = PlainActionFuture.newFuture();
|
||||
when(indicesAdminClient.refresh(any())).thenReturn(future);
|
||||
future.onFailure(new IndexNotFoundException(TriggeredWatchStore.INDEX_NAME));
|
||||
|
||||
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(Collections.singletonList(watch), cs);
|
||||
assertThat(triggeredWatches, hasSize(0));
|
||||
}
|
||||
|
||||
private RefreshResponse mockRefreshResponse(int total, int successful) {
|
||||
RefreshResponse refreshResponse = mock(RefreshResponse.class);
|
||||
when(refreshResponse.getTotalShards()).thenReturn(total);
|
||||
|
|
Loading…
Reference in New Issue