diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 68435341071..1f640acc54f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.LocalClusterUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion; @@ -99,6 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final Settings settings; private final TransportService transportService; private final MasterService masterService; + private final AllocationService allocationService; private final JoinHelper joinHelper; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; private final Supplier persistedStateSupplier; @@ -144,6 +146,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.settings = settings; this.transportService = transportService; this.masterService = masterService; + this.allocationService = allocationService; this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators); @@ -500,6 +503,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery method, 
getCurrentTerm(), mode, lastKnownLeader); if (mode != Mode.CANDIDATE) { + /* capture the mode before it is overwritten on the next statement, so the LEADER check further down still sees the previous value */ final Mode prevMode = mode; mode = Mode.CANDIDATE; cancelActivePublication("become candidate: " + method); joinAccumulator.close(mode); @@ -519,6 +523,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery followersChecker.updateFastResponseState(getCurrentTerm(), mode); lagDetector.clearTrackedNodes(); + /* the clean-up task is only submitted when this transition started from LEADER */ if (prevMode == Mode.LEADER) { + cleanMasterService(); + } + if (applierState.nodes().getMasterNodeId() != null) { applierState = clusterStateWithNoMasterBlock(applierState); clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> { @@ -555,6 +563,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery void becomeFollower(String method, DiscoveryNode leaderNode) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible"; + assert mode != Mode.LEADER : "do not switch to follower from leader (should be candidate first)"; if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) { logger.trace("{}: coordinator remaining FOLLOWER of [{}] in term {}", @@ -590,6 +599,26 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery lagDetector.clearTrackedNodes(); } + /** Submits a LocalClusterUpdateTask to masterService. When the task executes it calls allocationService.cleanCaches() only if the cluster state it is handed does not show the local node as elected master, and it always returns unchanged(). A failure of the task is logged at trace level and otherwise ignored. */ private void cleanMasterService() { + masterService.submitStateUpdateTask("clean-up after stepping down as master", + new LocalClusterUpdateTask() { + @Override + public void onFailure(String source, Exception e) { + // ignore + logger.trace("failed to clean-up after stepping down as master", e); + } + + @Override + public ClusterTasksResult execute(ClusterState currentState) { + if (currentState.nodes().isLocalNodeElectedMaster() == false) { + allocationService.cleanCaches(); + } + return unchanged(); + } + + }); + } + private PreVoteResponse getPreVoteResponse() { return new 
PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(), coordinationState.get().getLastAcceptedState().getVersionOrMetaDataVersion()); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index a360ea1ab60..ef83b9191d0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -194,6 +194,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor { - boolean cleanCache = false; - DiscoveryNode localNode = event.state().nodes().getLocalNode(); - if (localNode != null) { - if (localNode.isMasterNode() && event.localNodeMaster() == false) { - cleanCache = true; - } - } else { - cleanCache = true; - } - if (cleanCache) { - Releasables.close(asyncFetchStarted.values()); - asyncFetchStarted.clear(); - Releasables.close(asyncFetchStore.values()); - asyncFetchStore.clear(); - } - }); + } + + /** Closes every value held in asyncFetchStarted and in asyncFetchStore via Releasables.close, and clears each collection right after its values are closed. No condition is checked here; the removed lines guarded the same four calls with a cleanCache flag. NOTE(review): the enclosing class is not visible in this hunk - confirm which class declares this method. */ public void cleanCaches() { + Releasables.close(asyncFetchStarted.values()); + asyncFetchStarted.clear(); + Releasables.close(asyncFetchStore.values()); + asyncFetchStore.clear(); } // for tests diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 4b0e431c663..3ea0663d7d4 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -29,8 +29,10 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; import 
org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -577,4 +579,16 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { // start another node so cluster consistency checks won't time out due to the lack of state internalCluster().startNode(); } + + /** Starts three nodes with ELECTION_INITIAL_TIMEOUT_SETTING set to 2ms, creates index test with one shard, zero replicas and INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING set to 100ms, waits for that index to be green, performs a full cluster restart, and waits for green again. NOTE(review): presumably the very short election timeout is meant to provoke contested elections during the restart - confirm. */ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception { + internalCluster().startNodes(3, + Settings.builder().put(ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING.getKey(), + "2ms").build()); + createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms").build()); + ensureGreen("test"); + internalCluster().fullRestart(); + ensureGreen("test"); + } }