From bb033f1e0085454134d4da1cd9d9fd4238b2b529 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 7 Sep 2016 15:06:03 +0200 Subject: [PATCH] Watcher: Clear out WatchStore on watch index deletion (elastic/elasticsearch#2807) If someone deletes the watch index (i.e. by deleting all indices), the watcher in memory store still contains all the watches and tries to execute watches - which results in exceptions as the watch itself cannot be updated anymore. In order to minimize this problem (it cant be get rid of completely), we should act accordingly if the watch index goes missing (either deleted or closed) and clear out the memory representation of watches in the watchstore as well as trying to finish all the current executions. Closes elastic/elasticsearch#2794 Original commit: elastic/x-pack-elasticsearch@12d98cd566ee4d0d37fc5eecf0ac6a314722c8b9 --- .../watcher/WatcherLifeCycleService.java | 49 +++--- .../xpack/watcher/WatcherService.java | 11 +- .../watcher/execution/ExecutionService.java | 24 +-- .../xpack/watcher/watch/WatchStore.java | 4 + .../watcher/WatcherLifeCycleServiceTests.java | 48 +++++- .../xpack/watcher/watch/WatchStoreTests.java | 146 ++++++++---------- 6 files changed, 163 insertions(+), 119 deletions(-) diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index daa21ac5d89..b96fed9f746 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ack.AckedRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; @@ -22,6 +23,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import java.util.concurrent.CountDownLatch; @@ -119,34 +121,33 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste } if (!event.localNodeMaster()) { - if (watcherService.state() != WatcherState.STARTED) { - // to avoid unnecessary forking of threads... - return; + if (watcherService.state() == WatcherState.STARTED) { + // We're no longer the master so we need to stop the watcher. + // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, + // so we fork here so that we don't wait too long. Other events may need to be processed and + // other cluster state listeners may need to be executed as well for this event. + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> stop(false)); } - - // We're no longer the master so we need to stop the watcher. - // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, - // so we fork here so that we don't wait too long. Other events may need to be processed and - // other cluster state listeners may need to be executed as well for this event. - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - stop(false); - } - }); } else { - if (watcherService.state() != WatcherState.STOPPED) { - // to avoid unnecessary forking of threads... - return; - } + if (watcherService.state() == WatcherState.STOPPED) { + final ClusterState state = event.state(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> start(state, false)); + } else { + boolean isWatchIndexDeleted = event.indicesDeleted().stream() + .filter(index -> WatchStore.INDEX.equals(index.getName())) + .findAny() + .isPresent(); - final ClusterState state = event.state(); - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - start(state, false); + boolean isWatchIndexOpenInPreviousClusterState = event.previousState().metaData().hasIndex(WatchStore.INDEX) && + event.previousState().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.OPEN; + boolean isWatchIndexClosedInCurrentClusterState = event.state().metaData().hasIndex(WatchStore.INDEX) && + event.state().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.CLOSE; + boolean hasWatcherIndexBeenClosed = isWatchIndexOpenInPreviousClusterState && isWatchIndexClosedInCurrentClusterState; + + if (isWatchIndexDeleted || hasWatcherIndexBeenClosed) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> watcherService.watchIndexDeletedOrClosed()); } - }); + } } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index fb66c4d5062..174eb77fa2f 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -16,9 +16,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; -import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.WatchLockService; @@ -292,4 +292,13 @@ public class WatcherService extends AbstractComponent { innerMap.putAll(watchStore.usageStats()); return innerMap; } + + /** + * Something deleted or closed the {@link WatchStore#INDEX} and thus we need to do some cleanup to prevent further execution of watches + * as those watches cannot be updated anymore + */ + public void watchIndexDeletedOrClosed() { + watchStore.clearWatchesInMemory(); + executionService.clearExecutions(); + } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 35ad6a9f9da..5e4f97e4102 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -141,12 +141,7 @@ public class ExecutionService extends AbstractComponent { currentExecutions.add(watchExecution.createSnapshot()); } // Lets show the longest running watch first: - Collections.sort(currentExecutions, new Comparator() { - @Override - public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) { - return e1.executionTime().compareTo(e2.executionTime()); - } - }); + Collections.sort(currentExecutions, Comparator.comparing(WatchExecutionSnapshot::executionTime)); return currentExecutions; } @@ -163,12 +158,8 @@ public class ExecutionService extends AbstractComponent { queuedWatches.add(new QueuedWatch(executionTask.ctx)); } // Lets show the execution that pending the longest first: - Collections.sort(queuedWatches, new Comparator() { - @Override - public int compare(QueuedWatch e1, QueuedWatch e2) { - return e1.executionTime().compareTo(e2.executionTime()); - } - }); + + Collections.sort(queuedWatches, Comparator.comparing(QueuedWatch::executionTime)); return queuedWatches; } @@ -438,6 +429,15 @@ public class ExecutionService extends AbstractComponent { return counters.toMap(); } + /** + * This clears out the current executions and sets new empty current executions + * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea + */ + public void clearExecutions() { + currentExecutions.sealAndAwaitEmpty(maxStopTimeout); + currentExecutions = new CurrentExecutions(); + } + private static final class StartupExecutionContext extends TriggeredExecutionContext { public StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java index 4975baaa470..43305267267 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java @@ -333,6 +333,10 @@ public class WatchStore extends AbstractComponent { } } + public void clearWatchesInMemory() { + watches.clear(); + } + public class WatchPut { private final Watch previous; diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 193e927cf43..feff0267558 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -5,18 +5,22 @@ */ package org.elasticsearch.xpack.watcher; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -165,4 +169,46 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { verify(watcherService, never()).start(any(ClusterState.class)); verify(watcherService, never()).stop(); } + + public void testWatchIndexDeletion() throws Exception { + DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build(); + // old cluster state that contains watcher index + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + + // new cluster state that does not contain watcher index + ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")).nodes(discoveryNodes).build(); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState)); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + verify(watcherService, times(1)).watchIndexDeletedOrClosed(); + } + + public void testWatchIndexClosing() throws Exception { + DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build(); + // old cluster state that contains watcher index + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + + // new cluster state with a closed watcher index + ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX).state(IndexMetaData.State.CLOSE) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState)); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + verify(watcherService, times(1)).watchIndexDeletedOrClosed(); + } + } diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java index dc5c0838c91..44221316b23 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java @@ -54,6 +54,7 @@ import java.util.List; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsEqual.equalTo; @@ -123,22 +124,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartRefreshFailed() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 0); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -158,22 +144,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartSearchFailed() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -197,22 +168,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartNoWatchStored() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -234,22 +190,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartWatchStored() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -300,22 +241,7 @@ public class WatchStoreTests extends ESTestCase { public void testUsageStats() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -419,6 +345,65 @@ public class WatchStoreTests extends ESTestCase { assertThat(stats.getValue("watch.transform.TYPE.active"), is(greaterThan(0))); } + public void testThatCleaningWatchesWorks() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + createWatchIndexMetaData(csBuilder); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + BytesReference source = new BytesArray("{}"); + InternalSearchHit hit = new InternalSearchHit(0, "_id1", new Text("type"), Collections.emptyMap()); + hit.sourceRef(source); + + SearchResponse searchResponse = mockSearchResponse(1, 1, 1, hit); + when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse); + + SearchResponse finalSearchResponse = mockSearchResponse(1, 1, 0); + when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(finalSearchResponse); + + Watch watch = mock(Watch.class); + WatchStatus status = mock(WatchStatus.class); + when(watch.status()).thenReturn(status); + when(parser.parse("_id1", true, source)).thenReturn(watch); + + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); + + ClusterState cs = csBuilder.build(); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches(), hasSize(1)); + + watchStore.clearWatchesInMemory(); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches(), hasSize(0)); + assertThat(watchStore.activeWatches(), hasSize(0)); + } + + /* + * Creates the standard cluster state metadata for the watches index + * with shards/replicas being marked as started + */ + private void createWatchIndexMetaData(ClusterState.Builder builder) { + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + builder.metaData(metaDateBuilder); + builder.routingTable(routingTableBuilder.build()); + } + private RefreshResponse mockRefreshResponse(int total, int successful) { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getTotalShards()).thenReturn(total); @@ -432,7 +417,6 @@ public class WatchStoreTests extends ESTestCase { when(searchResponse.getTotalShards()).thenReturn(total); when(searchResponse.getSuccessfulShards()).thenReturn(successful); when(searchResponse.getHits()).thenReturn(internalSearchHits); - when(searchResponse.getHits()).thenReturn(internalSearchHits); return searchResponse; }