Clean GatewayAllocator when stepping down as master (#38885)

This fixes an issue where a messy master election could prevent shard allocation from proceeding properly. I encountered this in failing CI tests when we were bootstrapping multiple nodes: tests would sometimes time out on an `ensureGreen` after an unclean master election.

The root cause lies in how async shard information fetching works and how the clean-up logic in GatewayAllocator is integrated with the rest of the system. When a node becomes master, it already tries to allocate shards as part of the first cluster state update in which it becomes master (see `JoinTaskExecutor`, in particular the call to `reroute`). This process, which runs on the MasterService thread, triggers async shard fetching. If the node is still processing an earlier election failure on the ClusterApplierService thread (e.g. due to a messy election), the clean-up logic in GatewayAllocator can run after the shard fetching has been initiated by the MasterService, cancelling the fetch. As a result, no subsequent reroute (allocation) is triggered when the shard fetching results come back, and no shard allocation happens unless the user issues an explicit reroute command. The underlying bug, in my opinion, is that GatewayAllocator is called from both the MasterService and ClusterApplierService threads, with no clear happens-before relation between the two.

The fix here is to run the clean-up logic on the MasterService thread instead of the ClusterApplierService thread, re-establishing a clear happens-before relation.

Note that testing this is tricky. With the newly added test, I can reproduce the issue quite often by adding `Thread.sleep(10);` in ClusterApplierService (to make sure it does not go too quickly) and `Thread.sleep(50);` in `TransportNodesListGatewayStartedShards` (to make sure that shard state fetching does not go too quickly either).
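
To illustrate the fix: stepping down as leader now submits the cache clean-up as a cluster state update task to the MasterService, which processes such tasks on a single thread, the same one that runs `reroute`. The sketch below is condensed from the Coordinator diff further down, with explanatory comments added; it is not a standalone snippet.

```java
private void cleanMasterService() {
    // Queue the clean-up on the MasterService thread instead of running it on the
    // ClusterApplierService thread, so it is serialized with any reroute (and the
    // async shard fetching it starts) that this thread may already have triggered.
    masterService.submitStateUpdateTask("clean-up after stepping down as master",
        new LocalClusterUpdateTask() {
            @Override
            public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
                // Only clear the caches if this node has not been re-elected master in the meantime.
                if (currentState.nodes().isLocalNodeElectedMaster() == false) {
                    allocationService.cleanCaches(); // delegates to GatewayAllocator#cleanCaches
                }
                return unchanged(); // no cluster state change is published
            }

            @Override
            public void onFailure(String source, Exception e) {
                logger.trace("failed to clean-up after stepping down as master", e);
            }
        });
}
```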

Note that older versions using Zen discovery are affected by this as well, but exhibit the issue less often because master elections are much slower there.
Yannick Welsch 2019-02-25 18:24:06 +09:00
parent 7021e1bd3b
commit a2bc41621c
5 changed files with 58 additions and 21 deletions

Coordinator.java

@@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.LocalClusterUpdateTask;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
 import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
@@ -99,6 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     private final Settings settings;
     private final TransportService transportService;
     private final MasterService masterService;
+    private final AllocationService allocationService;
     private final JoinHelper joinHelper;
     private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
     private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
@@ -144,6 +146,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         this.settings = settings;
         this.transportService = transportService;
         this.masterService = masterService;
+        this.allocationService = allocationService;
         this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
         this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
             this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
@@ -500,6 +503,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             method, getCurrentTerm(), mode, lastKnownLeader);
 
         if (mode != Mode.CANDIDATE) {
+            final Mode prevMode = mode;
             mode = Mode.CANDIDATE;
             cancelActivePublication("become candidate: " + method);
             joinAccumulator.close(mode);
@@ -519,6 +523,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             followersChecker.updateFastResponseState(getCurrentTerm(), mode);
             lagDetector.clearTrackedNodes();
 
+            if (prevMode == Mode.LEADER) {
+                cleanMasterService();
+            }
+
             if (applierState.nodes().getMasterNodeId() != null) {
                 applierState = clusterStateWithNoMasterBlock(applierState);
                 clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
@@ -555,6 +563,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     void becomeFollower(String method, DiscoveryNode leaderNode) {
         assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
         assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible";
+        assert mode != Mode.LEADER : "do not switch to follower from leader (should be candidate first)";
 
         if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) {
             logger.trace("{}: coordinator remaining FOLLOWER of [{}] in term {}",
@@ -590,6 +599,26 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         lagDetector.clearTrackedNodes();
     }
 
+    private void cleanMasterService() {
+        masterService.submitStateUpdateTask("clean-up after stepping down as master",
+            new LocalClusterUpdateTask() {
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    // ignore
+                    logger.trace("failed to clean-up after stepping down as master", e);
+                }
+
+                @Override
+                public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
+                    if (currentState.nodes().isLocalNodeElectedMaster() == false) {
+                        allocationService.cleanCaches();
+                    }
+                    return unchanged();
+                }
+            });
+    }
+
     private PreVoteResponse getPreVoteResponse() {
         return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(),
             coordinationState.get().getLastAcceptedState().getVersionOrMetaDataVersion());

JoinTaskExecutor.java

@@ -194,6 +194,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
             .minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode)
             .build();
         logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
+        allocationService.cleanCaches();
         tmpState = PersistentTasksCustomMetaData.disassociateDeadNodes(tmpState);
         return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
     }

AllocationService.java

@@ -460,6 +460,10 @@ public class AllocationService {
         return System.nanoTime();
     }
 
+    public void cleanCaches() {
+        gatewayAllocator.cleanCaches();
+    }
+
     /**
      * this class is used to describe results of applying a set of
      * {@link org.elasticsearch.cluster.routing.allocation.command.AllocationCommand}

GatewayAllocator.java

@@ -23,14 +23,12 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.action.support.nodes.BaseNodesResponse;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingService;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
 import org.elasticsearch.cluster.routing.allocation.FailedShard;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
-import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -55,29 +53,20 @@ public class GatewayAllocator {
         asyncFetchStore = ConcurrentCollections.newConcurrentMap();
 
     @Inject
-    public GatewayAllocator(ClusterService clusterService, RoutingService routingService,
-                            TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetaData storeAction) {
+    public GatewayAllocator(RoutingService routingService,
+                            TransportNodesListGatewayStartedShards startedAction,
+                            TransportNodesListShardStoreMetaData storeAction) {
         this.routingService = routingService;
         this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
         this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
-        clusterService.addStateApplier(event -> {
-            boolean cleanCache = false;
-            DiscoveryNode localNode = event.state().nodes().getLocalNode();
-            if (localNode != null) {
-                if (localNode.isMasterNode() && event.localNodeMaster() == false) {
-                    cleanCache = true;
-                }
-            } else {
-                cleanCache = true;
-            }
-            if (cleanCache) {
-                Releasables.close(asyncFetchStarted.values());
-                asyncFetchStarted.clear();
-                Releasables.close(asyncFetchStore.values());
-                asyncFetchStore.clear();
-            }
-        });
+    }
+
+    public void cleanCaches() {
+        Releasables.close(asyncFetchStarted.values());
+        asyncFetchStarted.clear();
+        Releasables.close(asyncFetchStore.values());
+        asyncFetchStore.clear();
     }
 
     // for tests
     protected GatewayAllocator() {

RecoveryFromGatewayIT.java

@@ -29,8 +29,10 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
@@ -577,4 +579,16 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
         // start another node so cluster consistency checks won't time out due to the lack of state
         internalCluster().startNode();
     }
+
+    public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
+        internalCluster().startNodes(3,
+            Settings.builder().put(ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING.getKey(),
+                "2ms").build());
+        createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms").build());
+        ensureGreen("test");
+        internalCluster().fullRestart();
+        ensureGreen("test");
+    }
 }