From 376c9be6faadfbe84585036d4e537cee023edf17 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Tue, 6 Jun 2017 09:39:11 +0200 Subject: [PATCH] Watcher: Ensure reloading happens based on watch index instead of alias (elastic/x-pack-elasticsearch#1544) The cluster state listener to decide if watcher should be reloaded was assuming that no aliases could be used and thus wrongly could trigger a reload, which could have led to wrong test results. During debugging I also added a reason for reloading and fixed another wrong test assumption. Also the listener does not rely on previous cluster state, but stores this in an instance variable, as we need to compare with local state and not the previous cluster state. Original commit: elastic/x-pack-elasticsearch@582783a66d0792d82554517f5666c532a3468dad --- .../watcher/WatcherLifeCycleService.java | 29 +++++++------------ .../xpack/watcher/WatcherService.java | 9 +++--- .../watcher/execution/ExecutionService.java | 8 +++-- .../watcher/WatcherLifeCycleServiceTests.java | 18 +++++++++--- .../test/integration/BootStrapTests.java | 7 ++--- 5 files changed, 37 insertions(+), 34 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 12b54882edd..443511013ea 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -35,6 +36,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste private final
WatcherService watcherService; private final ClusterService clusterService; private final ExecutorService executor; + private AtomicReference> previousAllocationIds = new AtomicReference<>(Collections.emptyList()); private volatile WatcherMetaData watcherMetaData; public WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService, @@ -119,11 +121,10 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) { DiscoveryNode localNode = event.state().nodes().getLocalNode(); RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId()); - IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, - event.state().metaData()); + IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); // no watcher index, time to pause if (watcherIndexMetaData == null) { - executor.execute(watcherService::pauseExecution); + executor.execute(() -> watcherService.pauseExecution("no watcher index found")); return; } @@ -132,30 +133,20 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste // no local shards, empty out watcher and not waste resources! 
if (localShards.isEmpty()) { - executor.execute(watcherService::pauseExecution); + executor.execute(() -> watcherService.pauseExecution("no local watcher shards")); + previousAllocationIds.set(Collections.emptyList()); return; } - List previousAllocationIds = event.previousState().getRoutingNodes().node(localNode.getId()) - .shardsWithState(watchIndex, RELOCATING, STARTED).stream() - .map(ShardRouting::allocationId) - .map(AllocationId::getId) - .collect(Collectors.toList()); - List currentAllocationIds = localShards.stream() .map(ShardRouting::allocationId) .map(AllocationId::getId) .collect(Collectors.toList()); + Collections.sort(currentAllocationIds); - if (previousAllocationIds.size() == currentAllocationIds.size()) { - // make sure we can compare the created list of allocation ids - Collections.sort(previousAllocationIds); - Collections.sort(currentAllocationIds); - if (previousAllocationIds.equals(currentAllocationIds) == false) { - executor.execute(() -> watcherService.reload(event.state())); - } - } else { - executor.execute(() -> watcherService.reload(event.state())); + if (previousAllocationIds.get().equals(currentAllocationIds) == false) { + previousAllocationIds.set(currentAllocationIds); + executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids")); } } else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) { executor.execute(() -> start(event.state(), false)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 085985a9fe2..ebd7442811a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -165,8 +165,8 @@ public class WatcherService extends AbstractComponent { * Reload the watcher service, does not switch the state from stopped to started, 
just keep going * @param clusterState cluster state, which is needed to find out about local shards */ - public void reload(ClusterState clusterState) { - pauseExecution(); + public void reload(ClusterState clusterState, String reason) { + pauseExecution(reason); // load watches Collection watches = loadWatches(clusterState); @@ -183,9 +183,10 @@ public class WatcherService extends AbstractComponent { * Stop execution of watches on this node, do not try to reload anything, but still allow * manual watch execution, i.e. via the execute watch API */ - public void pauseExecution() { - executionService.pauseExecution(); + public void pauseExecution(String reason) { + int cancelledTaskCount = executionService.pauseExecution(); triggerService.pauseExecution(); + logger.debug("paused execution service, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index b108b909a19..83325f65063 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -134,10 +134,14 @@ public class ExecutionService extends AbstractComponent { } } - public synchronized void pauseExecution() { + /** + * Pause the execution of the watcher executor + * @return the number of tasks that have been removed + */ + public synchronized int pauseExecution() { int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); this.clearExecutions(); - logger.debug("paused execution service, cancelled [{}] queued tasks", cancelledTaskCount); + return cancelledTaskCount; } public TimeValue defaultThrottlePeriod() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java 
b/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index b129929cae8..1c5c1cbed39 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -40,6 +40,7 @@ import static java.util.Arrays.asList; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -187,15 +188,24 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") .add(newNode("node_1")).add(newNode("node_2")) .build(); + IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX) + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + ).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) .nodes(nodes) .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) + .metaData(MetaData.builder().put(indexMetaData, false)) + .build(); when(watcherService.state()).thenReturn(WatcherState.STARTED); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); - verify(watcherService).pauseExecution(); + verify(watcherService).pauseExecution(eq("no local watcher shards")); } public void testReplicaWasAddedOrRemoved() throws Exception { @@ -251,7 +261,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { when(watcherService.state()).thenReturn(WatcherState.STARTED); 
lifeCycleService.clusterChanged(event); - verify(watcherService).reload(eq(usedClusterState)); + verify(watcherService).reload(eq(usedClusterState), anyString()); } // make sure that cluster state changes can be processed on nodes that do not hold data @@ -300,8 +310,8 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { when(watcherService.state()).thenReturn(WatcherState.STARTED); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", currentState, previousState)); - verify(watcherService, times(0)).pauseExecution(); - verify(watcherService, times(0)).reload(any()); + verify(watcherService, times(0)).pauseExecution(anyObject()); + verify(watcherService, times(0)).reload(any(), any()); } private static DiscoveryNode newNode(String nodeName) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index c58e534e346..bf0bb491c92 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -231,7 +231,6 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords); } - @AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/1309") public void testTriggeredWatchLoading() throws Exception { createIndex("output"); client().prepareIndex("my-index", "foo", "bar") @@ -255,8 +254,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { stopWatcher(); DateTime now = DateTime.now(UTC); -// final int numRecords = scaledRandomIntBetween(2, 12); - final int numRecords = 10; + final int numRecords = scaledRandomIntBetween(2, 12); BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); for (int i = 0; i < numRecords; i++) { now = now.plusMinutes(1); @@ 
-291,8 +289,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { long successfulWatchExecutions = searchResponse.getHits().getTotalHits(); // the watch history should contain entries for each triggered watch, which a few have been marked as not executed - SearchResponse historySearchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*") - .setSize(10000).get(); + SearchResponse historySearchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").setSize(10000).get(); assertHitCount(historySearchResponse, expectedWatchHistoryCount); long notExecutedCount = Arrays.stream(historySearchResponse.getHits().getHits()) .filter(hit -> hit.getSourceAsMap().get("state").equals(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED.id()))