diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index daa21ac5d89..b96fed9f746 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ack.AckedRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; @@ -22,6 +23,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import java.util.concurrent.CountDownLatch; @@ -119,34 +121,33 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste } if (!event.localNodeMaster()) { - if (watcherService.state() != WatcherState.STARTED) { - // to avoid unnecessary forking of threads... - return; + if (watcherService.state() == WatcherState.STARTED) { + // We're no longer the master so we need to stop the watcher. + // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, + // so we fork here so that we don't wait too long. Other events may need to be processed and + // other cluster state listeners may need to be executed as well for this event. + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> stop(false)); } - - // We're no longer the master so we need to stop the watcher. - // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, - // so we fork here so that we don't wait too long. Other events may need to be processed and - // other cluster state listeners may need to be executed as well for this event. - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - stop(false); - } - }); } else { - if (watcherService.state() != WatcherState.STOPPED) { - // to avoid unnecessary forking of threads... - return; - } + if (watcherService.state() == WatcherState.STOPPED) { + final ClusterState state = event.state(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> start(state, false)); + } else { + boolean isWatchIndexDeleted = event.indicesDeleted().stream() + .filter(index -> WatchStore.INDEX.equals(index.getName())) + .findAny() + .isPresent(); - final ClusterState state = event.state(); - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - start(state, false); + boolean isWatchIndexOpenInPreviousClusterState = event.previousState().metaData().hasIndex(WatchStore.INDEX) && + event.previousState().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.OPEN; + boolean isWatchIndexClosedInCurrentClusterState = event.state().metaData().hasIndex(WatchStore.INDEX) && + event.state().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.CLOSE; + boolean hasWatcherIndexBeenClosed = isWatchIndexOpenInPreviousClusterState && isWatchIndexClosedInCurrentClusterState; + + if (isWatchIndexDeleted || hasWatcherIndexBeenClosed) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> watcherService.watchIndexDeletedOrClosed()); } - }); + } } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index fb66c4d5062..174eb77fa2f 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -16,9 +16,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; -import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.WatchLockService; @@ -292,4 +292,13 @@ public class WatcherService extends AbstractComponent { innerMap.putAll(watchStore.usageStats()); return innerMap; } + + /** + * Something deleted or closed the {@link WatchStore#INDEX} and thus we need to do some cleanup to prevent further execution of watches + * as those watches cannot be updated anymore + */ + public void watchIndexDeletedOrClosed() { + watchStore.clearWatchesInMemory(); + executionService.clearExecutions(); + } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 35ad6a9f9da..5e4f97e4102 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -141,12 +141,7 @@ public class ExecutionService extends AbstractComponent { currentExecutions.add(watchExecution.createSnapshot()); } // Lets show the longest running watch first: - Collections.sort(currentExecutions, new Comparator() { - @Override - public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) { - return e1.executionTime().compareTo(e2.executionTime()); - } - }); + Collections.sort(currentExecutions, Comparator.comparing(WatchExecutionSnapshot::executionTime)); return currentExecutions; } @@ -163,12 +158,8 @@ public class ExecutionService extends AbstractComponent { queuedWatches.add(new QueuedWatch(executionTask.ctx)); } // Lets show the execution that pending the longest first: - Collections.sort(queuedWatches, new Comparator() { - @Override - public int compare(QueuedWatch e1, QueuedWatch e2) { - return e1.executionTime().compareTo(e2.executionTime()); - } - }); + + Collections.sort(queuedWatches, Comparator.comparing(QueuedWatch::executionTime)); return queuedWatches; } @@ -438,6 +429,15 @@ public class ExecutionService extends AbstractComponent { return counters.toMap(); } + /** + * This clears out the current executions and sets new empty current executions + * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea + */ + public void clearExecutions() { + currentExecutions.sealAndAwaitEmpty(maxStopTimeout); + currentExecutions = new CurrentExecutions(); + } + private static final class StartupExecutionContext extends TriggeredExecutionContext { public StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java index 4975baaa470..43305267267 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java @@ -333,6 +333,10 @@ public class WatchStore extends AbstractComponent { } } + public void clearWatchesInMemory() { + watches.clear(); + } + public class WatchPut { private final Watch previous; diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 193e927cf43..feff0267558 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -5,18 +5,22 @@ */ package org.elasticsearch.xpack.watcher; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -165,4 +169,46 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { verify(watcherService, never()).start(any(ClusterState.class)); verify(watcherService, never()).stop(); } + + public void testWatchIndexDeletion() throws Exception { + DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build(); + // old cluster state that contains watcher index + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + + // new cluster state that does not contain watcher index + ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")).nodes(discoveryNodes).build(); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState)); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + verify(watcherService, times(1)).watchIndexDeletedOrClosed(); + } + + public void testWatchIndexClosing() throws Exception { + DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build(); + // old cluster state that contains watcher index + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + + // new cluster state with a closed watcher index + ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX).state(IndexMetaData.State.CLOSE) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState)); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + verify(watcherService, times(1)).watchIndexDeletedOrClosed(); + } + } diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java index dc5c0838c91..44221316b23 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java @@ -54,6 +54,7 @@ import java.util.List; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsEqual.equalTo; @@ -123,22 +124,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartRefreshFailed() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 0); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -158,22 +144,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartSearchFailed() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -197,22 +168,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartNoWatchStored() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -234,22 +190,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartWatchStored() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -300,22 +241,7 @@ public class WatchStoreTests extends ESTestCase { public void testUsageStats() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -419,6 +345,65 @@ public class WatchStoreTests extends ESTestCase { assertThat(stats.getValue("watch.transform.TYPE.active"), is(greaterThan(0))); } + public void testThatCleaningWatchesWorks() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + createWatchIndexMetaData(csBuilder); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + BytesReference source = new BytesArray("{}"); + InternalSearchHit hit = new InternalSearchHit(0, "_id1", new Text("type"), Collections.emptyMap()); + hit.sourceRef(source); + + SearchResponse searchResponse = mockSearchResponse(1, 1, 1, hit); + when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse); + + SearchResponse finalSearchResponse = mockSearchResponse(1, 1, 0); + when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(finalSearchResponse); + + Watch watch = mock(Watch.class); + WatchStatus status = mock(WatchStatus.class); + when(watch.status()).thenReturn(status); + when(parser.parse("_id1", true, source)).thenReturn(watch); + + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); + + ClusterState cs = csBuilder.build(); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches(), hasSize(1)); + + watchStore.clearWatchesInMemory(); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches(), hasSize(0)); + assertThat(watchStore.activeWatches(), hasSize(0)); + } + + /* + * Creates the standard cluster state metadata for the watches index + * with shards/replicas being marked as started + */ + private void createWatchIndexMetaData(ClusterState.Builder builder) { + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + builder.metaData(metaDateBuilder); + builder.routingTable(routingTableBuilder.build()); + } + private RefreshResponse mockRefreshResponse(int total, int successful) { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getTotalShards()).thenReturn(total); @@ -432,7 +417,6 @@ public class WatchStoreTests extends ESTestCase { when(searchResponse.getTotalShards()).thenReturn(total); when(searchResponse.getSuccessfulShards()).thenReturn(successful); when(searchResponse.getHits()).thenReturn(internalSearchHits); - when(searchResponse.getHits()).thenReturn(internalSearchHits); return searchResponse; }