SOLR-3126: We should try to do a quick sync on standard startup recovery before trying to do a full-blown replication.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1244177 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2012-02-14 19:15:54 +00:00
parent 82a81271f0
commit 0a02750074
2 changed files with 111 additions and 50 deletions
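The intent of the change is easier to see outside the diff: on startup recovery, first attempt a cheap peer sync of recent updates from the shard leader, and only fall back to buffering updates, copying the whole index from the leader, and replaying the buffer if that sync fails. The Java sketch below illustrates that ordering only; PeerSyncClient, ReplicationClient, UpdateLogBuffer and StartupRecovery are hypothetical stand-ins for the Solr machinery touched by this commit, not the real classes or APIs.

// Hypothetical stand-ins for Solr's PeerSync, replication, and update log
// machinery -- simplified for illustration, not the real Solr APIs.
interface PeerSyncClient {
  // Try to pull the most recent updates from the leader's update log.
  boolean syncWithLeader(String leaderUrl);
}

interface ReplicationClient {
  // Copy the leader's entire index (full replication).
  void replicateFrom(String leaderUrl) throws Exception;
}

interface UpdateLogBuffer {
  void bufferUpdates();          // queue updates that arrive during recovery
  void replayBufferedUpdates();  // apply them once the index is consistent
  void dropBufferedUpdates();    // discard them if recovery failed
}

final class StartupRecovery {
  private final PeerSyncClient peerSync;
  private final ReplicationClient replication;
  private final UpdateLogBuffer ulog;

  StartupRecovery(PeerSyncClient peerSync, ReplicationClient replication,
      UpdateLogBuffer ulog) {
    this.peerSync = peerSync;
    this.replication = replication;
    this.ulog = ulog;
  }

  // Returns true once the core is consistent with the leader and may go active.
  boolean recover(String leaderUrl) {
    // 1. Cheap path: if only a handful of updates were missed, a peer sync
    //    against the leader is enough and no index copy is needed.
    if (peerSync.syncWithLeader(leaderUrl)) {
      return true;
    }

    // 2. Expensive path: buffer incoming updates, replicate the full index
    //    from the leader, then replay the buffered updates on top of it.
    ulog.bufferUpdates();
    boolean replayed = false;
    try {
      replication.replicateFrom(leaderUrl);
      ulog.replayBufferedUpdates();
      replayed = true;
      return true;
    } catch (Exception e) {
      return false; // the caller retries, up to a bounded number of attempts
    } finally {
      if (!replayed) {
        ulog.dropBufferedUpdates();
      }
    }
  }
}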

RecoveryStrategy.java

@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
*/
import java.io.IOException;
+import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
@@ -35,15 +36,19 @@ import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateLog.RecoveryInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RecoveryStrategy extends Thread {
-private static final int MAX_RETRIES = 100;
-private static final int INTERRUPTED = 101;
+private static final int MAX_RETRIES = 500;
+private static final int INTERRUPTED = MAX_RETRIES + 1;
private static final int START_TIMEOUT = 100;
private static final String REPLICATION_HANDLER = "/replication";
@@ -86,7 +91,7 @@ public class RecoveryStrategy extends Thread {
close = true;
}
-private void replicate(String nodeName, SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
+private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops, String baseUrl)
throws SolrServerException, IOException {
// start buffer updates to tran log
// and do recovery - either replay via realtime get (eventually)
@@ -97,7 +102,7 @@ public class RecoveryStrategy extends Thread {
String leaderUrl = leaderCNodeProps.getCoreUrl();
String leaderCoreName = leaderCNodeProps.getCoreName();
log.info("Attempt to replicate from " + leaderUrl);
log.info("Attempting to replicate from " + leaderUrl);
// if we are the leader, either we are trying to recover faster
// then our ephemeral timed out or we are the only node
@@ -109,7 +114,7 @@ public class RecoveryStrategy extends Thread {
PrepRecovery prepCmd = new PrepRecovery();
prepCmd.setCoreName(leaderCoreName);
prepCmd.setNodeName(nodeName);
-prepCmd.setCoreNodeName(shardZkNodeName);
+prepCmd.setCoreNodeName(coreZkNodeName);
server.request(prepCmd);
server.shutdown();
@@ -158,46 +163,72 @@ public class RecoveryStrategy extends Thread {
boolean succesfulRecovery = false;
while (!succesfulRecovery && !close && !isInterrupted()) {
-UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-if (ulog == null) return;
-ulog.bufferUpdates();
-replayed = false;
-CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
try {
+// first thing we just try to sync
zkController.publish(core, ZkStateReader.RECOVERING);
+CloudDescriptor cloudDesc = core.getCoreDescriptor()
+.getCloudDescriptor();
+ZkNodeProps leaderprops = null;
-ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
+leaderprops = zkStateReader.getLeaderProps(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
// System.out.println("recover " + shardZkNodeName + " against " +
// leaderprops);
replicate(zkController.getNodeName(), core, coreZkNodeName,
leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, coreName));
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderprops.get(ZkStateReader.BASE_URL_PROP), leaderprops.get(ZkStateReader.CORE_NAME_PROP));
replay(ulog);
replayed = true;
log.info("Attempting to PeerSync from " + leaderUrl);
PeerSync peerSync = new PeerSync(core,
Collections.singletonList(leaderUrl), 100);
boolean syncSuccess = peerSync.sync();
if (syncSuccess) {
SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
log.info("Sync Recovery was succesful - registering as Active");
// sync success - register as active and return
zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
coreZkNodeName, coreName);
return;
}
log.info("Sync Recovery was not successful - trying replication");
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) return;
-// if there are pending recovery requests, don't advert as active
-zkController.publishAsActive(baseUrl, core.getCoreDescriptor(), coreZkNodeName,
-coreName);
+ulog.bufferUpdates();
+replayed = false;
-succesfulRecovery = true;
-} catch (InterruptedException e) {
-Thread.currentThread().interrupt();
-log.warn("Recovery was interrupted", e);
-retries = INTERRUPTED;
-} catch (Throwable t) {
-SolrException.log(log, "Error while trying to recover", t);
-} finally {
-if (!replayed) {
-try {
-ulog.dropBufferedUpdates();
-} catch (Throwable t) {
-SolrException.log(log, "", t);
+try {
+replicate(zkController.getNodeName(), core,
+leaderprops, leaderUrl);
+replay(ulog);
+replayed = true;
+log.info("Recovery was succesful - registering as Active");
+// if there are pending recovery requests, don't advert as active
+zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
+coreZkNodeName, coreName);
+succesfulRecovery = true;
+} catch (InterruptedException e) {
+Thread.currentThread().interrupt();
+log.warn("Recovery was interrupted", e);
+retries = INTERRUPTED;
+} catch (Throwable t) {
+SolrException.log(log, "Error while trying to recover", t);
+} finally {
+if (!replayed) {
+try {
+ulog.dropBufferedUpdates();
+} catch (Throwable t) {
+SolrException.log(log, "", t);
}
}
}
+} catch (Throwable t) {
+SolrException.log(log, "Error while trying to recover", t);
}
if (!succesfulRecovery) {
@@ -205,14 +236,14 @@ public class RecoveryStrategy extends Thread {
// TODO: we don't want to retry for some problems?
// Or do a fall off retry...
try {
SolrException.log(log, "Recovery failed - trying again...");
retries++;
if (retries >= MAX_RETRIES) {
if (retries == INTERRUPTED) {
} else {
-// TODO: for now, give up after 10 tries - should we do more?
+// TODO: for now, give up after X tries - should we do more?
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
}

FullSolrCloudTest.java

@@ -698,8 +698,14 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
private void brindDownShardIndexSomeDocsAndRecover() throws Exception,
SolrServerException, IOException, InterruptedException {
+SolrQuery query = new SolrQuery("*:*");
+query.set("distrib", false);
commit();
+long deadShardCount = shardToClient.get(SHARD2).get(0).query(query).getResults().getNumFound();
+System.out.println("dsc:" + deadShardCount);
query("q", "*:*", "sort", "n_tl1 desc");
// kill a shard
@@ -715,7 +721,6 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
// ensure shard is dead
try {
// TODO: ignore fail
index_specific(shardToClient.get(SHARD2).get(0), id, 999, i1, 107, t1,
"specific doc!");
fail("This server should be down and this update should have failed");
@@ -743,6 +748,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
Thread.sleep(1000);
}
+long numFound1 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
index_specific(shardToClient.get(SHARD2).get(1), id, 1000, i1, 108, t1,
"specific doc!");
@@ -755,8 +761,10 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
// try adding a doc with CloudSolrServer
cloudClient.setDefaultCollection(DEFAULT_COLLECTION);
-SolrQuery query = new SolrQuery("*:*");
-long numFound1 = cloudClient.query(query).getResults().getNumFound();
+long numFound2 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+assertEquals(numFound1 + 1, numFound2);
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", 1001);
@@ -772,10 +780,10 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
query("q", "*:*", "sort", "n_tl1 desc");
-long numFound2 = cloudClient.query(query).getResults().getNumFound();
+long numFound3 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
// lets just check that the one doc since last commit made it in...
-assertEquals(numFound1 + 1, numFound2);
+assertEquals(numFound2 + 1, numFound3);
// test debugging
testDebugQueries();
@@ -786,7 +794,9 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
for (SolrServer client : clients) {
try {
-System.out.println(client.query(new SolrQuery("*:*")).getResults()
+SolrQuery q = new SolrQuery("*:*");
+q.set("distrib", false);
+System.out.println(client.query(q).getResults()
.getNumFound());
} catch (Exception e) {
@@ -798,21 +808,41 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
// query("q","matchesnothing","fl","*,score", "debugQuery", "true");
// this should trigger a recovery phase on deadShard
deadShard.start(true);
-// make sure we have published we are recoverying
+// make sure we have published we are recovering
Thread.sleep(1500);
waitForRecoveriesToFinish(false);
List<SolrServer> s2c = shardToClient.get(SHARD2);
+deadShardCount = shardToClient.get(SHARD2).get(0).query(query).getResults().getNumFound();
// if we properly recovered, we should now have the couple missing docs that
// came in while shard was down
assertEquals(s2c.get(0).query(new SolrQuery("*:*")).getResults()
.getNumFound(), s2c.get(1).query(new SolrQuery("*:*")).getResults()
.getNumFound());
checkShardConsistency(true, false);
+// recover over 100 docs so we do more than just peer sync (replicate recovery)
+deadShard = chaosMonkey.stopShard(SHARD2, 0);
+for (int i = 0; i < 226; i++) {
+doc = new SolrInputDocument();
+doc.addField("id", 2000 + i);
+controlClient.add(doc);
+ureq = new UpdateRequest();
+ureq.add(doc);
+// ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
+ureq.process(cloudClient);
+}
+commit();
+deadShard.start(true);
+// make sure we have published we are recovering
+Thread.sleep(1500);
+waitForRecoveriesToFinish(false);
+checkShardConsistency(true, false);
}
private void testDebugQueries() throws Exception {