Watcher: Ensure watcher service calls are properly caught (elastic/x-pack-elasticsearch#3906)

In order to prevent exceptions to bubble up to the thread pool exception
handler, this properly wraps all the calls for the watcher service
within an executor into an AbstractRunnable to catch and log a possible
exception.

relates elastic/x-pack-elasticsearch#3854

Original commit: elastic/x-pack-elasticsearch@c0b39e6b5b
This commit is contained in:
Alexander Reelsen 2018-02-14 10:45:29 +01:00 committed by GitHub
parent 48847720e8
commit 742c7001c9
2 changed files with 154 additions and 5 deletions

View File

@ -21,6 +21,7 @@ import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
@ -35,6 +36,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@ -200,7 +202,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData());
if (watcherIndexMetaData == null) {
if (clearAllocationIds() && callWatcherService) {
executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no watcher index found"),
e -> logger.error("error pausing watch execution", e)));
}
return;
}
@ -210,7 +213,9 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
// this can happen if the node does not hold any data
if (routingNode == null) {
if (clearAllocationIds() && callWatcherService) {
executor.execute(() -> watcherService.pauseExecution("no routing node for local node found, network issue?"));
executor.execute(wrapWatcherService(
() -> watcherService.pauseExecution("no routing node for local node found, network issue?"),
e -> logger.error("error pausing watch execution", e)));
}
return;
}
@ -220,7 +225,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
// no local shards, empty out watcher and dont waste resources!
if (localShards.isEmpty()) {
if (clearAllocationIds() && callWatcherService) {
executor.execute(() -> watcherService.pauseExecution("no local watcher shards found"));
executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no local watcher shards found"),
e -> logger.error("error pausing watch execution", e)));
}
return;
}
@ -234,7 +240,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
if (callWatcherService) {
executor.execute(() -> watcherService.reload(state, "new local watcher shard allocation ids"));
executor.execute(wrapWatcherService(() -> watcherService.reload(state, "new local watcher shard allocation ids"),
e -> logger.error("error reloading watcher", e)));
}
}
}
@ -252,4 +259,27 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
List<String> allocationIds() {
return previousAllocationIds.get();
}
/**
* Wraps an abstract runnable to easier supply onFailure and doRun methods via lambdas
* This ensures that the uncaught exception handler in the executing threadpool does not get called
*
* @param run The code to be executed in the runnable
* @param exceptionConsumer The exception handling code to be executed, if the runnable fails
* @return The AbstractRunnable instance to pass to the executor
*/
private static AbstractRunnable wrapWatcherService(Runnable run, Consumer<Exception> exceptionConsumer) {
return new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
exceptionConsumer.accept(e);
}
@Override
protected void doRun() throws Exception {
run.run();
}
};
}
}

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.watcher;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -20,7 +22,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
@ -577,6 +578,124 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
assertThat(lifeCycleService.allocationIds(), hasSize(1));
}
public void testWatcherServiceExceptionsAreCaught() {
Index index = new Index(Watch.INDEX, "foo");
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addShard(
TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED));
IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT)
.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required
.numberOfShards(1).numberOfReplicas(0).build();
// special setup for one of the following cluster states
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(discoveryNodes.getMasterNodeId()).thenReturn("node_1");
when(discoveryNodes.getLocalNode()).thenReturn(localNode);
when(localNode.isDataNode()).thenReturn(true);
when(localNode.getId()).thenReturn("does_not_exist");
ClusterState clusterState = randomFrom(
// cluster state with no watcher index
ClusterState.builder(new ClusterName("my-cluster"))
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
.metaData(MetaData.builder()
.put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.build())
.build(),
// cluster state with no routing node
ClusterState.builder(new ClusterName("my-cluster"))
.nodes(discoveryNodes)
.metaData(MetaData.builder()
.put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.build())
.build(),
// cluster state with no local shards
ClusterState.builder(new ClusterName("my-cluster"))
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
.metaData(MetaData.builder()
.put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(indexMetaData, true)
.build())
.build()
);
ClusterState stateWithWatcherShards = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1")
.add(newNode("node_1")))
.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build())
.metaData(MetaData.builder()
.put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(indexMetaData, true)
.build())
.build();
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stateWithWatcherShards, stateWithWatcherShards));
when(watcherService.validate(anyObject())).thenReturn(true);
when(watcherService.state()).thenReturn(WatcherState.STARTED);
doAnswer(invocation -> {
throw new ElasticsearchSecurityException("breakme");
}).when(watcherService).pauseExecution(anyString());
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, stateWithWatcherShards));
verify(watcherService, times(1)).pauseExecution(anyString());
}
public void testWatcherServiceExceptionsAreCaughtOnReload() {
Index index = new Index(Watch.INDEX, "foo");
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addShard(
TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED));
IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT)
.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required
.numberOfShards(1).numberOfReplicas(0).build();
// cluster state with different local shards (another shard id)
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1"))).routingTable(
RoutingTable.builder().add(IndexRoutingTable.builder(index)
.addShard(TestShardRouting.newShardRouting(Watch.INDEX, 1, "node_1", true, ShardRoutingState.STARTED))
.build()).build()).metaData(
MetaData.builder().put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(indexMetaData, true).build()).build();
ClusterState stateWithWatcherShards = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1")
.add(newNode("node_1")))
.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build())
.metaData(MetaData.builder()
.put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(indexMetaData, true)
.build())
.build();
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stateWithWatcherShards, stateWithWatcherShards));
when(watcherService.validate(anyObject())).thenReturn(true);
when(watcherService.state()).thenReturn(WatcherState.STARTED);
doAnswer(invocation -> {
throw new ElasticsearchSecurityException("breakme");
}).when(watcherService).reload(eq(clusterState), anyString());
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, stateWithWatcherShards));
verify(watcherService, times(1)).reload(eq(clusterState), anyString());
}
private List<String> randomIndexPatterns() {
return IntStream.range(0, between(1, 10))
.mapToObj(n -> randomAlphaOfLengthBetween(1, 100))