From a2bc41621cd8c6ba564fd6bfb17cd21b27074bea Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Mon, 25 Feb 2019 18:24:06 +0900
Subject: [PATCH] Clean GatewayAllocator when stepping down as master (#38885)

This fixes an issue where a messy master election might prevent shard
allocation from proceeding properly. I've encountered this in failing CI tests
when we were bootstrapping multiple nodes. Tests would sometimes time out on an
`ensureGreen` after an unclean master election.

The reason for this is how the async shard information fetching works and how
the clean-up logic in GatewayAllocator is integrated with the rest of the
system. When a node becomes master, it will, as part of the first cluster state
update where it becomes master, already try allocating shards (see
`JoinTaskExecutor`, in particular the call to `reroute`). This process, which
runs on the MasterService thread, triggers async shard fetching. If the node is
still processing an earlier election failure in ClusterApplierService (e.g. due
to a messy election), that can trigger the clean-up logic in GatewayAllocator
after the shard fetching has been initiated by MasterService, thereby
cancelling the fetching. No subsequent reroute (allocation) is then triggered
after the shard fetching results return, so no shard allocation happens unless
the user triggers an explicit reroute command.

The bug, in my opinion, is that GatewayAllocator is called from both the
MasterService and ClusterApplierService threads, with no clear happens-before
relation. The fix here makes the clean-up logic run on the MasterService
thread instead of the ClusterApplierService thread, reestablishing a clear
happens-before relation.

Note that testing this is tricky. With the newly added test, I can quite often
reproduce this by adding `Thread.sleep(10);` in ClusterApplierService (to make
sure it does not go too quickly) and `Thread.sleep(50);` in
`TransportNodesListGatewayStartedShards` (to make sure that shard state
fetching does not go too quickly either).

Note that older versions of Zen discovery are affected by this as well, but did
not exhibit this issue as often because master elections are much slower there.
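To make the happens-before argument concrete, here is a minimal standalone
sketch (illustrative only, not code from this change; the class name and
printed messages are invented) that models MasterService as a single-threaded
executor, which is the ordering property the fix relies on:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Hypothetical model, not Elasticsearch code: MasterService processes its
    // cluster state tasks on a single thread, so tasks run in submission order.
    public class HappensBeforeSketch {

        private static final ExecutorService masterServiceThread = Executors.newSingleThreadExecutor();

        public static void main(String[] args) throws InterruptedException {
            // 1. The freshly elected master reroutes, kicking off async shard fetching.
            masterServiceThread.submit(() -> System.out.println("reroute: async shard fetch started"));

            // 2. Before the fix, the clean-up ran on the ClusterApplierService thread
            //    and could interleave anywhere relative to task 1, cancelling the
            //    fetch it had just started. After the fix, the clean-up is itself
            //    submitted as a MasterService task:
            masterServiceThread.submit(() -> System.out.println("clean-up: runs strictly after the fetch started"));

            // A single-threaded executor establishes a happens-before relation
            // between the two tasks, so the clean-up can no longer race with the
            // fetch initiation.
            masterServiceThread.shutdown();
            masterServiceThread.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

The actual change submits a LocalClusterUpdateTask that only cleans the caches
if the node is no longer the elected master, so a node that regains mastership
in the meantime keeps its in-flight fetches.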
---
 .../cluster/coordination/Coordinator.java          | 29 +++++++++++++++++
 .../coordination/JoinTaskExecutor.java             |  1 +
 .../routing/allocation/AllocationService.java      |  4 +++
 .../gateway/GatewayAllocator.java                  | 31 ++++++-------------
 .../gateway/RecoveryFromGatewayIT.java             | 14 +++++++++
 5 files changed, 58 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
index 68435341071..1f640acc54f 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
@@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.LocalClusterUpdateTask;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
 import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
@@ -99,6 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     private final Settings settings;
     private final TransportService transportService;
     private final MasterService masterService;
+    private final AllocationService allocationService;
     private final JoinHelper joinHelper;
     private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
     private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
@@ -144,6 +146,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         this.settings = settings;
         this.transportService = transportService;
         this.masterService = masterService;
+        this.allocationService = allocationService;
         this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
         this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
             this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
@@ -500,6 +503,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             method, getCurrentTerm(), mode, lastKnownLeader);

         if (mode != Mode.CANDIDATE) {
+            final Mode prevMode = mode;
             mode = Mode.CANDIDATE;
             cancelActivePublication("become candidate: " + method);
             joinAccumulator.close(mode);
@@ -519,6 +523,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             followersChecker.updateFastResponseState(getCurrentTerm(), mode);
             lagDetector.clearTrackedNodes();

+            if (prevMode == Mode.LEADER) {
+                cleanMasterService();
+            }
+
             if (applierState.nodes().getMasterNodeId() != null) {
                 applierState = clusterStateWithNoMasterBlock(applierState);
                 clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
@@ -555,6 +563,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     void becomeFollower(String method, DiscoveryNode leaderNode) {
         assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
         assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible";
+        assert mode != Mode.LEADER : "do not switch to follower from leader (should be candidate first)";

         if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) {
remaining FOLLOWER of [{}] in term {}", @@ -590,6 +599,26 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery lagDetector.clearTrackedNodes(); } + private void cleanMasterService() { + masterService.submitStateUpdateTask("clean-up after stepping down as master", + new LocalClusterUpdateTask() { + @Override + public void onFailure(String source, Exception e) { + // ignore + logger.trace("failed to clean-up after stepping down as master", e); + } + + @Override + public ClusterTasksResult execute(ClusterState currentState) { + if (currentState.nodes().isLocalNodeElectedMaster() == false) { + allocationService.cleanCaches(); + } + return unchanged(); + } + + }); + } + private PreVoteResponse getPreVoteResponse() { return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(), coordinationState.get().getLastAcceptedState().getVersionOrMetaDataVersion()); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index a360ea1ab60..ef83b9191d0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -194,6 +194,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor { - boolean cleanCache = false; - DiscoveryNode localNode = event.state().nodes().getLocalNode(); - if (localNode != null) { - if (localNode.isMasterNode() && event.localNodeMaster() == false) { - cleanCache = true; - } - } else { - cleanCache = true; - } - if (cleanCache) { - Releasables.close(asyncFetchStarted.values()); - asyncFetchStarted.clear(); - Releasables.close(asyncFetchStore.values()); - asyncFetchStore.clear(); - } - }); + } + + public void cleanCaches() { + Releasables.close(asyncFetchStarted.values()); + asyncFetchStarted.clear(); + Releasables.close(asyncFetchStore.values()); + asyncFetchStore.clear(); } // for tests diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 4b0e431c663..3ea0663d7d4 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -29,8 +29,10 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -577,4 +579,16 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { // start another node so cluster consistency checks won't time out due to the lack of state internalCluster().startNode(); } + + public void testMessyElectionsStillMakeClusterGoGreen() throws Exception { + internalCluster().startNodes(3, + Settings.builder().put(ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING.getKey(), + "2ms").build()); + createIndex("test", 
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms").build()); + ensureGreen("test"); + internalCluster().fullRestart(); + ensureGreen("test"); + } }
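For readers skimming the GatewayAllocator part of the diff: the extracted
`cleanCaches()` boils down to closing every in-flight async fetch and then
dropping the cache entries, so the next reroute on a newly elected master
starts fetching from scratch. Below is a simplified, stand-alone sketch of that
pattern (a local `Releasable` stand-in and a single map here, whereas the real
class keeps two maps, `asyncFetchStarted` and `asyncFetchStore`):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Simplified stand-in, not the real GatewayAllocator.
    public class CleanCachesSketch {

        // Mirrors org.elasticsearch.common.lease.Releasable: close() without a
        // checked exception, used here to cancel an in-flight async fetch.
        interface Releasable extends AutoCloseable {
            @Override
            void close();
        }

        private final Map<String, Releasable> asyncFetches = new ConcurrentHashMap<>();

        public void cleanCaches() {
            // Close (cancel) every in-flight fetch first, then forget the entries;
            // clearing alone would leave the fetches running in the background.
            asyncFetches.values().forEach(Releasable::close);
            asyncFetches.clear();
        }
    }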