diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java
index c72b4c740b1..3612dca42a5 100644
--- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java
+++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java
@@ -21,6 +21,7 @@ import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.upgrade.UpgradeField;
@@ -35,6 +36,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -200,7 +202,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
         IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData());
         if (watcherIndexMetaData == null) {
             if (clearAllocationIds() && callWatcherService) {
-                executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
+                executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no watcher index found"),
+                    e -> logger.error("error pausing watch execution", e)));
             }
             return;
         }
@@ -210,7 +213,9 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
         // this can happen if the node does not hold any data
         if (routingNode == null) {
             if (clearAllocationIds() && callWatcherService) {
-                executor.execute(() -> watcherService.pauseExecution("no routing node for local node found, network issue?"));
+                executor.execute(wrapWatcherService(
+                    () -> watcherService.pauseExecution("no routing node for local node found, network issue?"),
+                    e -> logger.error("error pausing watch execution", e)));
             }
             return;
         }
@@ -220,7 +225,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
         // no local shards, empty out watcher and dont waste resources!
         if (localShards.isEmpty()) {
             if (clearAllocationIds() && callWatcherService) {
-                executor.execute(() -> watcherService.pauseExecution("no local watcher shards found"));
+                executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no local watcher shards found"),
+                    e -> logger.error("error pausing watch execution", e)));
             }
             return;
         }
@@ -234,7 +240,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
         if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
             previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
             if (callWatcherService) {
-                executor.execute(() -> watcherService.reload(state, "new local watcher shard allocation ids"));
+                executor.execute(wrapWatcherService(() -> watcherService.reload(state, "new local watcher shard allocation ids"),
+                    e -> logger.error("error reloading watcher", e)));
             }
         }
     }
@@ -252,4 +259,27 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
     List allocationIds() {
         return previousAllocationIds.get();
     }
+
+    /**
+     * Wraps the given code in an {@link AbstractRunnable}, so that doRun and onFailure can be supplied as lambdas.
+     * Failures are passed to the exception consumer, so the uncaught exception handler of the executing threadpool
+     * does not get called.
+     *
+     * @param run               The code to be executed in the runnable
+     * @param exceptionConsumer The exception handling code to be executed, if the runnable fails
+     * @return                  The AbstractRunnable instance to pass to the executor
+     */
+    private static AbstractRunnable wrapWatcherService(Runnable run, Consumer<Exception> exceptionConsumer) {
+        return new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                exceptionConsumer.accept(e);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                run.run();
+            }
+        };
+    }
 }
diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java
index 639c05a25a6..4e105803ea9 100644
--- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java
+++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java
@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.watcher;
 
+import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -20,7 +21,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingHelper;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -577,6 +577,133 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
         assertThat(lifeCycleService.allocationIds(), hasSize(1));
     }
 
+    /**
+     * Makes {@code pauseExecution} throw, then moves from a state with a local watcher shard to one of the states
+     * labelled below (no watcher index, no routing node, no local shards) and verifies exactly one pause attempt.
+     * NOTE(review): only meaningful if the test executor runs tasks on the calling thread - confirm in the setup.
+     */
+    public void testWatcherServiceExceptionsAreCaught() {
+        Index index = new Index(Watch.INDEX, "foo");
+        IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
+        indexRoutingTableBuilder.addShard(
+            TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED));
+        IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT)
+            .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required
+            .numberOfShards(1).numberOfReplicas(0).build();
+
+        // special setup for one of the following cluster states
+        DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
+        DiscoveryNode localNode = mock(DiscoveryNode.class);
+        when(discoveryNodes.getMasterNodeId()).thenReturn("node_1");
+        when(discoveryNodes.getLocalNode()).thenReturn(localNode);
+        when(localNode.isDataNode()).thenReturn(true);
+        when(localNode.getId()).thenReturn("does_not_exist");
+
+        ClusterState clusterState = randomFrom(
+            // cluster state with no watcher index
+            ClusterState.builder(new ClusterName("my-cluster"))
+                .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
+                .metaData(MetaData.builder()
+                    .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .build())
+                .build(),
+            // cluster state with no routing node
+            ClusterState.builder(new ClusterName("my-cluster"))
+                .nodes(discoveryNodes)
+                .metaData(MetaData.builder()
+                    .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .build())
+                .build(),
+
+            // cluster state with no local shards
+            ClusterState.builder(new ClusterName("my-cluster"))
+                .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
+                .metaData(MetaData.builder()
+                    .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(indexMetaData, true)
+                    .build())
+                .build()
+        );
+
+        ClusterState stateWithWatcherShards = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1")
+                .add(newNode("node_1")))
+            .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build())
+            .metaData(MetaData.builder()
+                .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                .put(indexMetaData, true)
+                .build())
+            .build();
+
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stateWithWatcherShards, stateWithWatcherShards));
+
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        when(watcherService.state()).thenReturn(WatcherState.STARTED);
+        doAnswer(invocation -> {
+            throw new ElasticsearchSecurityException("breakme");
+        }).when(watcherService).pauseExecution(anyString());
+
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, stateWithWatcherShards));
+        verify(watcherService, times(1)).pauseExecution(anyString());
+    }
+
+    /**
+     * Makes {@code reload} throw, then moves from a state whose local watcher shard has id 0 to one whose local
+     * watcher shard has id 1 and verifies exactly one reload attempt with the new cluster state.
+     * NOTE(review): only meaningful if the test executor runs tasks on the calling thread - confirm in the setup.
+     */
+    public void testWatcherServiceExceptionsAreCaughtOnReload() {
+        Index index = new Index(Watch.INDEX, "foo");
+        IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
+        indexRoutingTableBuilder.addShard(
+            TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED));
+        IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT)
+            .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required
+            .numberOfShards(1).numberOfReplicas(0).build();
+
+        // cluster state with different local shards (another shard id)
+        ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))).routingTable(
+                RoutingTable.builder().add(IndexRoutingTable.builder(index)
+                    .addShard(TestShardRouting.newShardRouting(Watch.INDEX, 1, "node_1", true, ShardRoutingState.STARTED))
+                    .build()).build()).metaData(
+                MetaData.builder().put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                    .put(indexMetaData, true).build()).build();
+
+        ClusterState stateWithWatcherShards = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1")
+                .add(newNode("node_1")))
+            .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build())
+            .metaData(MetaData.builder()
+                .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                .put(indexMetaData, true)
+                .build())
+            .build();
+
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stateWithWatcherShards, stateWithWatcherShards));
+
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        when(watcherService.state()).thenReturn(WatcherState.STARTED);
+        doAnswer(invocation -> {
+            throw new ElasticsearchSecurityException("breakme");
+        }).when(watcherService).reload(eq(clusterState), anyString());
+
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, stateWithWatcherShards));
+        verify(watcherService, times(1)).reload(eq(clusterState), anyString());
+    }
+
     private List randomIndexPatterns() {
         return IntStream.range(0, between(1, 10))
                 .mapToObj(n -> randomAlphaOfLengthBetween(1, 100))