SOLR-12433: Recovering flag of a replica is set equals to leader even it failed to receive update on recovering

This commit is contained in:
Cao Manh Dat 2018-06-01 09:23:10 +07:00
parent 252a8145d9
commit 1d33130fcb
3 changed files with 66 additions and 5 deletions

View File

@ -287,6 +287,9 @@ Bug Fixes
* SOLR-12271: Fixed bug in how Analytics component reads negative values from float and double fields. (Houston Putman)
* SOLR-12433: Recovering flag of a replica is set equals to leader even it failed to receive update
on recovering. (Cao Manh Dat)
Optimizations
----------------------

View File

@ -487,13 +487,13 @@ public class ZkShardTerms implements AutoCloseable{
HashMap<String, Long> newValues = new HashMap<>(values);
long leaderTerm = newValues.get(leader);
for (String replica : newValues.keySet()) {
if (replicasNeedingRecovery.contains(replica)) foundReplicasInLowerTerms = true;
if (Objects.equals(newValues.get(replica), leaderTerm)) {
if(replicasNeedingRecovery.contains(replica)) {
for (String key : newValues.keySet()) {
if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true;
if (Objects.equals(newValues.get(key), leaderTerm)) {
if(skipIncreaseTermOf(key, replicasNeedingRecovery)) {
changed = true;
} else {
newValues.put(replica, leaderTerm+1);
newValues.put(key, leaderTerm+1);
}
}
}
@ -504,6 +504,14 @@ public class ZkShardTerms implements AutoCloseable{
return new Terms(newValues, version);
}
private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) {
if (key.endsWith("_recovering")) {
key = key.substring(0, key.length() - "_recovering".length());
return replicasNeedingRecovery.contains(key);
}
return replicasNeedingRecovery.contains(key);
}
/**
* Return a new {@link Terms} in which highest terms are not zero
* @return null if highest terms are already larger than zero

View File

@ -75,6 +75,56 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
}
}
@Test
public void testRecoveringFlag() {
String collection = "recoveringFlag";
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
// List all possible orders of ensureTermIsHigher, startRecovering, doneRecovering
zkShardTerms.registerTerm("replica1");
zkShardTerms.registerTerm("replica2");
zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
zkShardTerms.startRecovering("replica2");
assertEquals(zkShardTerms.getTerm("replica2"), 1);
assertEquals(zkShardTerms.getTerm("replica2_recovering"), 1);
zkShardTerms.doneRecovering("replica2");
assertEquals(zkShardTerms.getTerm("replica1"), 1);
assertEquals(zkShardTerms.getTerm("replica2"), 1);
assertEquals(zkShardTerms.getTerm("replica2_recovering"), -1);
zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
assertEquals(zkShardTerms.getTerm("replica1"), 2);
assertEquals(zkShardTerms.getTerm("replica2"), 1);
assertEquals(zkShardTerms.getTerm("replica2_recovering"), -1);
zkShardTerms.startRecovering("replica2");
assertEquals(zkShardTerms.getTerm("replica2"), 2);
assertEquals(zkShardTerms.getTerm("replica2_recovering"), 2);
zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
assertEquals(zkShardTerms.getTerm("replica1"), 3);
assertEquals(zkShardTerms.getTerm("replica2"), 2);
assertEquals(zkShardTerms.getTerm("replica2_recovering"), 2);
zkShardTerms.doneRecovering("replica2");
assertEquals(zkShardTerms.getTerm("replica2"), 2);
assertEquals(zkShardTerms.getTerm("replica2_recovering"), -1);
zkShardTerms.startRecovering("replica2");
zkShardTerms.doneRecovering("replica2");
zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
zkShardTerms.startRecovering("replica2");
zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
zkShardTerms.startRecovering("replica2");
assertEquals(zkShardTerms.getTerm("replica1"), 5);
assertEquals(zkShardTerms.getTerm("replica2"), 5);
assertEquals(zkShardTerms.getTerm("replica2_recovering"), 5);
zkShardTerms.doneRecovering("replica2");
assertEquals(zkShardTerms.getTerm("replica2_recovering"), -1);
}
}
public void testRegisterTerm() throws InterruptedException {
String collection = "registerTerm";
ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());