diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java index c339a8ed97e..ad983f43bf2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java @@ -28,7 +28,11 @@ import org.elasticsearch.cluster.service.ClusterService; public interface ClusterStateApplier { /** - * Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied + * Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied. The cluster state to be applied is already + * committed when this method is called, so an applier must therefore be prepared to deal with any state it receives without throwing + * an exception. Throwing an exception from an applier is very bad because it will stop the application of this state before it has + * reached all the other appliers, and will likely result in another attempt to apply the same (or very similar) cluster state which + * might continue until this node is removed from the cluster. */ void applyClusterState(ClusterChangedEvent event); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index eb41e710cca..f5bbe2d420b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -390,7 +390,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements return true; } - protected void runTask(UpdateTask task) { + private void runTask(UpdateTask task) { if (!lifecycle.started()) { logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source); return; @@ -447,6 +447,9 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]", executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e); } + // failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we + // continue we will retry with the same cluster state but that might not help. + assert applicationMayFail(); task.listener.onFailure(task.source, e); } } @@ -667,4 +670,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements return threadPool.relativeTimeInMillis(); } + // overridden by tests that need to check behaviour in the event of an application failure + protected boolean applicationMayFail() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index 9d3e278e889..9db52b9eb93 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -193,7 +193,6 @@ public abstract class AbstractScopedSettings { } catch (Exception ex) { logger.warn("failed to apply settings", ex); throw ex; - } finally { } return lastSettingsApplied = newSettings; } diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 11b7f03da10..21cf49a949d 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -532,8 +532,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple final IndexMetaData newIndexMetaData = state.metaData().index(index); assert newIndexMetaData != null : "index " + index + " should have been removed by deleteIndices"; if (ClusterChangedEvent.indexMetaDataChanged(currentIndexMetaData, newIndexMetaData)) { - indexService.updateMetaData(currentIndexMetaData, newIndexMetaData); + String reason = null; try { + reason = "metadata update failed"; + try { + indexService.updateMetaData(currentIndexMetaData, newIndexMetaData); + } catch (Exception e) { + assert false : e; + throw e; + } + + reason = "mapping update failed"; if (indexService.updateMapping(currentIndexMetaData, newIndexMetaData) && sendRefreshMapping) { nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(), new NodeMappingRefreshAction.NodeMappingRefreshRequest(newIndexMetaData.getIndex().getName(), @@ -541,14 +550,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple ); } } catch (Exception e) { - indicesService.removeIndex(indexService.index(), FAILURE, "removing index (mapping update failed)"); + indicesService.removeIndex(indexService.index(), FAILURE, "removing index (" + reason + ")"); // fail shards that would be created or updated by createOrUpdateShards RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); if (localRoutingNode != null) { for (final ShardRouting shardRouting : localRoutingNode) { if (shardRouting.index().equals(index) && failedShardsCache.containsKey(shardRouting.shardId()) == false) { - sendFailShard(shardRouting, "failed to update mapping for index", e, state); + sendFailShard(shardRouting, "failed to update index (" + reason + ")", e, state); } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index f968f6f4742..1735c8ba033 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -586,6 +586,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + follower0.allowClusterStateApplicationFailure(); follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); AckCollector ackCollector = leader.submitValue(randomLong()); cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); @@ -605,6 +606,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); final long startingTerm = leader.coordinator.getCurrentTerm(); + leader.allowClusterStateApplicationFailure(); leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); AckCollector ackCollector = leader.submitValue(randomLong()); cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value"); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 4da5de7941e..a851cb7069e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -358,6 +358,7 @@ public class ClusterApplierServiceTests extends ESTestCase { clusterApplierService.addStateApplier(event -> { throw new RuntimeException("dummy exception"); }); + clusterApplierService.allowClusterStateApplicationFailure(); CountDownLatch latch = new CountDownLatch(1); clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(), @@ -386,6 +387,7 @@ public class ClusterApplierServiceTests extends ESTestCase { AtomicReference error = new AtomicReference<>(); clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, v -> {}); + clusterApplierService.allowClusterStateApplicationFailure(); CountDownLatch latch = new CountDownLatch(1); clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()) @@ -496,6 +498,7 @@ public class ClusterApplierServiceTests extends ESTestCase { final ClusterSettings clusterSettings; volatile Long currentTimeOverride = null; + boolean applicationMayFail; TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { super("test_node", settings, clusterSettings, threadPool); @@ -509,6 +512,15 @@ public class ClusterApplierServiceTests extends ESTestCase { } return super.currentTimeInMillis(); } + + @Override + protected boolean applicationMayFail() { + return this.applicationMayFail; + } + + void allowClusterStateApplicationFailure() { + this.applicationMayFail = true; + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 102de69cc43..f660b22428a 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1172,6 +1172,10 @@ public class AbstractCoordinatorTestCase extends ESTestCase { private boolean isNotUsefullyBootstrapped() { return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false; } + + void allowClusterStateApplicationFailure() { + clusterApplierService.allowClusterStateApplicationFailure(); + } } private List provideSeedHosts(SeedHostsProvider.HostsResolver ignored) { @@ -1282,6 +1286,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { private final String nodeName; private final DeterministicTaskQueue deterministicTaskQueue; ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED; + private boolean applicationMayFail; DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, DeterministicTaskQueue deterministicTaskQueue, Function runnableWrapper) { @@ -1326,6 +1331,15 @@ public class AbstractCoordinatorTestCase extends ESTestCase { protected void connectToNodesAndWait(ClusterState newClusterState) { // don't do anything, and don't block } + + @Override + protected boolean applicationMayFail() { + return this.applicationMayFail; + } + + void allowClusterStateApplicationFailure() { + this.applicationMayFail = true; + } } protected DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {