SOLR-11794: Restart replicate from ledaer on core reload on PULL replicas

This commit is contained in:
Tomas Fernandez Lobbe 2018-01-15 16:15:14 -08:00
parent a08f71279c
commit 1c4b417c50
3 changed files with 51 additions and 32 deletions

View File

@ -104,6 +104,8 @@ Bug Fixes
* SOLR-11839: Fix test failures resulting from SOLR-11218 (Erick Erickson)
* SOLR-11794: PULL replicas stop replicating after collection RELOAD (Samuel Tatipamula, Tomás Fernández Löbbe)
Optimizations
----------------------

View File

@ -1300,6 +1300,9 @@ public class CoreContainer {
getZkController().startReplicationFromLeader(newCore.getName(), true);
}
} else if(replica.getType() == Replica.Type.PULL) {
getZkController().stopReplicationFromLeader(core.getName());
getZkController().startReplicationFromLeader(newCore.getName(), false);
}
}
} catch (SolrCoreState.CoreIsClosedException e) {

View File

@ -214,45 +214,59 @@ public class TestPullReplica extends SolrCloudTestCase {
@SuppressWarnings("unchecked")
public void testAddDocs() throws Exception {
int numReadOnlyReplicas = 1 + random().nextInt(3);
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, numReadOnlyReplicas)
int numPullReplicas = 1 + random().nextInt(3);
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, numPullReplicas)
.setMaxShardsPerNode(100)
.process(cluster.getSolrClient());
waitForState("Expected collection to be created with 1 shard and " + (numReadOnlyReplicas + 1) + " replicas", collectionName, clusterShape(1, numReadOnlyReplicas + 1));
DocCollection docCollection = assertNumberOfReplicas(1, 0, numReadOnlyReplicas, false, true);
waitForState("Expected collection to be created with 1 shard and " + (numPullReplicas + 1) + " replicas", collectionName, clusterShape(1, numPullReplicas + 1));
DocCollection docCollection = assertNumberOfReplicas(1, 0, numPullReplicas, false, true);
assertEquals(1, docCollection.getSlices().size());
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
cluster.getSolrClient().commit(collectionName);
Slice s = docCollection.getSlices().iterator().next();
try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
}
TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME);
for (Replica r:s.getReplicas(EnumSet.of(Replica.Type.PULL))) {
//TODO: assert replication < REPLICATION_TIMEOUT_SECS
try (HttpSolrClient readOnlyReplicaClient = getHttpSolrClient(r.getCoreUrl())) {
while (true) {
try {
assertEquals("Replica " + r.getName() + " not up to date after 10 seconds",
1, readOnlyReplicaClient.query(new SolrQuery("*:*")).getResults().getNumFound());
break;
} catch (AssertionError e) {
if (t.hasTimedOut()) {
throw e;
} else {
Thread.sleep(100);
boolean reloaded = false;
int numDocs = 0;
while (true) {
numDocs++;
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", String.valueOf(numDocs), "foo", "bar"));
cluster.getSolrClient().commit(collectionName);
Slice s = docCollection.getSlices().iterator().next();
try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
assertEquals(numDocs, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
}
TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME);
for (Replica r:s.getReplicas(EnumSet.of(Replica.Type.PULL))) {
//TODO: assert replication < REPLICATION_TIMEOUT_SECS
try (HttpSolrClient pullReplicaClient = getHttpSolrClient(r.getCoreUrl())) {
while (true) {
try {
assertEquals("Replica " + r.getName() + " not up to date after 10 seconds",
numDocs, pullReplicaClient.query(new SolrQuery("*:*")).getResults().getNumFound());
break;
} catch (AssertionError e) {
if (t.hasTimedOut()) {
throw e;
} else {
Thread.sleep(100);
}
}
}
SolrQuery req = new SolrQuery(
"qt", "/admin/plugins",
"stats", "true");
QueryResponse statsResponse = pullReplicaClient.query(req);
assertEquals("Replicas shouldn't process the add document request: " + statsResponse,
0L, ((Map<String, Object>)((NamedList<Object>)statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.adds"));
}
SolrQuery req = new SolrQuery(
"qt", "/admin/plugins",
"stats", "true");
QueryResponse statsResponse = readOnlyReplicaClient.query(req);
assertEquals("Replicas shouldn't process the add document request: " + statsResponse,
0L, ((Map<String, Object>)((NamedList<Object>)statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.adds"));
}
if (reloaded) {
break;
} else {
// reload
CollectionAdminResponse response = CollectionAdminRequest.reloadCollection(collectionName)
.process(cluster.getSolrClient());
assertEquals(0, response.getStatus());
reloaded = true;
}
}
assertUlogPresence(docCollection);