SOLR-12011: FORCELEADER should also remove recovering flag of a replica so it can win the election

This commit is contained in:
Cao Manh Dat 2018-03-07 09:09:52 +07:00
parent 28de8218fe
commit 3c153ccd0e
3 changed files with 24 additions and 10 deletions

View File

@ -194,7 +194,7 @@ public class ZkShardTerms implements AutoCloseable{
} }
/** /**
* Set a replica's term equals to leader's term. * Set a replica's term equals to leader's term, and remove recovering flag of a replica.
* This call should only be used by {@link org.apache.solr.common.params.CollectionParams.CollectionAction#FORCELEADER} * This call should only be used by {@link org.apache.solr.common.params.CollectionParams.CollectionAction#FORCELEADER}
* @param coreNodeName of the replica * @param coreNodeName of the replica
*/ */
@ -554,6 +554,7 @@ public class ZkShardTerms implements AutoCloseable{
HashMap<String, Long> newValues = new HashMap<>(values); HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, maxTerm); newValues.put(coreNodeName, maxTerm);
newValues.remove(coreNodeName+"_recovering");
return new Terms(newValues, version); return new Terms(newValues, version);
} }

View File

@ -1148,16 +1148,10 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
.noneMatch(rep -> zkShardTerms.registered(rep.getName()) && zkShardTerms.canBecomeLeader(rep.getName())); .noneMatch(rep -> zkShardTerms.registered(rep.getName()) && zkShardTerms.canBecomeLeader(rep.getName()));
// we won't increase replica's terms if exist a live replica with term equals to leader // we won't increase replica's terms if exist a live replica with term equals to leader
if (shouldIncreaseReplicaTerms) { if (shouldIncreaseReplicaTerms) {
OptionalLong optionalMaxTerm = liveReplicas.stream() //TODO only increase terms of replicas less out-of-sync
liveReplicas.stream()
.filter(rep -> zkShardTerms.registered(rep.getName())) .filter(rep -> zkShardTerms.registered(rep.getName()))
.mapToLong(rep -> zkShardTerms.getTerm(rep.getName())) .forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName()));
.max();
// increase terms of replicas less out-of-sync
if (optionalMaxTerm.isPresent()) {
liveReplicas.stream()
.filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
.forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName()));
}
} }
// Wait till we have an active leader // Wait till we have an active leader

View File

@ -229,6 +229,25 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
replicaTerms.close(); replicaTerms.close();
} }
public void testSetTermEqualsToLeader() throws InterruptedException {
String collection = "setTermEqualsToLeader";
ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
leaderTerms.registerTerm("leader");
replicaTerms.registerTerm("replica");
leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
waitFor(false, () -> replicaTerms.canBecomeLeader("replica"));
waitFor(true, () -> leaderTerms.skipSendingUpdatesTo("replica"));
replicaTerms.setTermEqualsToLeader("replica");
waitFor(true, () -> replicaTerms.canBecomeLeader("replica"));
waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica"));
leaderTerms.close();
replicaTerms.close();
}
private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException { private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException {
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource()); TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
while (!timeOut.hasTimedOut()) { while (!timeOut.hasTimedOut()) {