Reset state recovery after successful recovery (#42576)
This commit addresses the problem that state recovery is not reset on a node that subsequently becomes master with a cluster state that has a state-not-recovered flag in it. The situation observed in a failed test run of MinimumMasterNodesIT.testThreeNodesNoMasterBlock (see below) is as follows: there are 3 master nodes (node_t0, node_t1, node_t2), and two of them are shut down (node_t2 remains). When the first one comes back (renamed to node_t4), it becomes leader in term 2 and sends its state (with state_not_recovered_block) to node_t2, which accepts it. node_t2 then becomes leader in term 3 and, because it was previously leader in term 1 and successfully completed state recovery, never retries state recovery in term 3. Closes #39172
This commit is contained in:
parent
746a2f41fd
commit
5598647922
|
@ -325,7 +325,8 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
final String TAB = " ";
|
final String TAB = " ";
|
||||||
sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n");
|
sb.append("cluster uuid: ").append(metaData.clusterUUID())
|
||||||
|
.append(" [committed: ").append(metaData.clusterUUIDCommitted()).append("]").append("\n");
|
||||||
sb.append("version: ").append(version).append("\n");
|
sb.append("version: ").append(version).append("\n");
|
||||||
sb.append("state uuid: ").append(stateUUID).append("\n");
|
sb.append("state uuid: ").append(stateUUID).append("\n");
|
||||||
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
|
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
|
||||||
|
|
|
@ -729,7 +729,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
||||||
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
|
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
|
||||||
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
|
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
|
||||||
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
|
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
|
||||||
assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
|
|
||||||
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
|
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
|
||||||
: preVoteCollector + " vs " + getPreVoteResponse();
|
: preVoteCollector + " vs " + getPreVoteResponse();
|
||||||
|
|
||||||
|
|
|
@ -86,7 +86,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
||||||
|
|
||||||
private final Runnable recoveryRunnable;
|
private final Runnable recoveryRunnable;
|
||||||
|
|
||||||
private final AtomicBoolean recovered = new AtomicBoolean();
|
private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
|
||||||
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
|
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
|
@ -214,7 +214,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doRun() {
|
protected void doRun() {
|
||||||
if (recovered.compareAndSet(false, true)) {
|
if (recoveryInProgress.compareAndSet(false, true)) {
|
||||||
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
|
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
|
||||||
recoveryRunnable.run();
|
recoveryRunnable.run();
|
||||||
}
|
}
|
||||||
|
@ -222,7 +222,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
||||||
}, recoverAfterTime, ThreadPool.Names.GENERIC);
|
}, recoverAfterTime, ThreadPool.Names.GENERIC);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (recovered.compareAndSet(false, true)) {
|
if (recoveryInProgress.compareAndSet(false, true)) {
|
||||||
threadPool.generic().execute(new AbstractRunnable() {
|
threadPool.generic().execute(new AbstractRunnable() {
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(final Exception e) {
|
public void onFailure(final Exception e) {
|
||||||
|
@ -240,7 +240,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
||||||
}
|
}
|
||||||
|
|
||||||
private void resetRecoveredFlags() {
|
private void resetRecoveredFlags() {
|
||||||
recovered.set(false);
|
recoveryInProgress.set(false);
|
||||||
scheduledRecovery.set(false);
|
scheduledRecovery.set(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,6 +259,9 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
||||||
@Override
|
@Override
|
||||||
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
|
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
|
||||||
logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size());
|
logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size());
|
||||||
|
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
|
||||||
|
// not-recovered state, that we again do another state recovery.
|
||||||
|
resetRecoveredFlags();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.elasticsearch.cluster.metadata.Manifest;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
|
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
|
@ -70,6 +71,8 @@ import org.elasticsearch.discovery.DiscoveryModule;
|
||||||
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
|
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
|
||||||
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
|
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
|
import org.elasticsearch.gateway.ClusterStateUpdaters;
|
||||||
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
import org.elasticsearch.gateway.MetaStateService;
|
import org.elasticsearch.gateway.MetaStateService;
|
||||||
import org.elasticsearch.gateway.MockGatewayMetaState;
|
import org.elasticsearch.gateway.MockGatewayMetaState;
|
||||||
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
|
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
|
||||||
|
@ -131,6 +134,7 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MAS
|
||||||
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES;
|
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES;
|
||||||
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
|
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
|
||||||
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
|
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
|
||||||
|
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||||
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
||||||
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
|
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
@ -191,6 +195,45 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
assertEquals(result1, result2);
|
assertEquals(result1, result2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test was added to verify that state recovery is properly reset on a node after it has become master and successfully
|
||||||
|
* recovered a state (see {@link GatewayService}). The situation which triggers this with a decent likelihood is as follows:
|
||||||
|
* 3 master-eligible nodes (leader, follower1, follower2), the followers are shut down (leader remains), when followers come back
|
||||||
|
* one of them becomes leader and publishes first state (with STATE_NOT_RECOVERED_BLOCK) to old leader, which accepts it.
|
||||||
|
* Old leader is initiating an election at the same time, and wins election. It becomes leader again, but as it previously
|
||||||
|
* successfully completed state recovery, is never reset to a state where state recovery can be retried.
|
||||||
|
*/
|
||||||
|
public void testStateRecoveryResetAfterPreviousLeadership() {
|
||||||
|
final Cluster cluster = new Cluster(3);
|
||||||
|
cluster.runRandomly();
|
||||||
|
cluster.stabilise();
|
||||||
|
|
||||||
|
final ClusterNode leader = cluster.getAnyLeader();
|
||||||
|
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader);
|
||||||
|
final ClusterNode follower2 = cluster.getAnyNodeExcept(leader, follower1);
|
||||||
|
|
||||||
|
// restart follower1 and follower2
|
||||||
|
for (ClusterNode clusterNode : Arrays.asList(follower1, follower2)) {
|
||||||
|
clusterNode.close();
|
||||||
|
cluster.clusterNodes.forEach(
|
||||||
|
cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode(
|
||||||
|
new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
cn.transportService.disconnectFromNode(clusterNode.getLocalNode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "disconnect from " + clusterNode.getLocalNode() + " after shutdown";
|
||||||
|
}
|
||||||
|
})));
|
||||||
|
cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ? cn.restartedNode() : cn);
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster.stabilise();
|
||||||
|
}
|
||||||
|
|
||||||
public void testCanUpdateClusterStateAfterStabilisation() {
|
public void testCanUpdateClusterStateAfterStabilisation() {
|
||||||
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
|
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
|
||||||
cluster.runRandomly();
|
cluster.runRandomly();
|
||||||
|
@ -1525,6 +1568,10 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
|
|
||||||
assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet());
|
assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet());
|
||||||
assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId));
|
assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId));
|
||||||
|
assertThat(leaderId + " has no NO_MASTER_BLOCK",
|
||||||
|
leader.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
|
||||||
|
assertThat(leaderId + " has no STATE_NOT_RECOVERED_BLOCK",
|
||||||
|
leader.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
|
||||||
assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion);
|
assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion);
|
||||||
|
|
||||||
for (final ClusterNode clusterNode : clusterNodes) {
|
for (final ClusterNode clusterNode : clusterNodes) {
|
||||||
|
@ -1556,6 +1603,8 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
equalTo(leader.getLocalNode()));
|
equalTo(leader.getLocalNode()));
|
||||||
assertThat(nodeId + " has no NO_MASTER_BLOCK",
|
assertThat(nodeId + " has no NO_MASTER_BLOCK",
|
||||||
clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
|
clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
|
||||||
|
assertThat(nodeId + " has no STATE_NOT_RECOVERED_BLOCK",
|
||||||
|
clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
|
||||||
} else {
|
} else {
|
||||||
assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE));
|
assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE));
|
||||||
assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue());
|
assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue());
|
||||||
|
@ -1725,7 +1774,8 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
} else {
|
} else {
|
||||||
nodeEnvironment = null;
|
nodeEnvironment = null;
|
||||||
delegate = new InMemoryPersistedState(0L,
|
delegate = new InMemoryPersistedState(0L,
|
||||||
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
|
ClusterStateUpdaters.addStateNotRecoveredBlock(
|
||||||
|
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)));
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new UncheckedIOException("Unable to create MockPersistedState", e);
|
throw new UncheckedIOException("Unable to create MockPersistedState", e);
|
||||||
|
@ -1765,8 +1815,9 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
clusterState.writeTo(outStream);
|
clusterState.writeTo(outStream);
|
||||||
StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
|
StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
|
||||||
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
|
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
|
||||||
|
// adapt cluster state to new localNode instance and add blocks
|
||||||
delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()),
|
delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()),
|
||||||
ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance
|
ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode)));
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new UncheckedIOException("Unable to create MockPersistedState", e);
|
throw new UncheckedIOException("Unable to create MockPersistedState", e);
|
||||||
|
@ -1870,15 +1921,19 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
transportService));
|
transportService));
|
||||||
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
|
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
|
||||||
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
|
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
|
||||||
|
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
|
||||||
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
|
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
|
||||||
ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
|
allocationService, masterService, this::getPersistedState,
|
||||||
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
|
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
|
||||||
masterService.setClusterStatePublisher(coordinator);
|
masterService.setClusterStatePublisher(coordinator);
|
||||||
|
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
|
||||||
|
deterministicTaskQueue.getThreadPool(this::onNode), null, null, coordinator);
|
||||||
|
|
||||||
logger.trace("starting up [{}]", localNode);
|
logger.trace("starting up [{}]", localNode);
|
||||||
transportService.start();
|
transportService.start();
|
||||||
transportService.acceptIncomingRequests();
|
transportService.acceptIncomingRequests();
|
||||||
coordinator.start();
|
coordinator.start();
|
||||||
|
gatewayService.start();
|
||||||
clusterService.start();
|
clusterService.start();
|
||||||
coordinator.startInitialJoin();
|
coordinator.startInitialJoin();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue