Reset state recovery after successful recovery (#42576)

The problem this commit addresses is that state recovery is not reset on a node that then becomes
master with a cluster state that still carries the state-not-recovered block. The situation observed
in a failed run of MinimumMasterNodesIT.testThreeNodesNoMasterBlock (see below) is as follows: there
are 3 master-eligible nodes (node_t0, node_t1, node_t2), and two of them are shut down (node_t2
remains). When the first one comes back (renamed to node_t4) it becomes leader in term 2 and publishes
a state (with the state_not_recovered_block) to node_t2, which accepts it. node_t2 then becomes leader
in term 3, and because it was previously leader in term 1 and had successfully completed state
recovery, it never retries state recovery in term 3.

Closes #39172
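
For readers skimming the diff below: the core of the fix is in GatewayService, where the one-shot recovery guard is reset once the recovered state has been applied. The following is a minimal, hypothetical sketch of that guard pattern, based only on the AtomicBoolean flag visible in the diff; the class and method names (RecoveryGuardSketch, maybeTriggerRecovery, onRecoveryApplied) are invented for illustration and are not part of Elasticsearch.

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the guard pattern: trigger recovery at most once per "not recovered" period,
// and reset the guard after recovery has been applied, so that a later election on a
// not-recovered cluster state can trigger recovery again.
public class RecoveryGuardSketch {

    private final AtomicBoolean recoveryInProgress = new AtomicBoolean();

    // Called when this node is master of a cluster state that still lacks recovered state.
    void maybeTriggerRecovery(Runnable recoveryRunnable) {
        // compareAndSet ensures only one recovery attempt is in flight at a time
        if (recoveryInProgress.compareAndSet(false, true)) {
            recoveryRunnable.run();
        }
    }

    // Called after the recovered cluster state has been successfully applied.
    void onRecoveryApplied() {
        // Without this reset, a node that recovered state once would never retry recovery
        // after it later becomes leader again on a state carrying the not-recovered block.
        recoveryInProgress.set(false);
    }

    public static void main(String[] args) {
        RecoveryGuardSketch guard = new RecoveryGuardSketch();
        guard.maybeTriggerRecovery(() -> System.out.println("recovery attempt 1"));
        guard.onRecoveryApplied(); // the fix: reset after successful recovery
        guard.maybeTriggerRecovery(() -> System.out.println("recovery attempt 2"));
    }
}

In the real change (see the GatewayService diff below), the reset also clears the scheduledRecovery flag and runs from clusterStateProcessed, i.e. only after the recovered state has actually been committed and applied.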
Author: Yannick Welsch, 2019-05-28 13:35:01 +02:00
Parent: 746a2f41fd
Commit: 5598647922
4 changed files with 67 additions and 9 deletions

File: ClusterState.java

@@ -325,7 +325,8 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
     public String toString() {
         StringBuilder sb = new StringBuilder();
         final String TAB = " ";
-        sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n");
+        sb.append("cluster uuid: ").append(metaData.clusterUUID())
+            .append(" [committed: ").append(metaData.clusterUUIDCommitted()).append("]").append("\n");
         sb.append("version: ").append(version).append("\n");
         sb.append("state uuid: ").append(stateUUID).append("\n");
         sb.append("from_diff: ").append(wasReadFromDiff).append("\n");

File: Coordinator.java

@@ -729,7 +729,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
            assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
            assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
            assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
-            assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
            assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
                : preVoteCollector + " vs " + getPreVoteResponse();

File: GatewayService.java

@@ -86,7 +86,7 @@ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener
     private final Runnable recoveryRunnable;
 
-    private final AtomicBoolean recovered = new AtomicBoolean();
+    private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
     private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
 
     @Inject
@@ -214,7 +214,7 @@ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener
                     @Override
                     protected void doRun() {
-                        if (recovered.compareAndSet(false, true)) {
+                        if (recoveryInProgress.compareAndSet(false, true)) {
                            logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
                            recoveryRunnable.run();
                        }
@@ -222,7 +222,7 @@ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener
                }, recoverAfterTime, ThreadPool.Names.GENERIC);
            }
        } else {
-            if (recovered.compareAndSet(false, true)) {
+            if (recoveryInProgress.compareAndSet(false, true)) {
                threadPool.generic().execute(new AbstractRunnable() {
                    @Override
                    public void onFailure(final Exception e) {
@@ -240,7 +240,7 @@ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener
    }
 
    private void resetRecoveredFlags() {
-        recovered.set(false);
+        recoveryInProgress.set(false);
        scheduledRecovery.set(false);
    }
@@ -259,6 +259,9 @@ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener
            @Override
            public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
                logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size());
+                // reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
+                // not-recovered state, that we again do another state recovery.
+                resetRecoveredFlags();
            }
 
            @Override

File: CoordinatorTests.java

@@ -45,6 +45,7 @@ import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode.Role;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
@@ -70,6 +71,8 @@ import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
 import org.elasticsearch.discovery.zen.PublishClusterStateStats;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.gateway.ClusterStateUpdaters;
+import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.MockGatewayMetaState;
 import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
@@ -131,6 +134,7 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ID;
 import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES;
 import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
 import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
+import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
 import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
 import static org.hamcrest.Matchers.containsString;
@@ -191,6 +195,45 @@ public class CoordinatorTests extends ESTestCase {
        assertEquals(result1, result2);
    }
 
+    /**
+     * This test was added to verify that state recovery is properly reset on a node after it has become master and successfully
+     * recovered a state (see {@link GatewayService}). The situation which triggers this with a decent likelihood is as follows:
+     * 3 master-eligible nodes (leader, follower1, follower2), the followers are shut down (leader remains), when followers come back
+     * one of them becomes leader and publishes first state (with STATE_NOT_RECOVERED_BLOCK) to old leader, which accepts it.
+     * Old leader is initiating an election at the same time, and wins election. It becomes leader again, but as it previously
+     * successfully completed state recovery, is never reset to a state where state recovery can be retried.
+     */
+    public void testStateRecoveryResetAfterPreviousLeadership() {
+        final Cluster cluster = new Cluster(3);
+        cluster.runRandomly();
+        cluster.stabilise();
+
+        final ClusterNode leader = cluster.getAnyLeader();
+        final ClusterNode follower1 = cluster.getAnyNodeExcept(leader);
+        final ClusterNode follower2 = cluster.getAnyNodeExcept(leader, follower1);
+
+        // restart follower1 and follower2
+        for (ClusterNode clusterNode : Arrays.asList(follower1, follower2)) {
+            clusterNode.close();
+            cluster.clusterNodes.forEach(
+                cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode(
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            cn.transportService.disconnectFromNode(clusterNode.getLocalNode());
+                        }
+
+                        @Override
+                        public String toString() {
+                            return "disconnect from " + clusterNode.getLocalNode() + " after shutdown";
+                        }
+                    })));
+            cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ? cn.restartedNode() : cn);
+        }
+
+        cluster.stabilise();
+    }
+
    public void testCanUpdateClusterStateAfterStabilisation() {
        final Cluster cluster = new Cluster(randomIntBetween(1, 5));
        cluster.runRandomly();
@@ -1525,6 +1568,10 @@ public class CoordinatorTests extends ESTestCase {
            assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet());
            assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId));
+            assertThat(leaderId + " has no NO_MASTER_BLOCK",
+                leader.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
+            assertThat(leaderId + " has no STATE_NOT_RECOVERED_BLOCK",
+                leader.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
            assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion);
 
            for (final ClusterNode clusterNode : clusterNodes) {
@@ -1556,6 +1603,8 @@ public class CoordinatorTests extends ESTestCase {
                    equalTo(leader.getLocalNode()));
                assertThat(nodeId + " has no NO_MASTER_BLOCK",
                    clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
+                assertThat(nodeId + " has no STATE_NOT_RECOVERED_BLOCK",
+                    clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
            } else {
                assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE));
                assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue());
@@ -1725,7 +1774,8 @@ public class CoordinatorTests extends ESTestCase {
                } else {
                    nodeEnvironment = null;
                    delegate = new InMemoryPersistedState(0L,
-                        clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
+                        ClusterStateUpdaters.addStateNotRecoveredBlock(
+                            clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)));
                }
            } catch (IOException e) {
                throw new UncheckedIOException("Unable to create MockPersistedState", e);
@@ -1765,8 +1815,9 @@ public class CoordinatorTests extends ESTestCase {
                    clusterState.writeTo(outStream);
                    StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
                        new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
+                    // adapt cluster state to new localNode instance and add blocks
                    delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()),
-                        ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance
+                        ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode)));
                }
            } catch (IOException e) {
                throw new UncheckedIOException("Unable to create MockPersistedState", e);
@@ -1870,15 +1921,19 @@ public class CoordinatorTests extends ESTestCase {
                transportService));
            final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
                Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
+            final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
            coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
-                ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
+                allocationService, masterService, this::getPersistedState,
                Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
            masterService.setClusterStatePublisher(coordinator);
+            final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
+                deterministicTaskQueue.getThreadPool(this::onNode), null, null, coordinator);
 
            logger.trace("starting up [{}]", localNode);
            transportService.start();
            transportService.acceptIncomingRequests();
            coordinator.start();
+            gatewayService.start();
            clusterService.start();
            coordinator.startInitialJoin();
        }