SOLR-3153: When a leader goes down, it should ask replicas to sync in parallel rather than serially.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1292652 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2012-02-23 05:22:31 +00:00
parent 9f2cec662f
commit 5713824147
11 changed files with 266 additions and 137 deletions
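The heart of the change is in SyncStrategy (below): instead of sending each replica a blocking sync request in a loop, the leader now submits one request per replica through a ShardHandler and drains the responses as they complete. As a minimal, Solr-independent sketch of that serial-to-parallel shift (the Replica interface and executor here are illustrative only; the actual implementation uses Solr's ShardHandler, shown in the SyncStrategy diff further down):

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelSyncSketch {

  // Hypothetical stand-in for "ask one replica to sync against the leader".
  interface Replica {
    boolean requestSync(String leaderUrl) throws Exception;
  }

  // Old shape: each replica is asked in turn; every call waits for the previous one.
  static void syncSerially(List<Replica> replicas, String leaderUrl) throws Exception {
    for (Replica r : replicas) {
      r.requestSync(leaderUrl);
    }
  }

  // New shape: fan the requests out, then gather results in completion order.
  static void syncInParallel(List<Replica> replicas, final String leaderUrl) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, replicas.size()));
    CompletionService<Boolean> completions = new ExecutorCompletionService<Boolean>(pool);
    for (final Replica r : replicas) {
      completions.submit(new Callable<Boolean>() {
        public Boolean call() throws Exception {
          return r.requestSync(leaderUrl);
        }
      });
    }
    try {
      for (int i = 0; i < replicas.size(); i++) {
        Boolean ok = completions.take().get();
        if (!Boolean.TRUE.equals(ok)) {
          // the real SyncStrategy asks the failed replica to recover and cancels the rest
        }
      }
    } finally {
      pool.shutdownNow();
    }
  }
}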

View File

@ -172,7 +172,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
zkClient.delete(leaderSeqPath, -1, true);
core.getUpdateHandler().getSolrCoreState().doRecovery(core);
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getName());
leaderElector.joinElection(this, null); // don't pass core, pass null
}
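The recovery entry point changes from doRecovery(SolrCore) to doRecovery(CoreContainer, coreName): recovery runs on its own thread, so it should not pin a SolrCore reference for its whole lifetime. The contract behind the new signature, roughly sketched here (this is the usage pattern RecoveryStrategy adopts below, not the actual implementation), is that every use of the core is bracketed by getCore()/close():

SolrCore core = cc.getCore(coreName);   // takes a reference; may be null if the core was unloaded
if (core == null) {
  return;                               // nothing to recover against right now
}
try {
  // ... perform one step of recovery against this core ...
} finally {
  core.close();                         // release the reference so the core can be closed/unloaded
}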

View File

@ -37,6 +37,7 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
import org.apache.solr.core.SolrCore;
@ -69,14 +70,14 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
private ZkStateReader zkStateReader;
private volatile String coreName;
private int retries;
private SolrCore core;
private boolean recoveringAfterStartup;
private CoreContainer cc;
public RecoveryStrategy(SolrCore core) {
this.core = core;
this.coreName = core.getName();
public RecoveryStrategy(CoreContainer cc, String name) {
this.cc = cc;
this.coreName = name;
setName("RecoveryThread");
zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
zkController = cc.getZkController();
zkStateReader = zkController.getZkStateReader();
baseUrl = zkController.getBaseUrl();
coreZkNodeName = zkController.getNodeName() + "_" + coreName;
@ -190,14 +191,24 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
public void run() {
boolean replayed = false;
boolean succesfulRecovery = false;
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover");
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
SolrCore core = cc.getCore(coreName);
if (core == null) {
SolrException.log(log, "SolrCore not found - cannot recover:" + coreName);
return;
}
UpdateLog ulog;
try {
ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover");
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
return;
}
} finally {
core.close();
}
List<Long> startingRecentVersions;
UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates();
@ -235,6 +246,11 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
boolean firstTime = true;
while (!succesfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though
core = cc.getCore(coreName);
if (core == null) {
SolrException.log(log, "SolrCore not found - cannot recover:" + coreName);
return;
}
try {
// first thing we just try to sync
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
@ -331,11 +347,15 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
SolrException.log(log, "", t);
}
}
}
} catch (Throwable t) {
SolrException.log(log, "Error while trying to recover", t);
} finally {
if (core != null) {
core.close();
}
}
if (!succesfulRecovery) {
@ -351,8 +371,15 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
} else {
// TODO: for now, give up after X tries - should we do more?
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
core = cc.getCore(coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
} finally {
if (core != null) {
core.close();
}
}
}
break;
}
@ -369,6 +396,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
retries = INTERRUPTED;
}
}
log.info("Finished recovery process");

View File

@ -23,10 +23,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
@ -37,12 +40,42 @@ import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.PeerSync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SyncStrategy {
protected final Logger log = LoggerFactory.getLogger(getClass());
private HttpShardHandlerFactory shardHandlerFactory;
private ShardHandler shardHandler;
private static MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
private static HttpClient client = new HttpClient(mgr);
static {
mgr.getParams().setDefaultMaxConnectionsPerHost(20);
mgr.getParams().setMaxTotalConnections(10000);
mgr.getParams().setConnectionTimeout(30000);
mgr.getParams().setSoTimeout(30000);
// prevent retries (note: this didn't work when set on mgr.. needed to be set on client)
DefaultHttpMethodRetryHandler retryhandler = new DefaultHttpMethodRetryHandler(0, false);
client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, retryhandler);
}
public SyncStrategy() {
shardHandlerFactory = new HttpShardHandlerFactory();
shardHandler = shardHandlerFactory.getShardHandler(client);
}
private static class SyncShardRequest extends ShardRequest {
String coreName;
}
public boolean sync(ZkController zkController, SolrCore core,
ZkNodeProps leaderProps) {
@ -51,6 +84,10 @@ public class SyncStrategy {
// solrcloud_debug
// System.out.println("SYNC UP");
if (core.getUpdateHandler().getUpdateLog() == null) {
log.error("No UpdateLog found - cannot sync");
return false;
}
boolean success = syncReplicas(zkController, core, leaderProps);
return success;
}
@ -156,7 +193,7 @@ public class SyncStrategy {
}
PeerSync peerSync = new PeerSync(core, syncWith, 1000);
PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().numRecordsToKeep);
return peerSync.sync();
}
@ -180,44 +217,68 @@ public class SyncStrategy {
ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps);
for (ZkCoreNodeProps node : nodes) {
try {
// TODO: do we first everyone register as sync phase? get the overseer
// to do it?
// TODO: this should be done in parallel
QueryRequest qr = new QueryRequest(params("qt", "/get", "getVersions",
Integer.toString(1000), "sync", zkLeader.getCoreUrl(), "distrib",
"false"));
CommonsHttpSolrServer server = new CommonsHttpSolrServer(
node.getCoreUrl());
server.setConnectionTimeout(15000);
server.setSoTimeout(15000);
//System.out.println("ask " + node.getCoreUrl() + " to sync");
NamedList rsp = server.request(qr);
//System.out.println("response about syncing to leader:" + rsp + " node:"
// + node.getCoreUrl() + " me:" + zkController.getBaseUrl());
boolean success = (Boolean) rsp.get("sync");
//System.out.println("success:" + success);
if (!success) {
// System.out
// .println("try and ask " + node.getCoreUrl() + " to recover");
log.info("try and ask " + node.getCoreUrl() + " to recover");
try {
server = new CommonsHttpSolrServer(node.getBaseUrl());
server.setSoTimeout(5000);
server.setConnectionTimeout(5000);
RequestRecovery recoverRequestCmd = new RequestRecovery();
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(node.getCoreName());
server.request(recoverRequestCmd);
} catch (Exception e) {
log.info("Could not tell a replica to recover", e);
}
}
// System.out
// .println("try and ask " + node.getCoreUrl() + " to sync");
log.info("try and ask " + node.getCoreUrl() + " to sync");
requestSync(zkLeader.getCoreUrl(), node.getCoreName());
} catch (Exception e) {
SolrException.log(log, "Error syncing replica to leader", e);
}
}
for(;;) {
ShardResponse srsp = shardHandler.takeCompletedOrError();
if (srsp == null) break;
boolean success = handleResponse(srsp);
//System.out.println("got response:" + success);
if (!success) {
try {
log.info("Sync failed - asking replica to recover.");
//System.out.println("Sync failed - asking replica to recover.");
RequestRecovery recoverRequestCmd = new RequestRecovery();
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(((SyncShardRequest)srsp.getShardRequest()).coreName);
CommonsHttpSolrServer server = new CommonsHttpSolrServer(zkLeader.getBaseUrl());
server.request(recoverRequestCmd);
} catch (Exception e) {
log.info("Could not tell a replica to recover", e);
}
shardHandler.cancelAll();
break;
}
}
}
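Putting the two halves together: requestSync() (below) queues one non-blocking ShardRequest per replica, and the for(;;) loop above is the gather phase, pulling back whichever response finishes first via takeCompletedOrError(). A condensed sketch of the new flow in the enclosing sync-to-replicas method, using only calls that appear in this diff (logging and minor details elided):

// scatter: one ShardRequest per replica, none of them blocking the caller
for (ZkCoreNodeProps node : nodes) {
  requestSync(zkLeader.getCoreUrl(), node.getCoreName());
}

// gather: responses come back in completion order, not submission order
for (;;) {
  ShardResponse srsp = shardHandler.takeCompletedOrError();
  if (srsp == null) break;                 // all requests accounted for
  if (!handleResponse(srsp)) {
    // a replica could not sync: issue a REQUESTRECOVERY core admin command
    // for that core, then stop waiting on the remaining sync requests
    try {
      RequestRecovery cmd = new RequestRecovery();
      cmd.setAction(CoreAdminAction.REQUESTRECOVERY);
      cmd.setCoreName(((SyncShardRequest) srsp.getShardRequest()).coreName);
      new CommonsHttpSolrServer(zkLeader.getBaseUrl()).request(cmd);
    } catch (Exception e) {
      log.info("Could not tell a replica to recover", e);
    }
    shardHandler.cancelAll();
    break;
  }
}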
private boolean handleResponse(ShardResponse srsp) {
NamedList<Object> response = srsp.getSolrResponse().getResponse();
// TODO: why does this return null sometimes?
if (response == null) {
return false;
}
boolean success = (Boolean) response.get("sync");
return success;
}
private void requestSync(String replica, String coreName) {
SyncShardRequest sreq = new SyncShardRequest();
sreq.coreName = coreName;
sreq.purpose = 1;
// TODO: this sucks
if (replica.startsWith("http://"))
replica = replica.substring(7);
sreq.shards = new String[]{replica};
sreq.actualShards = sreq.shards;
sreq.params = new ModifiableSolrParams();
sreq.params.set("qt","/get");
sreq.params.set("distrib",false);
sreq.params.set("getVersions",Integer.toString(100));
sreq.params.set("sync",replica);
shardHandler.submit(sreq, replica, sreq.params);
}
public static ModifiableSolrParams params(String... params) {

View File

@ -645,7 +645,7 @@ public final class ZkController {
if (doRecovery) {
log.info("Core needs to recover:" + core.getName());
core.getUpdateHandler().getSolrCoreState().doRecovery(core);
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, coreName);
return true;
}
} else {

View File

@ -546,6 +546,7 @@ public class CoreContainer
SolrCore old = null;
synchronized (cores) {
if (isShutDown) {
core.close();
throw new IllegalStateException("This CoreContainer has been shutdown");
}
old = cores.put(name, core);
@ -580,14 +581,14 @@ public class CoreContainer
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
SolrException.log(log, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
} catch (Exception e) {
// if register fails, this is really bad - close the zkController to
// minimize any damage we can cause
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
log.error("", e);
SolrException.log(log, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
}
@ -862,15 +863,19 @@ public class CoreContainer
public void rename(String name, String toName) {
SolrCore core = getCore(name);
if (core != null) {
register(toName, core, false);
name = checkDefault(name);
synchronized(cores) {
cores.remove(name);
try {
if (core != null) {
register(toName, core, false);
name = checkDefault(name);
synchronized (cores) {
cores.remove(name);
}
}
} finally {
if (core != null) {
core.close();
}
core.close();
}
}

View File

@ -56,6 +56,8 @@ import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
@ -69,6 +71,12 @@ import java.util.concurrent.locks.ReentrantLock;
public final class SolrCore implements SolrInfoMBean {
public static final String version="1.0";
// These should *only* be used for debugging or monitoring purposes
public static final AtomicLong numOpens = new AtomicLong();
public static final AtomicLong numCloses = new AtomicLong();
public static Map<SolrCore,Exception> openHandles = Collections.synchronizedMap(new IdentityHashMap<SolrCore,Exception>());
public static Logger log = LoggerFactory.getLogger(SolrCore.class);
private String name;
@ -618,6 +626,10 @@ public final class SolrCore implements SolrInfoMBean {
// and a SolrCoreAware MBean may have properties that depend on getting a Searcher
// from the core.
resourceLoader.inform(infoRegistry);
// For debugging
// numOpens.incrementAndGet();
// openHandles.put(this, new RuntimeException("unclosed core - name:" + getName() + " refs: " + refCount.get()));
}
private Codec initCodec(SolrConfig solrConfig, final IndexSchema schema) {
@ -772,6 +784,10 @@ public final class SolrCore implements SolrInfoMBean {
}
}
}
// For debugging
// numCloses.incrementAndGet();
// openHandles.remove(this);
}
/** Current core usage count. */
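The numOpens/numCloses counters and the openHandles map are debugging hooks for exactly the reference-counting discipline this commit tightens: with the commented increment/put and decrement/remove lines enabled, every SolrCore that is opened but never closed stays behind in openHandles together with the stack trace of its creation. A sketch of how a test or shutdown hook might dump them (only meaningful if the hooks above are uncommented; this mirrors the commented-out block added to SolrTestCaseJ4 at the end of this commit):

long opened = SolrCore.numOpens.get();
long closed = SolrCore.numCloses.get();
if (opened != closed) {
  System.err.println("Leaked SolrCore references: opened=" + opened + " closed=" + closed);
  for (java.util.Map.Entry<SolrCore, Exception> entry : SolrCore.openHandles.entrySet()) {
    // the stored exception records where the unclosed core was created
    entry.getValue().printStackTrace();
  }
}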

View File

@ -601,7 +601,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
try {
core = coreContainer.getCore(cname);
if (core != null) {
core.getUpdateHandler().getSolrCoreState().doRecovery(core);
core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, cname);
} else {
SolrException.log(log, "Cound not find core to call recovery:" + cname);
}
@ -627,39 +627,41 @@ public class CoreAdminHandler extends RequestHandlerBase {
String waitForState = params.get("state");
Boolean checkLive = params.getBool("checkLive");
int pauseFor = params.getInt("pauseFor", 0);
SolrCore core = null;
try {
core = coreContainer.getCore(cname);
if (core == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "core not found:" + cname);
}
String state = null;
boolean live = false;
int retry = 0;
while (true) {
// wait until we are sure the recovering node is ready
// to accept updates
CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
.getCloudDescriptor();
CloudState cloudState = coreContainer
.getZkController()
.getCloudState();
String collection = cloudDescriptor.getCollectionName();
Slice slice = cloudState.getSlice(collection,
cloudDescriptor.getShardId());
if (slice != null) {
ZkNodeProps nodeProps = slice.getShards().get(coreNodeName);
if (nodeProps != null) {
state = nodeProps.get(ZkStateReader.STATE_PROP);
live = cloudState.liveNodesContain(nodeName);
if (nodeProps != null && state.equals(waitForState)) {
if (checkLive == null) {
break;
} else if (checkLive && live) {
break;
} else if (!checkLive && !live) {
break;
String state = null;
boolean live = false;
int retry = 0;
while (true) {
SolrCore core = null;
try {
core = coreContainer.getCore(cname);
if (core == null && retry == 30) {
throw new SolrException(ErrorCode.BAD_REQUEST, "core not found:"
+ cname);
}
if (core != null) {
// wait until we are sure the recovering node is ready
// to accept updates
CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
.getCloudDescriptor();
CloudState cloudState = coreContainer.getZkController()
.getCloudState();
String collection = cloudDescriptor.getCollectionName();
Slice slice = cloudState.getSlice(collection,
cloudDescriptor.getShardId());
if (slice != null) {
ZkNodeProps nodeProps = slice.getShards().get(coreNodeName);
if (nodeProps != null) {
state = nodeProps.get(ZkStateReader.STATE_PROP);
live = cloudState.liveNodesContain(nodeName);
if (nodeProps != null && state.equals(waitForState)) {
if (checkLive == null) {
break;
} else if (checkLive && live) {
break;
} else if (!checkLive && !live) {
break;
}
}
}
}
@ -667,42 +669,49 @@ public class CoreAdminHandler extends RequestHandlerBase {
if (retry++ == 30) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"I was asked to wait on state " + waitForState + " for " + nodeName
+ " but I still do not see the request state. I see state: " + state + " live:" + live);
"I was asked to wait on state " + waitForState + " for "
+ nodeName
+ " but I still do not see the request state. I see state: "
+ state + " live:" + live);
}
} finally {
if (core != null) {
core.close();
}
Thread.sleep(1000);
}
// small safety net for any updates that started with state that
// kept it from sending the update to be buffered -
// pause for a while to let any outstanding updates finish
//System.out.println("I saw state:" + state + " sleep for " + pauseFor + " live:" + live);
Thread.sleep(pauseFor);
// solrcloud_debug
// try {;
// LocalSolrQueryRequest r = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
// CommitUpdateCommand commitCmd = new CommitUpdateCommand(r, false);
// commitCmd.softCommit = true;
// core.getUpdateHandler().commit(commitCmd);
// RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
// SolrIndexSearcher searcher = searchHolder.get();
// try {
// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " to replicate "
// + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " gen:" + core.getDeletionPolicy().getLatestCommit().getGeneration() + " data:" + core.getDataDir());
// } finally {
// searchHolder.decref();
// }
// } catch (Exception e) {
//
// }
} finally {
if (core != null) {
core.close();
}
Thread.sleep(1000);
}
// small safety net for any updates that started with state that
// kept it from sending the update to be buffered -
// pause for a while to let any outstanding updates finish
// System.out.println("I saw state:" + state + " sleep for " + pauseFor +
// " live:" + live);
Thread.sleep(pauseFor);
// solrcloud_debug
// try {;
// LocalSolrQueryRequest r = new LocalSolrQueryRequest(core, new
// ModifiableSolrParams());
// CommitUpdateCommand commitCmd = new CommitUpdateCommand(r, false);
// commitCmd.softCommit = true;
// core.getUpdateHandler().commit(commitCmd);
// RefCounted<SolrIndexSearcher> searchHolder =
// core.getNewestSearcher(false);
// SolrIndexSearcher searcher = searchHolder.get();
// try {
// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
// + " to replicate "
// + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " gen:" +
// core.getDeletionPolicy().getLatestCommit().getGeneration() + " data:" +
// core.getDataDir());
// } finally {
// searchHolder.decref();
// }
// } catch (Exception e) {
//
// }
}
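The wait-for-state handler is reshaped along the same lines as RecoveryStrategy.run(): the core is looked up inside the loop (so a core that is still loading does not immediately fail the request), the reference is released in a finally block, and only after roughly 30 one-second retries does the handler give up. In outline (InterruptedException from Thread.sleep propagates to the caller, as in the diff):

int retry = 0;
while (true) {
  SolrCore core = null;
  try {
    core = coreContainer.getCore(cname);
    if (core == null && retry == 30) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "core not found:" + cname);
    }
    if (core != null) {
      // consult the cloud state; break out once the replica reports the
      // requested state (and, if checkLive was given, the right live-ness)
    }
    if (retry++ == 30) {
      throw new SolrException(ErrorCode.BAD_REQUEST,
          "still do not see the requested state for " + cname);
    }
  } finally {
    if (core != null) {
      core.close();       // release the reference taken by getCore()
    }
    Thread.sleep(1000);   // pause before the next poll
  }
}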
protected void handleDistribUrlAction(SolrQueryRequest req,

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
@ -64,7 +65,6 @@ public final class DefaultSolrCoreState extends SolrCoreState {
@Override
public void decref(IndexWriterCloser closer) throws IOException {
boolean cancelRecovery = false;
synchronized (this) {
refCnt--;
if (refCnt == 0) {
@ -79,11 +79,8 @@ public final class DefaultSolrCoreState extends SolrCoreState {
}
directoryFactory.close();
closed = true;
cancelRecovery = true;
}
}
// don't wait for this in the sync block
if (cancelRecovery) cancelRecovery();
}
@Override
@ -113,7 +110,7 @@ public final class DefaultSolrCoreState extends SolrCoreState {
}
@Override
public void doRecovery(SolrCore core) {
public void doRecovery(CoreContainer cc, String name) {
if (SKIP_AUTO_RECOVERY) {
log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
return;
@ -133,7 +130,7 @@ public final class DefaultSolrCoreState extends SolrCoreState {
// if true, we are recovering after startup and shouldn't have (or be receiving) additional updates (except for local tlog recovery)
boolean recoveringAfterStartup = recoveryStrat == null;
recoveryStrat = new RecoveryStrategy(core);
recoveryStrat = new RecoveryStrategy(cc, name);
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
recoveryStrat.start();
recoveryRunning = true;

View File

@ -20,6 +20,7 @@ package org.apache.solr.update;
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
@ -80,7 +81,7 @@ public abstract class SolrCoreState {
public void closeWriter(IndexWriter writer) throws IOException;
}
public abstract void doRecovery(SolrCore core);
public abstract void doRecovery(CoreContainer cc, String name);
public abstract void cancelRecovery();

View File

@ -74,6 +74,7 @@ public class LeaderElectionIntegrationTest extends SolrTestCaseJ4 {
public void setUp() throws Exception {
super.setUp();
createTempDir();
ignoreException("No UpdateLog found - cannot sync");
System.setProperty("zkClientTimeout", "3000");
zkDir = dataDir.getAbsolutePath() + File.separator
@ -268,6 +269,7 @@ public class LeaderElectionIntegrationTest extends SolrTestCaseJ4 {
@AfterClass
public static void afterClass() throws InterruptedException {
System.clearProperty("solrcloud.skip.autorecovery");
resetExceptionIgnores();
// wait just a bit for any zk client threads to outlast timeout
Thread.sleep(2000);
}
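ignoreException()/resetExceptionIgnores() are SolrTestCaseJ4's way of marking a logged error as expected so it does not fail the run; here the test registers the new "No UpdateLog found - cannot sync" message from SyncStrategy. The usual pairing, as a generic sketch rather than a copy of this test:

@Override
public void setUp() throws Exception {
  super.setUp();
  // register a pattern for an error message the test expects to see logged
  ignoreException("No UpdateLog found - cannot sync");
}

@AfterClass
public static void afterClass() throws InterruptedException {
  // clear all ignore patterns so they don't leak into other test classes
  resetExceptionIgnores();
}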

View File

@ -50,10 +50,13 @@ import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import javax.xml.xpath.XPathExpressionException;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.util.*;
import java.util.Map.Entry;
/**
* A junit4 Solr test harness that extends LuceneTestCaseJ4.
@ -148,7 +151,14 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
if (endNumOpens-numOpens != endNumCloses-numCloses) {
String msg = "ERROR: SolrIndexSearcher opens=" + (endNumOpens-numOpens) + " closes=" + (endNumCloses-numCloses);
log.error(msg);
testsFailed = true;
testsFailed = true;
// For debugging
// Set<Entry<SolrCore,Exception>> coreEntries = SolrCore.openHandles.entrySet();
// for (Entry<SolrCore,Exception> entry : coreEntries) {
// entry.getValue().printStackTrace();
// }
fail(msg);
}
}