From 742c7001c96dda6fba2144d417e7acaba274983f Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 14 Feb 2018 10:45:29 +0100 Subject: [PATCH] Watcher: Ensure watcher service calls are properly caught (elastic/x-pack-elasticsearch#3906) In order to prevent exceptions from bubbling up to the thread pool exception handler, this wraps every watcher service call that is submitted to an executor in an AbstractRunnable, which catches and logs a possible exception. relates elastic/x-pack-elasticsearch#3854 Original commit: elastic/x-pack-elasticsearch@c0b39e6b5ba16901f6c294d5cd036d25bc902aaf --- .../watcher/WatcherLifeCycleService.java | 38 +++++- .../watcher/WatcherLifeCycleServiceTests.java | 121 +++++++++++++++++- 2 files changed, 154 insertions(+), 5 deletions(-) diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index c72b4c740b1..3612dca42a5 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.upgrade.UpgradeField; @@ -35,6 +36,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -200,7 +202,8 @@ 
public class WatcherLifeCycleService extends AbstractComponent implements Cluste IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData()); if (watcherIndexMetaData == null) { if (clearAllocationIds() && callWatcherService) { - executor.execute(() -> watcherService.pauseExecution("no watcher index found")); + executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no watcher index found"), + e -> logger.error("error pausing watch execution", e))); } return; } @@ -210,7 +213,9 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste // this can happen if the node does not hold any data if (routingNode == null) { if (clearAllocationIds() && callWatcherService) { - executor.execute(() -> watcherService.pauseExecution("no routing node for local node found, network issue?")); + executor.execute(wrapWatcherService( + () -> watcherService.pauseExecution("no routing node for local node found, network issue?"), + e -> logger.error("error pausing watch execution", e))); } return; } @@ -220,7 +225,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste // no local shards, empty out watcher and dont waste resources! 
if (localShards.isEmpty()) { if (clearAllocationIds() && callWatcherService) { - executor.execute(() -> watcherService.pauseExecution("no local watcher shards found")); + executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no local watcher shards found"), + e -> logger.error("error pausing watch execution", e))); } return; } @@ -234,7 +240,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste if (previousAllocationIds.get().equals(currentAllocationIds) == false) { previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds)); if (callWatcherService) { - executor.execute(() -> watcherService.reload(state, "new local watcher shard allocation ids")); + executor.execute(wrapWatcherService(() -> watcherService.reload(state, "new local watcher shard allocation ids"), + e -> logger.error("error reloading watcher", e))); } } } @@ -252,4 +259,27 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste List allocationIds() { return previousAllocationIds.get(); } + + /** + * Builds an AbstractRunnable whose doRun and onFailure methods are supplied as lambdas. + * This ensures that the uncaught exception handler in the executing thread pool does not get called. + * + * @param run The code to be executed in the runnable's doRun method + * @param exceptionConsumer The exception handling code, called from onFailure with the exception, if the runnable fails + * @return The AbstractRunnable instance to pass to the executor + */ + private static AbstractRunnable wrapWatcherService(Runnable run, Consumer exceptionConsumer) { + + return new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + exceptionConsumer.accept(e); + } + + @Override + protected void doRun() throws Exception { + run.run(); + } + }; + } } diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java 
index 639c05a25a6..4e105803ea9 100644 --- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.watcher; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -20,7 +22,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -577,6 +578,124 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { assertThat(lifeCycleService.allocationIds(), hasSize(1)); } + public void testWatcherServiceExceptionsAreCaught() { + Index index = new Index(Watch.INDEX, "foo"); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addShard( + TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED)); + IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required + .numberOfShards(1).numberOfReplicas(0).build(); + + // special setup for one of the following cluster states + DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); + DiscoveryNode localNode = mock(DiscoveryNode.class); + 
when(discoveryNodes.getMasterNodeId()).thenReturn("node_1"); + when(discoveryNodes.getLocalNode()).thenReturn(localNode); + when(localNode.isDataNode()).thenReturn(true); + when(localNode.getId()).thenReturn("does_not_exist"); + + ClusterState clusterState = randomFrom( + // cluster state with no watcher index + ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) + .metaData(MetaData.builder() + .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .build()) + .build(), + // cluster state with no routing node + ClusterState.builder(new ClusterName("my-cluster")) + .nodes(discoveryNodes) + .metaData(MetaData.builder() + .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .build()) + .build(), + + // cluster state with no local shards + ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))) + .metaData(MetaData.builder() + .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(indexMetaData, true) + .build()) + .build() + ); + + ClusterState stateWithWatcherShards = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new 
DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") + .add(newNode("node_1"))) + .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()) + .metaData(MetaData.builder() + .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(indexMetaData, true) + .build()) + .build(); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stateWithWatcherShards, stateWithWatcherShards)); + + when(watcherService.validate(anyObject())).thenReturn(true); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + doAnswer(invocation -> { + throw new ElasticsearchSecurityException("breakme"); + }).when(watcherService).pauseExecution(anyString()); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, stateWithWatcherShards)); + verify(watcherService, times(1)).pauseExecution(anyString()); + } + + public void testWatcherServiceExceptionsAreCaughtOnReload() { + Index index = new Index(Watch.INDEX, "foo"); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addShard( + TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED)); + IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required + .numberOfShards(1).numberOfReplicas(0).build(); + + // cluster state with different local shards (another shard id) + ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))).routingTable( + 
RoutingTable.builder().add(IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(Watch.INDEX, 1, "node_1", true, ShardRoutingState.STARTED)) + .build()).build()).metaData( + MetaData.builder().put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(indexMetaData, true).build()).build(); + + ClusterState stateWithWatcherShards = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") + .add(newNode("node_1"))) + .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()) + .metaData(MetaData.builder() + .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(indexMetaData, true) + .build()) + .build(); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stateWithWatcherShards, stateWithWatcherShards)); + + when(watcherService.validate(anyObject())).thenReturn(true); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + doAnswer(invocation -> { + throw new ElasticsearchSecurityException("breakme"); + }).when(watcherService).reload(eq(clusterState), anyString()); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, stateWithWatcherShards)); + verify(watcherService, times(1)).reload(eq(clusterState), anyString()); + + } + private List randomIndexPatterns() { return IntStream.range(0, between(1, 10)) .mapToObj(n -> randomAlphaOfLengthBetween(1, 100))