From 2f8b3ea63449a9c4bce62ea38e0d501543f54ac9 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Fri, 1 Nov 2019 10:34:53 -0400 Subject: [PATCH] SOLR-13844: Remove replica recovery terms with the replica term (#951) --- solr/CHANGES.txt | 2 ++ .../org/apache/solr/cloud/ZkShardTerms.java | 33 ++++++++++++------- .../apache/solr/cloud/ZkShardTermsTest.java | 22 +++++++++++++ 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 1d2b0eb128d..cb8029e0e56 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -47,6 +47,8 @@ Improvements * SOLR-13865: Move replica routing code to SolrJ. (Houston Putman via Tomas Fernandez-Lobbe) +* SOLR-13844: Remove replica recovery terms with the replica term (Houston Putman via Cao Manh Dat) + Optimizations --------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java index 206ccd28bf4..2c97164bad5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java @@ -74,6 +74,8 @@ public class ZkShardTerms implements AutoCloseable{ private final Set listeners = new HashSet<>(); private final AtomicBoolean isClosed = new AtomicBoolean(false); + private static final String RECOVERING_TERM_SUFFIX = "_recovering"; + private Terms terms; // Listener of a core for shard's term change events @@ -239,7 +241,11 @@ public class ZkShardTerms implements AutoCloseable{ } public boolean isRecovering(String name) { - return terms.values.containsKey(name + "_recovering"); + return terms.values.containsKey(recoveringTerm(name)); + } + + public static String recoveringTerm(String coreNodeName) { + return coreNodeName + RECOVERING_TERM_SUFFIX; } @@ -448,7 +454,7 @@ public class ZkShardTerms implements AutoCloseable{ * @return true if {@code coreNodeName} can become leader, false if otherwise */ boolean canBecomeLeader(String coreNodeName) { - return haveHighestTermValue(coreNodeName) && !values.containsKey(coreNodeName + "_recovering"); + return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName)); } /** @@ -501,9 +507,8 @@ public class ZkShardTerms implements AutoCloseable{ } private boolean skipIncreaseTermOf(String key, Set replicasNeedingRecovery) { - if (key.endsWith("_recovering")) { - key = key.substring(0, key.length() - "_recovering".length()); - return replicasNeedingRecovery.contains(key); + if (key.endsWith(RECOVERING_TERM_SUFFIX)) { + key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length()); } return replicasNeedingRecovery.contains(key); } @@ -524,15 +529,19 @@ public class ZkShardTerms implements AutoCloseable{ } /** - * Return a new {@link Terms} in which term of {@code coreNodeName} is removed + * Return a new {@link Terms} in which terms for the {@code coreNodeName} are removed * @param coreNodeName of the replica * @return null if term of {@code coreNodeName} is already not exist */ Terms removeTerm(String coreNodeName) { - if (!values.containsKey(coreNodeName)) return null; + if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) { + return null; + } HashMap newValues = new HashMap<>(values); newValues.remove(coreNodeName); + newValues.remove(recoveringTerm(coreNodeName)); + return new Terms(newValues, version); } @@ -569,7 +578,7 @@ public class ZkShardTerms implements AutoCloseable{ HashMap newValues = new HashMap<>(values); newValues.put(coreNodeName, maxTerm); - newValues.remove(coreNodeName+"_recovering"); + newValues.remove(recoveringTerm(coreNodeName)); return new Terms(newValues, version); } @@ -588,10 +597,10 @@ public class ZkShardTerms implements AutoCloseable{ return null; HashMap newValues = new HashMap<>(values); - if (!newValues.containsKey(coreNodeName+"_recovering")) { + if (!newValues.containsKey(recoveringTerm(coreNodeName))) { long currentTerm = newValues.getOrDefault(coreNodeName, 0L); // by keeping old term, we will have more information in leader election - newValues.put(coreNodeName+"_recovering", currentTerm); + newValues.put(recoveringTerm(coreNodeName), currentTerm); } newValues.put(coreNodeName, maxTerm); return new Terms(newValues, version); @@ -603,12 +612,12 @@ public class ZkShardTerms implements AutoCloseable{ * @return null if term of {@code coreNodeName} is already finished doing recovering */ Terms doneRecovering(String coreNodeName) { - if (!values.containsKey(coreNodeName+"_recovering")) { + if (!values.containsKey(recoveringTerm(coreNodeName))) { return null; } HashMap newValues = new HashMap<>(values); - newValues.remove(coreNodeName+"_recovering"); + newValues.remove(recoveringTerm(coreNodeName)); return new Terms(newValues, version); } diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java index 5c9fea0a7e1..c006ed5f965 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java @@ -129,6 +129,28 @@ public class ZkShardTermsTest extends SolrCloudTestCase { } } + @Test + public void testCoreRemovalWhileRecovering() { + String collection = "recoveringFlag"; + try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) { + // List all possible orders of ensureTermIsHigher, startRecovering, doneRecovering + zkShardTerms.registerTerm("replica1_rem"); + zkShardTerms.registerTerm("replica2_rem"); + + // normal case when leader failed to send an update to replica + zkShardTerms.ensureTermsIsHigher("replica1_rem", Collections.singleton("replica2_rem")); + zkShardTerms.startRecovering("replica2_rem"); + assertEquals(zkShardTerms.getTerm("replica2_rem"), 1); + assertEquals(zkShardTerms.getTerm("replica2_rem_recovering"), 0); + + // Remove core, and check if the correct core was removed as well as the recovering term for that core + zkShardTerms.removeTerm("replica2_rem"); + assertEquals(zkShardTerms.getTerm("replica1_rem"), 1); + assertEquals(zkShardTerms.getTerm("replica2_rem"), -1); + assertEquals(zkShardTerms.getTerm("replica2_rem_recovering"), -1); + } + } + public void testRegisterTerm() throws InterruptedException { String collection = "registerTerm"; ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());