Reset state recovery after successful recovery (#42576)
The problem this commit addresses is that state recovery is not reset on a node that then becomes master with a cluster state that has the state-not-recovered flag in it. The situation observed in a failed test run of MinimumMasterNodesIT.testThreeNodesNoMasterBlock (see below) is as follows: there are 3 master nodes (node_t0, node_t1, node_t2), and two of them are shut down (node_t2 remains). When the first one comes back (renamed to node_t4), it becomes leader in term 2 and sends its state (with state_not_recovered_block) to node_t2, which accepts it. node_t2 then becomes leader in term 3 and, as it was previously leader in term 1 and successfully completed state recovery, never retries state recovery in term 3. Closes #39172
This commit is contained in:
parent
746a2f41fd
commit
5598647922
|
@ -325,7 +325,8 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
final String TAB = " ";
|
||||
sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n");
|
||||
sb.append("cluster uuid: ").append(metaData.clusterUUID())
|
||||
.append(" [committed: ").append(metaData.clusterUUIDCommitted()).append("]").append("\n");
|
||||
sb.append("version: ").append(version).append("\n");
|
||||
sb.append("state uuid: ").append(stateUUID).append("\n");
|
||||
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
|
||||
|
|
|
@ -729,7 +729,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
|
||||
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
|
||||
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
|
||||
assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
|
||||
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
|
||||
: preVoteCollector + " vs " + getPreVoteResponse();
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
|||
|
||||
private final Runnable recoveryRunnable;
|
||||
|
||||
private final AtomicBoolean recovered = new AtomicBoolean();
|
||||
private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
|
||||
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
|
||||
|
||||
@Inject
|
||||
|
@ -214,7 +214,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
|||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
if (recovered.compareAndSet(false, true)) {
|
||||
if (recoveryInProgress.compareAndSet(false, true)) {
|
||||
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
|
||||
recoveryRunnable.run();
|
||||
}
|
||||
|
@ -222,7 +222,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
|||
}, recoverAfterTime, ThreadPool.Names.GENERIC);
|
||||
}
|
||||
} else {
|
||||
if (recovered.compareAndSet(false, true)) {
|
||||
if (recoveryInProgress.compareAndSet(false, true)) {
|
||||
threadPool.generic().execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
|
@ -240,7 +240,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
|||
}
|
||||
|
||||
private void resetRecoveredFlags() {
|
||||
recovered.set(false);
|
||||
recoveryInProgress.set(false);
|
||||
scheduledRecovery.set(false);
|
||||
}
|
||||
|
||||
|
@ -259,6 +259,9 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
|||
@Override
|
||||
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
|
||||
logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size());
|
||||
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
|
||||
// not-recovered state, we perform another state recovery.
|
||||
resetRecoveredFlags();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.elasticsearch.cluster.metadata.Manifest;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -70,6 +71,8 @@ import org.elasticsearch.discovery.DiscoveryModule;
|
|||
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
|
||||
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.ClusterStateUpdaters;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.gateway.MetaStateService;
|
||||
import org.elasticsearch.gateway.MockGatewayMetaState;
|
||||
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
|
||||
|
@ -131,6 +134,7 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MAS
|
|||
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES;
|
||||
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
|
||||
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
||||
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -191,6 +195,45 @@ public class CoordinatorTests extends ESTestCase {
|
|||
assertEquals(result1, result2);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test was added to verify that state recovery is properly reset on a node after it has become master and successfully
|
||||
* recovered a state (see {@link GatewayService}). The situation which triggers this with a decent likelihood is as follows:
|
||||
* 3 master-eligible nodes (leader, follower1, follower2); the followers are shut down (leader remains). When they come back,
|
||||
* one of them becomes leader and publishes its first state (with STATE_NOT_RECOVERED_BLOCK) to the old leader, which accepts it.
|
||||
* The old leader is initiating an election at the same time, and wins it. It becomes leader again but, as it previously
|
||||
* successfully completed state recovery, is never reset to a state where state recovery can be retried.
|
||||
*/
|
||||
public void testStateRecoveryResetAfterPreviousLeadership() {
|
||||
final Cluster cluster = new Cluster(3);
|
||||
cluster.runRandomly();
|
||||
cluster.stabilise();
|
||||
|
||||
final ClusterNode leader = cluster.getAnyLeader();
|
||||
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader);
|
||||
final ClusterNode follower2 = cluster.getAnyNodeExcept(leader, follower1);
|
||||
|
||||
// restart follower1 and follower2
|
||||
for (ClusterNode clusterNode : Arrays.asList(follower1, follower2)) {
|
||||
clusterNode.close();
|
||||
cluster.clusterNodes.forEach(
|
||||
cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode(
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
cn.transportService.disconnectFromNode(clusterNode.getLocalNode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "disconnect from " + clusterNode.getLocalNode() + " after shutdown";
|
||||
}
|
||||
})));
|
||||
cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ? cn.restartedNode() : cn);
|
||||
}
|
||||
|
||||
cluster.stabilise();
|
||||
}
|
||||
|
||||
public void testCanUpdateClusterStateAfterStabilisation() {
|
||||
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
|
||||
cluster.runRandomly();
|
||||
|
@ -1525,6 +1568,10 @@ public class CoordinatorTests extends ESTestCase {
|
|||
|
||||
assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet());
|
||||
assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId));
|
||||
assertThat(leaderId + " has no NO_MASTER_BLOCK",
|
||||
leader.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
|
||||
assertThat(leaderId + " has no STATE_NOT_RECOVERED_BLOCK",
|
||||
leader.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
|
||||
assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion);
|
||||
|
||||
for (final ClusterNode clusterNode : clusterNodes) {
|
||||
|
@ -1556,6 +1603,8 @@ public class CoordinatorTests extends ESTestCase {
|
|||
equalTo(leader.getLocalNode()));
|
||||
assertThat(nodeId + " has no NO_MASTER_BLOCK",
|
||||
clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
|
||||
assertThat(nodeId + " has no STATE_NOT_RECOVERED_BLOCK",
|
||||
clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
|
||||
} else {
|
||||
assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE));
|
||||
assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue());
|
||||
|
@ -1725,7 +1774,8 @@ public class CoordinatorTests extends ESTestCase {
|
|||
} else {
|
||||
nodeEnvironment = null;
|
||||
delegate = new InMemoryPersistedState(0L,
|
||||
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
|
||||
ClusterStateUpdaters.addStateNotRecoveredBlock(
|
||||
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException("Unable to create MockPersistedState", e);
|
||||
|
@ -1765,8 +1815,9 @@ public class CoordinatorTests extends ESTestCase {
|
|||
clusterState.writeTo(outStream);
|
||||
StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
|
||||
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
|
||||
// adapt cluster state to new localNode instance and add blocks
|
||||
delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()),
|
||||
ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance
|
||||
ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException("Unable to create MockPersistedState", e);
|
||||
|
@ -1870,15 +1921,19 @@ public class CoordinatorTests extends ESTestCase {
|
|||
transportService));
|
||||
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
|
||||
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
|
||||
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
|
||||
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
|
||||
ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
|
||||
allocationService, masterService, this::getPersistedState,
|
||||
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
|
||||
masterService.setClusterStatePublisher(coordinator);
|
||||
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
|
||||
deterministicTaskQueue.getThreadPool(this::onNode), null, null, coordinator);
|
||||
|
||||
logger.trace("starting up [{}]", localNode);
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
coordinator.start();
|
||||
gatewayService.start();
|
||||
clusterService.start();
|
||||
coordinator.startInitialJoin();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue