From ac920e8e645cdcc4320f79d7c11614e4da8ffe62 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 25 Sep 2019 12:31:36 +0100 Subject: [PATCH] Assert no exceptions during state application (#47090) Today we log and swallow exceptions during cluster state application, but such an exception should not occur. This commit adds assertions of this fact, and updates the Javadocs to explain it. Relates #47038 --- .../cluster/ClusterStateApplier.java | 6 +++++- .../cluster/service/ClusterApplierService.java | 9 ++++++++- .../common/settings/AbstractScopedSettings.java | 1 - .../cluster/IndicesClusterStateService.java | 15 ++++++++++++--- .../cluster/coordination/CoordinatorTests.java | 2 ++ .../service/ClusterApplierServiceTests.java | 12 ++++++++++++ .../coordination/AbstractCoordinatorTestCase.java | 14 ++++++++++++++ 7 files changed, 53 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java index c339a8ed97e..ad983f43bf2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java @@ -28,7 +28,11 @@ import org.elasticsearch.cluster.service.ClusterService; public interface ClusterStateApplier { /** - * Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied + * Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied. The cluster state to be applied is already + * committed when this method is called, so an applier must therefore be prepared to deal with any state it receives without throwing + * an exception. Throwing an exception from an applier is very bad because it will stop the application of this state before it has + * reached all the other appliers, and will likely result in another attempt to apply the same (or very similar) cluster state which + * might continue until this node is removed from the cluster. */ void applyClusterState(ClusterChangedEvent event); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index eb41e710cca..f5bbe2d420b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -390,7 +390,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements return true; } - protected void runTask(UpdateTask task) { + private void runTask(UpdateTask task) { if (!lifecycle.started()) { logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source); return; @@ -447,6 +447,9 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]", executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e); } + // failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we + // continue we will retry with the same cluster state but that might not help. + assert applicationMayFail(); task.listener.onFailure(task.source, e); } } @@ -667,4 +670,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements return threadPool.relativeTimeInMillis(); } + // overridden by tests that need to check behaviour in the event of an application failure + protected boolean applicationMayFail() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index 9d3e278e889..9db52b9eb93 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -193,7 +193,6 @@ public abstract class AbstractScopedSettings { } catch (Exception ex) { logger.warn("failed to apply settings", ex); throw ex; - } finally { } return lastSettingsApplied = newSettings; } diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 11b7f03da10..21cf49a949d 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -532,8 +532,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple final IndexMetaData newIndexMetaData = state.metaData().index(index); assert newIndexMetaData != null : "index " + index + " should have been removed by deleteIndices"; if (ClusterChangedEvent.indexMetaDataChanged(currentIndexMetaData, newIndexMetaData)) { - indexService.updateMetaData(currentIndexMetaData, newIndexMetaData); + String reason = null; try { + reason = "metadata update failed"; + try { + indexService.updateMetaData(currentIndexMetaData, newIndexMetaData); + } catch (Exception e) { + assert false : e; + throw e; + } + + reason = "mapping update failed"; if (indexService.updateMapping(currentIndexMetaData, newIndexMetaData) && sendRefreshMapping) { nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(), new NodeMappingRefreshAction.NodeMappingRefreshRequest(newIndexMetaData.getIndex().getName(), @@ -541,14 +550,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple ); } } catch (Exception e) { - indicesService.removeIndex(indexService.index(), FAILURE, "removing index (mapping update failed)"); + indicesService.removeIndex(indexService.index(), FAILURE, "removing index (" + reason + ")"); // fail shards that would be created or updated by createOrUpdateShards RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); if (localRoutingNode != null) { for (final ShardRouting shardRouting : localRoutingNode) { if (shardRouting.index().equals(index) && failedShardsCache.containsKey(shardRouting.shardId()) == false) { - sendFailShard(shardRouting, "failed to update mapping for index", e, state); + sendFailShard(shardRouting, "failed to update index (" + reason + ")", e, state); } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index f968f6f4742..1735c8ba033 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -586,6 +586,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + follower0.allowClusterStateApplicationFailure(); follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); AckCollector ackCollector = leader.submitValue(randomLong()); cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); @@ -605,6 +606,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); final long startingTerm = leader.coordinator.getCurrentTerm(); + leader.allowClusterStateApplicationFailure(); leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); AckCollector ackCollector = leader.submitValue(randomLong()); cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value"); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 4da5de7941e..a851cb7069e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -358,6 +358,7 @@ public class ClusterApplierServiceTests extends ESTestCase { clusterApplierService.addStateApplier(event -> { throw new RuntimeException("dummy exception"); }); + clusterApplierService.allowClusterStateApplicationFailure(); CountDownLatch latch = new CountDownLatch(1); clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(), @@ -386,6 +387,7 @@ public class ClusterApplierServiceTests extends ESTestCase { AtomicReference error = new AtomicReference<>(); clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, v -> {}); + clusterApplierService.allowClusterStateApplicationFailure(); CountDownLatch latch = new CountDownLatch(1); clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()) @@ -496,6 +498,7 @@ public class ClusterApplierServiceTests extends ESTestCase { final ClusterSettings clusterSettings; volatile Long currentTimeOverride = null; + boolean applicationMayFail; TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { super("test_node", settings, clusterSettings, threadPool); @@ -509,6 +512,15 @@ public class ClusterApplierServiceTests extends ESTestCase { } return super.currentTimeInMillis(); } + + @Override + protected boolean applicationMayFail() { + return this.applicationMayFail; + } + + void allowClusterStateApplicationFailure() { + this.applicationMayFail = true; + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 102de69cc43..f660b22428a 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1172,6 +1172,10 @@ public class AbstractCoordinatorTestCase extends ESTestCase { private boolean isNotUsefullyBootstrapped() { return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false; } + + void allowClusterStateApplicationFailure() { + clusterApplierService.allowClusterStateApplicationFailure(); + } } private List provideSeedHosts(SeedHostsProvider.HostsResolver ignored) { @@ -1282,6 +1286,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { private final String nodeName; private final DeterministicTaskQueue deterministicTaskQueue; ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED; + private boolean applicationMayFail; DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, DeterministicTaskQueue deterministicTaskQueue, Function runnableWrapper) { @@ -1326,6 +1331,15 @@ public class AbstractCoordinatorTestCase extends ESTestCase { protected void connectToNodesAndWait(ClusterState newClusterState) { // don't do anything, and don't block } + + @Override + protected boolean applicationMayFail() { + return this.applicationMayFail; + } + + void allowClusterStateApplicationFailure() { + this.applicationMayFail = true; + } } protected DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {