SOLR-13844: Remove replica recovery terms with the replica term (#951)

This commit is contained in:
Houston Putman 2019-11-01 10:34:53 -04:00 committed by Cao Manh Dat
parent c3b8b584bf
commit 2f8b3ea634
3 changed files with 45 additions and 12 deletions

View File

@ -47,6 +47,8 @@ Improvements
* SOLR-13865: Move replica routing code to SolrJ. (Houston Putman via Tomas Fernandez-Lobbe) * SOLR-13865: Move replica routing code to SolrJ. (Houston Putman via Tomas Fernandez-Lobbe)
* SOLR-13844: Remove replica recovery terms with the replica term (Houston Putman via Cao Manh Dat)
Optimizations Optimizations
--------------------- ---------------------

View File

@ -74,6 +74,8 @@ public class ZkShardTerms implements AutoCloseable{
private final Set<CoreTermWatcher> listeners = new HashSet<>(); private final Set<CoreTermWatcher> listeners = new HashSet<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false);
private static final String RECOVERING_TERM_SUFFIX = "_recovering";
private Terms terms; private Terms terms;
// Listener of a core for shard's term change events // Listener of a core for shard's term change events
@ -239,7 +241,11 @@ public class ZkShardTerms implements AutoCloseable{
} }
public boolean isRecovering(String name) { public boolean isRecovering(String name) {
return terms.values.containsKey(name + "_recovering"); return terms.values.containsKey(recoveringTerm(name));
}
public static String recoveringTerm(String coreNodeName) {
return coreNodeName + RECOVERING_TERM_SUFFIX;
} }
@ -448,7 +454,7 @@ public class ZkShardTerms implements AutoCloseable{
* @return true if {@code coreNodeName} can become leader, false if otherwise * @return true if {@code coreNodeName} can become leader, false if otherwise
*/ */
boolean canBecomeLeader(String coreNodeName) { boolean canBecomeLeader(String coreNodeName) {
return haveHighestTermValue(coreNodeName) && !values.containsKey(coreNodeName + "_recovering"); return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName));
} }
/** /**
@ -501,9 +507,8 @@ public class ZkShardTerms implements AutoCloseable{
} }
private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) { private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) {
if (key.endsWith("_recovering")) { if (key.endsWith(RECOVERING_TERM_SUFFIX)) {
key = key.substring(0, key.length() - "_recovering".length()); key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length());
return replicasNeedingRecovery.contains(key);
} }
return replicasNeedingRecovery.contains(key); return replicasNeedingRecovery.contains(key);
} }
@ -524,15 +529,19 @@ public class ZkShardTerms implements AutoCloseable{
} }
/** /**
* Return a new {@link Terms} in which term of {@code coreNodeName} is removed * Return a new {@link Terms} in which terms for the {@code coreNodeName} are removed
* @param coreNodeName of the replica * @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already not exist * @return null if term of {@code coreNodeName} is already not exist
*/ */
Terms removeTerm(String coreNodeName) { Terms removeTerm(String coreNodeName) {
if (!values.containsKey(coreNodeName)) return null; if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values); HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(coreNodeName); newValues.remove(coreNodeName);
newValues.remove(recoveringTerm(coreNodeName));
return new Terms(newValues, version); return new Terms(newValues, version);
} }
@ -569,7 +578,7 @@ public class ZkShardTerms implements AutoCloseable{
HashMap<String, Long> newValues = new HashMap<>(values); HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, maxTerm); newValues.put(coreNodeName, maxTerm);
newValues.remove(coreNodeName+"_recovering"); newValues.remove(recoveringTerm(coreNodeName));
return new Terms(newValues, version); return new Terms(newValues, version);
} }
@ -588,10 +597,10 @@ public class ZkShardTerms implements AutoCloseable{
return null; return null;
HashMap<String, Long> newValues = new HashMap<>(values); HashMap<String, Long> newValues = new HashMap<>(values);
if (!newValues.containsKey(coreNodeName+"_recovering")) { if (!newValues.containsKey(recoveringTerm(coreNodeName))) {
long currentTerm = newValues.getOrDefault(coreNodeName, 0L); long currentTerm = newValues.getOrDefault(coreNodeName, 0L);
// by keeping old term, we will have more information in leader election // by keeping old term, we will have more information in leader election
newValues.put(coreNodeName+"_recovering", currentTerm); newValues.put(recoveringTerm(coreNodeName), currentTerm);
} }
newValues.put(coreNodeName, maxTerm); newValues.put(coreNodeName, maxTerm);
return new Terms(newValues, version); return new Terms(newValues, version);
@ -603,12 +612,12 @@ public class ZkShardTerms implements AutoCloseable{
* @return null if term of {@code coreNodeName} is already finished doing recovering * @return null if term of {@code coreNodeName} is already finished doing recovering
*/ */
Terms doneRecovering(String coreNodeName) { Terms doneRecovering(String coreNodeName) {
if (!values.containsKey(coreNodeName+"_recovering")) { if (!values.containsKey(recoveringTerm(coreNodeName))) {
return null; return null;
} }
HashMap<String, Long> newValues = new HashMap<>(values); HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(coreNodeName+"_recovering"); newValues.remove(recoveringTerm(coreNodeName));
return new Terms(newValues, version); return new Terms(newValues, version);
} }

View File

@ -129,6 +129,28 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
} }
} }
@Test
public void testCoreRemovalWhileRecovering() {
String collection = "recoveringFlag";
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
// List all possible orders of ensureTermIsHigher, startRecovering, doneRecovering
zkShardTerms.registerTerm("replica1_rem");
zkShardTerms.registerTerm("replica2_rem");
// normal case when leader failed to send an update to replica
zkShardTerms.ensureTermsIsHigher("replica1_rem", Collections.singleton("replica2_rem"));
zkShardTerms.startRecovering("replica2_rem");
assertEquals(zkShardTerms.getTerm("replica2_rem"), 1);
assertEquals(zkShardTerms.getTerm("replica2_rem_recovering"), 0);
// Remove core, and check if the correct core was removed as well as the recovering term for that core
zkShardTerms.removeTerm("replica2_rem");
assertEquals(zkShardTerms.getTerm("replica1_rem"), 1);
assertEquals(zkShardTerms.getTerm("replica2_rem"), -1);
assertEquals(zkShardTerms.getTerm("replica2_rem_recovering"), -1);
}
}
public void testRegisterTerm() throws InterruptedException { public void testRegisterTerm() throws InterruptedException {
String collection = "registerTerm"; String collection = "registerTerm";
ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient()); ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());