diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 12b54882edd..443511013ea 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -35,6 +36,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste private final WatcherService watcherService; private final ClusterService clusterService; private final ExecutorService executor; + private AtomicReference> previousAllocationIds = new AtomicReference<>(Collections.emptyList()); private volatile WatcherMetaData watcherMetaData; public WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService, @@ -119,11 +121,10 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) { DiscoveryNode localNode = event.state().nodes().getLocalNode(); RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId()); - IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, - event.state().metaData()); + IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); // no watcher index, time to pause if (watcherIndexMetaData == null) { - executor.execute(watcherService::pauseExecution); + executor.execute(() -> watcherService.pauseExecution("no watcher index found")); return; } @@ -132,30 +133,20 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste // no local shards, empty out watcher and not waste resources! if (localShards.isEmpty()) { - executor.execute(watcherService::pauseExecution); + executor.execute(() -> watcherService.pauseExecution("no local watcher shards")); + previousAllocationIds.set(Collections.emptyList()); return; } - List previousAllocationIds = event.previousState().getRoutingNodes().node(localNode.getId()) - .shardsWithState(watchIndex, RELOCATING, STARTED).stream() - .map(ShardRouting::allocationId) - .map(AllocationId::getId) - .collect(Collectors.toList()); - List currentAllocationIds = localShards.stream() .map(ShardRouting::allocationId) .map(AllocationId::getId) .collect(Collectors.toList()); + Collections.sort(currentAllocationIds); - if (previousAllocationIds.size() == currentAllocationIds.size()) { - // make sure we can compare the created list of allocation ids - Collections.sort(previousAllocationIds); - Collections.sort(currentAllocationIds); - if (previousAllocationIds.equals(currentAllocationIds) == false) { - executor.execute(() -> watcherService.reload(event.state())); - } - } else { - executor.execute(() -> watcherService.reload(event.state())); + if (previousAllocationIds.get().equals(currentAllocationIds) == false) { + previousAllocationIds.set(currentAllocationIds); + executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids")); } } else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) { executor.execute(() -> start(event.state(), false)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 085985a9fe2..ebd7442811a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -165,8 +165,8 @@ public class WatcherService extends AbstractComponent { * Reload the watcher service, does not switch the state from stopped to started, just keep going * @param clusterState cluster state, which is needed to find out about local shards */ - public void reload(ClusterState clusterState) { - pauseExecution(); + public void reload(ClusterState clusterState, String reason) { + pauseExecution(reason); // load watches Collection watches = loadWatches(clusterState); @@ -183,9 +183,10 @@ public class WatcherService extends AbstractComponent { * Stop execution of watches on this node, do not try to reload anything, but still allow * manual watch execution, i.e. via the execute watch API */ - public void pauseExecution() { - executionService.pauseExecution(); + public void pauseExecution(String reason) { + int cancelledTaskCount = executionService.pauseExecution(); triggerService.pauseExecution(); + logger.debug("paused execution service, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index b108b909a19..83325f65063 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -134,10 +134,14 @@ public class ExecutionService extends AbstractComponent { } } - public synchronized void pauseExecution() { + /** + * Pause the execution of the watcher executor + * @return the number of tasks that have been removed + */ + public synchronized int pauseExecution() { int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); this.clearExecutions(); - logger.debug("paused execution service, cancelled [{}] queued tasks", cancelledTaskCount); + return cancelledTaskCount; } public TimeValue defaultThrottlePeriod() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index b129929cae8..1c5c1cbed39 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -40,6 +40,7 @@ import static java.util.Arrays.asList; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -187,15 +188,24 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") .add(newNode("node_1")).add(newNode("node_2")) .build(); + IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX) + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + ).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) .nodes(nodes) .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) + .metaData(MetaData.builder().put(indexMetaData, false)) + .build(); when(watcherService.state()).thenReturn(WatcherState.STARTED); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); - verify(watcherService).pauseExecution(); + verify(watcherService).pauseExecution(eq("no local watcher shards")); } public void testReplicaWasAddedOrRemoved() throws Exception { @@ -251,7 +261,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { when(watcherService.state()).thenReturn(WatcherState.STARTED); lifeCycleService.clusterChanged(event); - verify(watcherService).reload(eq(usedClusterState)); + verify(watcherService).reload(eq(usedClusterState), anyString()); } // make sure that cluster state changes can be processed on nodes that do not hold data @@ -300,8 +310,8 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { when(watcherService.state()).thenReturn(WatcherState.STARTED); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", currentState, previousState)); - verify(watcherService, times(0)).pauseExecution(); - verify(watcherService, times(0)).reload(any()); + verify(watcherService, times(0)).pauseExecution(anyObject()); + verify(watcherService, times(0)).reload(any(), any()); } private static DiscoveryNode newNode(String nodeName) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index c58e534e346..bf0bb491c92 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -231,7 +231,6 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords); } - @AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/1309") public void testTriggeredWatchLoading() throws Exception { createIndex("output"); client().prepareIndex("my-index", "foo", "bar") @@ -255,8 +254,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { stopWatcher(); DateTime now = DateTime.now(UTC); -// final int numRecords = scaledRandomIntBetween(2, 12); - final int numRecords = 10; + final int numRecords = scaledRandomIntBetween(2, 12); BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); for (int i = 0; i < numRecords; i++) { now = now.plusMinutes(1); @@ -291,8 +289,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { long successfulWatchExecutions = searchResponse.getHits().getTotalHits(); // the watch history should contain entries for each triggered watch, which a few have been marked as not executed - SearchResponse historySearchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*") - .setSize(10000).get(); + SearchResponse historySearchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").setSize(10000).get(); assertHitCount(historySearchResponse, expectedWatchHistoryCount); long notExecutedCount = Arrays.stream(historySearchResponse.getHits().getHits()) .filter(hit -> hit.getSourceAsMap().get("state").equals(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED.id()))