SOLR-13844: Remove replica recovery terms with the replica term (#951)

This commit is contained in:
Houston Putman 2019-11-01 10:34:53 -04:00 committed by Cao Manh Dat
parent 53b002f59d
commit 6e1ecd1218
3 changed files with 45 additions and 12 deletions

View File

@ -121,6 +121,8 @@ Improvements
* SOLR-13865: Move replica routing code to SolrJ. (Houston Putman via Tomas Fernandez-Lobbe)
* SOLR-13844: Remove replica recovery terms with the replica term (Houston Putman via Cao Manh Dat)
Optimizations
---------------------

View File

@ -74,6 +74,8 @@ public class ZkShardTerms implements AutoCloseable{
private final Set<CoreTermWatcher> listeners = new HashSet<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private static final String RECOVERING_TERM_SUFFIX = "_recovering";
private Terms terms;
// Listener of a core for shard's term change events
@ -239,7 +241,11 @@ public class ZkShardTerms implements AutoCloseable{
}
public boolean isRecovering(String name) {
return terms.values.containsKey(name + "_recovering");
return terms.values.containsKey(recoveringTerm(name));
}
public static String recoveringTerm(String coreNodeName) {
return coreNodeName + RECOVERING_TERM_SUFFIX;
}
@ -448,7 +454,7 @@ public class ZkShardTerms implements AutoCloseable{
* @return true if {@code coreNodeName} can become leader, false if otherwise
*/
boolean canBecomeLeader(String coreNodeName) {
return haveHighestTermValue(coreNodeName) && !values.containsKey(coreNodeName + "_recovering");
return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName));
}
/**
@ -501,9 +507,8 @@ public class ZkShardTerms implements AutoCloseable{
}
private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) {
if (key.endsWith("_recovering")) {
key = key.substring(0, key.length() - "_recovering".length());
return replicasNeedingRecovery.contains(key);
if (key.endsWith(RECOVERING_TERM_SUFFIX)) {
key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length());
}
return replicasNeedingRecovery.contains(key);
}
@ -524,15 +529,19 @@ public class ZkShardTerms implements AutoCloseable{
}
/**
* Return a new {@link Terms} in which term of {@code coreNodeName} is removed
* Return a new {@link Terms} in which terms for the {@code coreNodeName} are removed
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already not exist
*/
Terms removeTerm(String coreNodeName) {
if (!values.containsKey(coreNodeName)) return null;
if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(coreNodeName);
newValues.remove(recoveringTerm(coreNodeName));
return new Terms(newValues, version);
}
@ -569,7 +578,7 @@ public class ZkShardTerms implements AutoCloseable{
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, maxTerm);
newValues.remove(coreNodeName+"_recovering");
newValues.remove(recoveringTerm(coreNodeName));
return new Terms(newValues, version);
}
@ -588,10 +597,10 @@ public class ZkShardTerms implements AutoCloseable{
return null;
HashMap<String, Long> newValues = new HashMap<>(values);
if (!newValues.containsKey(coreNodeName+"_recovering")) {
if (!newValues.containsKey(recoveringTerm(coreNodeName))) {
long currentTerm = newValues.getOrDefault(coreNodeName, 0L);
// by keeping old term, we will have more information in leader election
newValues.put(coreNodeName+"_recovering", currentTerm);
newValues.put(recoveringTerm(coreNodeName), currentTerm);
}
newValues.put(coreNodeName, maxTerm);
return new Terms(newValues, version);
@ -603,12 +612,12 @@ public class ZkShardTerms implements AutoCloseable{
* @return null if term of {@code coreNodeName} is already finished doing recovering
*/
Terms doneRecovering(String coreNodeName) {
if (!values.containsKey(coreNodeName+"_recovering")) {
if (!values.containsKey(recoveringTerm(coreNodeName))) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(coreNodeName+"_recovering");
newValues.remove(recoveringTerm(coreNodeName));
return new Terms(newValues, version);
}

View File

@ -129,6 +129,28 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
}
}
@Test
public void testCoreRemovalWhileRecovering() {
String collection = "recoveringFlag";
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
// List all possible orders of ensureTermIsHigher, startRecovering, doneRecovering
zkShardTerms.registerTerm("replica1_rem");
zkShardTerms.registerTerm("replica2_rem");
// normal case when leader failed to send an update to replica
zkShardTerms.ensureTermsIsHigher("replica1_rem", Collections.singleton("replica2_rem"));
zkShardTerms.startRecovering("replica2_rem");
assertEquals(zkShardTerms.getTerm("replica2_rem"), 1);
assertEquals(zkShardTerms.getTerm("replica2_rem_recovering"), 0);
// Remove core, and check if the correct core was removed as well as the recovering term for that core
zkShardTerms.removeTerm("replica2_rem");
assertEquals(zkShardTerms.getTerm("replica1_rem"), 1);
assertEquals(zkShardTerms.getTerm("replica2_rem"), -1);
assertEquals(zkShardTerms.getTerm("replica2_rem_recovering"), -1);
}
}
public void testRegisterTerm() throws InterruptedException {
String collection = "registerTerm";
ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());