SOLR-3180: simplify core reference counting by just using the same core for the full duration of recovery for now

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1296854 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2012-03-04 18:51:45 +00:00
parent 54b79934ea
commit 36126c9b1f
2 changed files with 54 additions and 58 deletions

View File

@ -198,36 +198,35 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
}
// set request info for logging
SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
SolrQueryResponse rsp = new SolrQueryResponse();
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
log.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
try {
SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
SolrQueryResponse rsp = new SolrQueryResponse();
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
log.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
doRecovery(core);
} finally {
if (core != null) core.close();
SolrRequestInfo.clearRequestInfo();
}
}
// TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
public void doRecovery(SolrCore core) {
boolean replayed = false;
boolean successfulRecovery = false;
UpdateLog ulog;
try {
ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover");
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
return;
}
} finally {
core.close();
ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover");
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
return;
}
List<Long> startingRecentVersions;
UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates();
try {
@ -254,7 +253,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
log.info("###### startupVersions=" + reallyStartingVersions);
}
if (recoveringAfterStartup) {
// if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
// when we went down.
@ -264,28 +263,23 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
boolean firstTime = true;
while (!successfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though
core = cc.getCore(coreName);
if (core == null) {
SolrException.log(log, "SolrCore not found - cannot recover:" + coreName);
return;
}
try {
// first thing we just try to sync
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
CloudDescriptor cloudDesc = core.getCoreDescriptor()
.getCloudDescriptor();
ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP);
String leaderCoreName = leaderprops.get(ZkStateReader.CORE_NAME_PROP);
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName);
// first thing we just try to sync
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
@ -304,7 +298,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
// System.out
// .println("Sync Recovery was successful - registering as Active "
// + zkController.getNodeName());
// solrcloud_debug
// try {
// RefCounted<SolrIndexSearcher> searchHolder =
@ -320,7 +314,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
// } catch (Exception e) {
//
// }
// sync success - register as active and return
zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
coreZkNodeName, coreName);
@ -328,23 +322,23 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
close = true;
return;
}
log.info("Sync Recovery was not successful - trying replication");
}
//System.out.println("Sync Recovery was not successful - trying replication");
log.info("Begin buffering updates");
ulog.bufferUpdates();
replayed = false;
try {
replicate(zkController.getNodeName(), core,
leaderprops, leaderUrl);
replay(ulog);
replayed = true;
log.info("Recovery was successful - registering as Active");
// if there are pending recovery requests, don't advert as active
zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
@ -367,42 +361,34 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
}
}
} catch (Throwable t) {
log.error("Error while trying to recover... closing core.", t);
} finally {
core.close();
log.error("Error while trying to recover.", t);
}
if (!successfulRecovery) {
// lets pause for a moment and we need to try again...
// TODO: we don't want to retry for some problems?
// Or do a fall off retry...
try {
log.error("Recovery failed - trying again...");
retries++;
if (retries >= MAX_RETRIES) {
if (retries == INTERRUPTED) {
} else {
log.error("Recovery failed - max retries exceeded.");
// TODO: for now, give up after X tries - should we do more?
core = cc.getCore(coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
} finally {
core.close();
}
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
}
break;
}
} catch (Exception e) {
SolrException.log(log, "", e);
}
try {
Thread.sleep(Math.min(START_TIMEOUT * retries, 60000));
} catch (InterruptedException e) {

View File

@ -74,10 +74,20 @@ public final class DefaultSolrCoreState extends SolrCoreState {
} else if (indexWriter != null) {
indexWriter.close();
}
} catch (Throwable t) {
SolrException.log(log, t);
} catch (Throwable t) {
log.error("Error during shutdown of writer.", t);
}
directoryFactory.close();
try {
directoryFactory.close();
} catch (Throwable t) {
log.error("Error during shutdown of directory factory.", t);
}
try {
cancelRecovery();
} catch (Throwable t) {
log.error("Error cancelling recovery", t);
}
closed = true;
}
}