SOLR-3126: harden peer sync recovery

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1291530 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2012-02-20 23:58:49 +00:00
parent 70501dd845
commit 314bce4f6e
11 changed files with 100 additions and 49 deletions

View File

@@ -103,9 +103,6 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops, String baseUrl)
throws SolrServerException, IOException {
// start buffer updates to tran log
// and do recovery - either replay via realtime get (eventually)
// or full index replication
String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP);
ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
@@ -183,7 +180,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
prepCmd.setCoreNodeName(coreZkNodeName);
prepCmd.setState(ZkStateReader.RECOVERING);
prepCmd.setCheckLive(true);
prepCmd.setPauseFor(4000);
prepCmd.setPauseFor(6000);
server.request(prepCmd);
server.shutdown();
@@ -239,26 +236,28 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
while (!succesfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though
try {
// first thing we just try to sync
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
CloudDescriptor cloudDesc = core.getCoreDescriptor()
.getCloudDescriptor();
ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP);
String leaderCoreName = leaderprops.get(ZkStateReader.CORE_NAME_PROP);
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName);
// first thing we just try to sync
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
log.info("Attempting to PeerSync from " + leaderUrl + " recoveringAfterStartup="+recoveringAfterStartup);
firstTime = false; // only try sync the first time through the loop
log.info("Attempting to PeerSync from " + leaderUrl);
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core,
Collections.singletonList(leaderUrl), ulog.numRecordsToKeep);
peerSync.setStartingVersions(startingRecentVersions);
@@ -268,6 +267,26 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
new ModifiableSolrParams());
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
log.info("Sync Recovery was succesful - registering as Active");
// System.out
// .println("Sync Recovery was succesful - registering as Active "
// + zkController.getNodeName());
// solrcloud_debug
// try {
// RefCounted<SolrIndexSearcher> searchHolder =
// core.getNewestSearcher(false);
// SolrIndexSearcher searcher = searchHolder.get();
// try {
// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
// + " synched "
// + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
// } finally {
// searchHolder.decref();
// }
// } catch (Exception e) {
//
// }
// sync success - register as active and return
zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
coreZkNodeName, coreName);
@@ -275,10 +294,11 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
close = true;
return;
}
log.info("Sync Recovery was not successful - trying replication");
}
//System.out.println("Sync Recovery was not successful - trying replication");
log.info("Begin buffering updates");
ulog.bufferUpdates();
replayed = false;

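Taken together, the RecoveryStrategy hunks keep the loop's overall shape: publish RECOVERING, resolve the current leader, send the prep-recovery command (whose pause grows from 4000 ms to 6000 ms), attempt PeerSync only on the first pass, and fall back to buffering updates plus replication when the sync does not succeed. Below is a minimal standalone sketch of that control flow, assuming nothing beyond what the hunks show; every helper method in it is a hypothetical stand-in rather than an actual RecoveryStrategy API.

// RecoveryLoopSketch.java - illustrative only; all helpers are hypothetical stand-ins
public class RecoveryLoopSketch {
    private volatile boolean close = false;

    public void recover() throws Exception {
        boolean successfulRecovery = false;
        boolean firstTime = true;

        // don't rely on interruption alone; a close flag lets us stop without closing channels
        while (!successfulRecovery && !close && !Thread.currentThread().isInterrupted()) {
            publishRecovering();                  // tell the cluster we are recovering
            String leaderUrl = lookUpLeaderUrl(); // re-resolve the leader on every pass
            sendPrepRecoveryCmd(leaderUrl);       // leader pauses (6000 ms in the diff) before we proceed

            if (firstTime) {
                firstTime = false;                // only try PeerSync the first time through the loop
                if (attemptPeerSync(leaderUrl)) {
                    publishActive();              // sync succeeded - register as active and stop
                    successfulRecovery = true;
                    continue;
                }
            }

            bufferUpdates();                      // buffer incoming updates while we copy the index
            successfulRecovery = replicateFromLeader(leaderUrl);
        }
    }

    // hypothetical stand-ins for the Solr internals touched by the diff
    private void publishRecovering() {}
    private String lookUpLeaderUrl() { return "http://127.0.0.1:8983/solr/collection1"; }
    private void sendPrepRecoveryCmd(String leaderUrl) {}
    private boolean attemptPeerSync(String leaderUrl) { return false; }
    private void bufferUpdates() {}
    private boolean replicateFromLeader(String leaderUrl) { return true; }
    private void publishActive() {}
}
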
View File

@@ -193,7 +193,7 @@ public final class ZkController {
+ descriptor.getName();
publishAsDown(getBaseUrl(), descriptor, coreZkNodeName,
descriptor.getName());
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
waitForLeaderToSeeDownState(descriptor, coreZkNodeName, true);
}
}
@@ -958,7 +958,7 @@ public final class ZkController {
uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
}
public void preRegisterSetup(SolrCore core, CoreDescriptor cd) {
public void preRegisterSetup(SolrCore core, CoreDescriptor cd, boolean waitForNotLive) {
// before becoming available, make sure we are not live and active
// this also gets us our assigned shard id if it was not specified
publish(cd, ZkStateReader.DOWN);
@@ -989,12 +989,12 @@ public final class ZkController {
}
waitForLeaderToSeeDownState(cd, coreZkNodeName);
waitForLeaderToSeeDownState(cd, coreZkNodeName, waitForNotLive);
}
private ZkCoreNodeProps waitForLeaderToSeeDownState(
CoreDescriptor descriptor, final String shardZkNodeName) {
CoreDescriptor descriptor, final String shardZkNodeName, boolean waitForNotLive) {
CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
String shard = cloudDesc.getShardId();
@@ -1034,7 +1034,11 @@ public final class ZkController {
prepCmd.setNodeName(getNodeName());
prepCmd.setCoreNodeName(shardZkNodeName);
prepCmd.setState(ZkStateReader.DOWN);
prepCmd.setCheckLive(false);
prepCmd.setPauseFor(6000);
if (waitForNotLive){
prepCmd.setCheckLive(false);
}
try {
server.request(prepCmd);

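In ZkController, the new waitForNotLive flag threaded through waitForLeaderToSeeDownState decides whether the prep-recovery command also carries checkLive=false, i.e. whether the leader should wait for the registering node to drop out of live_nodes before acknowledging its DOWN state. A small standalone sketch of that decision follows; the buildParams helper and the map-based request are illustrative, and only the parameter names and the conditional come from the diff.

import java.util.LinkedHashMap;
import java.util.Map;

public class PrepRecoveryParamsSketch {

    static Map<String, Object> buildParams(String nodeName, String coreNodeName, boolean waitForNotLive) {
        Map<String, Object> params = new LinkedHashMap<String, Object>();
        params.put("nodeName", nodeName);
        params.put("coreNodeName", coreNodeName);
        params.put("state", "down");
        if (waitForNotLive) {
            // ask the leader to additionally wait until this node is no longer in live_nodes
            params.put("checkLive", false);
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(buildParams("127.0.0.1:8983_solr", "node1_core1", true));
        System.out.println(buildParams("127.0.0.1:8983_solr", "node1_core1", false));
    }
}
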
View File

@@ -540,7 +540,7 @@ public class CoreContainer
if (zkController != null) {
// this happens before we can receive requests
zkController.preRegisterSetup(core, core.getCoreDescriptor());
zkController.preRegisterSetup(core, core.getCoreDescriptor(), false);
}
SolrCore old = null;

View File

@@ -56,9 +56,7 @@ import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.NumberUtils;
@@ -626,7 +624,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
String nodeName = params.get("nodeName");
String coreNodeName = params.get("coreNodeName");
String waitForState = params.get("state");
boolean checkLive = params.getBool("checkLive", true);
Boolean checkLive = params.getBool("checkLive");
int pauseFor = params.getInt("pauseFor", 0);
SolrCore core = null;
@@ -636,6 +634,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
throw new SolrException(ErrorCode.BAD_REQUEST, "core not found:" + cname);
}
String state = null;
boolean live = false;
int retry = 0;
while (true) {
// wait until we are sure the recovering node is ready
@@ -649,15 +648,16 @@ public class CoreAdminHandler extends RequestHandlerBase {
ZkNodeProps nodeProps =
cloudState.getSlice(collection,
cloudDescriptor.getShardId()).getShards().get(coreNodeName);
boolean live = false;
if (nodeProps != null) {
state = nodeProps.get(ZkStateReader.STATE_PROP);
live = cloudState.liveNodesContain(nodeName);
if (nodeProps != null && state.equals(waitForState)) {
if (checkLive && live) {
if (checkLive == null) {
break;
} else {
} else if (checkLive && live) {
break;
} else if (!checkLive && !live) {
break;
}
}
@@ -675,11 +675,15 @@ public class CoreAdminHandler extends RequestHandlerBase {
// small safety net for any updates that started with state that
// kept it from sending the update to be buffered -
// pause for a while to let any outstanding updates finish
//System.out.println("I saw state:" + state + " sleep for " + pauseFor + " live:" + live);
Thread.sleep(pauseFor);
// solrcloud_debug
// try {
// try {;
// LocalSolrQueryRequest r = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
// CommitUpdateCommand commitCmd = new CommitUpdateCommand(r, false);
// commitCmd.softCommit = true;
// core.getUpdateHandler().commit(commitCmd);
// RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
// SolrIndexSearcher searcher = searchHolder.get();
// try {

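The important change in this handler is that checkLive becomes a nullable Boolean, which gives the wait loop three ways to finish once the core reports the requested state: break immediately when the caller did not ask about liveness, break when it asked for live and the node is live, or break when it asked for not-live and the node has left live_nodes. The predicate is sketched standalone below; the method name and signature are illustrative, not Solr's.

// WaitConditionSketch.java - illustrative predicate for the prep-recovery wait loop
public class WaitConditionSketch {

    static boolean satisfied(String state, String waitForState, Boolean checkLive, boolean live) {
        if (state == null || !state.equals(waitForState)) {
            return false;                             // node has not reached the requested state yet
        }
        if (checkLive == null) {
            return true;                              // caller did not ask for a liveness check
        }
        return checkLive.booleanValue() == live;      // true: must be live; false: must not be live
    }

    public static void main(String[] args) {
        System.out.println(satisfied("recovering", "recovering", Boolean.TRUE, true));   // true
        System.out.println(satisfied("down", "down", Boolean.FALSE, true));              // false - still live
        System.out.println(satisfied("down", "down", null, true));                       // true - liveness ignored
    }
}
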
View File

@@ -16,6 +16,7 @@ package org.apache.solr.handler.component;
* limitations under the License.
*/
import org.apache.commons.httpclient.HttpClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServer;
@@ -50,10 +51,12 @@ public class HttpShardHandler extends ShardHandler {
private CompletionService<ShardResponse> completionService;
private Set<Future<ShardResponse>> pending;
private Map<String,List<String>> shardToURLs;
private HttpClient httpClient;
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, HttpClient httpClient) {
this.httpClient = httpClient;
this.httpShardHandlerFactory = httpShardHandlerFactory;
completionService = new ExecutorCompletionService<ShardResponse>(httpShardHandlerFactory.commExecutor);
pending = new HashSet<Future<ShardResponse>>();
@@ -148,7 +151,7 @@ public class HttpShardHandler extends ShardHandler {
if (urls.size() <= 1) {
String url = urls.get(0);
srsp.setShardAddress(url);
SolrServer server = new CommonsHttpSolrServer(url, httpShardHandlerFactory.client);
SolrServer server = new CommonsHttpSolrServer(url, httpClient == null ? httpShardHandlerFactory.client : httpClient);
ssr.nl = server.request(req);
} else {
LBHttpSolrServer.Rsp rsp = httpShardHandlerFactory.loadbalancer.request(new LBHttpSolrServer.Req(req, urls));

View File

@@ -60,7 +60,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements Plug
LBHttpSolrServer loadbalancer;
int soTimeout = 0; //current default values
int connectionTimeout = 0; //current default values
public String scheme = "http://"; //current default values
public String scheme = "http://"; //current default values
private MultiThreadedHttpConnectionManager mgr;
// socket timeout measured in ms, closes a socket if read
@@ -79,7 +79,12 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements Plug
public ShardHandler getShardHandler(){
return new HttpShardHandler(this);
return getShardHandler(null);
}
public ShardHandler getShardHandler(HttpClient httpClient){
return new HttpShardHandler(this, httpClient);
}
public void init(PluginInfo info) {

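Together, the HttpShardHandler and HttpShardHandlerFactory hunks let a caller supply its own commons-httpclient 3.x client: the handler prefers an injected client over the factory's shared one when it builds a CommonsHttpSolrServer, and the factory gains a getShardHandler(HttpClient) overload while the no-arg method simply delegates with null. A hypothetical wiring example against that overload follows; the timeout value and variable names are illustrative.

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;

public class ShardHandlerWiringSketch {
    public static void main(String[] args) {
        HttpShardHandlerFactory factory = new HttpShardHandlerFactory();

        // a caller-owned client; PeerSync does something similar with a shared static client
        MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
        mgr.getParams().setSoTimeout(30000);
        HttpClient myClient = new HttpClient(mgr);

        ShardHandler withOwnClient = factory.getShardHandler(myClient); // new overload
        ShardHandler withSharedClient = factory.getShardHandler();      // old behavior: factory's client
    }
}
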
View File

@@ -26,18 +26,21 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.NoHttpResponseException;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
@@ -63,7 +66,7 @@ public class PeerSync {
private UpdateHandler uhandler;
private UpdateLog ulog;
private ShardHandlerFactory shardHandlerFactory;
private HttpShardHandlerFactory shardHandlerFactory;
private ShardHandler shardHandler;
private UpdateLog.RecentUpdates recentUpdates;
@@ -74,6 +77,18 @@ public class PeerSync {
private Set<Long> requestedUpdateSet;
private long ourLowThreshold; // 20th percentile
private long ourHighThreshold; // 80th percentile
private static MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
private static HttpClient client = new HttpClient(mgr);
static {
mgr.getParams().setDefaultMaxConnectionsPerHost(20);
mgr.getParams().setMaxTotalConnections(10000);
mgr.getParams().setConnectionTimeout(30000);
mgr.getParams().setSoTimeout(30000);
// prevent retries (note: this didn't work when set on mgr.. needed to be set on client)
DefaultHttpMethodRetryHandler retryhandler = new DefaultHttpMethodRetryHandler(0, false);
client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, retryhandler);
}
// comparator that sorts by absolute value, putting highest first
private static Comparator<Long> absComparator = new Comparator<Long>() {
@@ -125,10 +140,13 @@ public class PeerSync {
this.nUpdates = nUpdates;
this.maxUpdates = nUpdates;
uhandler = core.getUpdateHandler();
ulog = uhandler.getUpdateLog();
shardHandlerFactory = core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();
shardHandler = shardHandlerFactory.getShardHandler();
// TODO: shutdown
shardHandlerFactory = new HttpShardHandlerFactory();
shardHandler = shardHandlerFactory.getShardHandler(client);
}
/** optional list of updates we had before possibly receiving new updates */
@@ -518,8 +536,6 @@ public class PeerSync {
/** Requests and applies recent updates from peers */
public static void sync(SolrCore core, List<String> replicas, int nUpdates) {
UpdateHandler uhandler = core.getUpdateHandler();
ShardHandlerFactory shardHandlerFactory = core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
@@ -538,7 +554,6 @@ public class PeerSync {
ShardResponse srsp = shardHandler.takeCompletedOrError();
}
}
}

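With that overload in place, PeerSync constructs its own HttpShardHandlerFactory and passes it the statically configured, retry-free client from the block above, instead of borrowing the CoreContainer's shard handler factory. Callers are unaffected; the static convenience entry point whose signature appears in the last hunk remains the simplest way in. A hypothetical caller, where the replica URL and the update count are illustrative and the SolrCore is assumed to be already open:

import java.util.Collections;
import java.util.List;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.PeerSync;

public class PeerSyncCaller {
    static void syncFromReplica(SolrCore core) {
        List<String> replicas = Collections.singletonList("http://127.0.0.1:8983/solr/collection1");
        // request and apply the most recent updates (up to 100 here) from the listed replica
        PeerSync.sync(core, replicas, 100);
    }
}
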
View File

@@ -32,7 +32,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
@Ignore("Fixme! I am so tired of failing all the time. This is cruelty to animals! :(")
@Ignore("SOLR-3126")
public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
@BeforeClass
@@ -113,7 +113,7 @@ public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
waitForThingsToLevelOut();
checkShardConsistency(true, false);
checkShardConsistency(true, true);
if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
}

View File

@@ -1292,7 +1292,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
+ DEFAULT_COLLECTION;
CommonsHttpSolrServer s = new CommonsHttpSolrServer(url);
s.setConnectionTimeout(100); // 1/10th sec
s.setSoTimeout(30000);
s.setSoTimeout(15000);
s.setDefaultMaxConnectionsPerHost(100);
s.setMaxTotalConnections(100);
return s;

View File

@@ -151,7 +151,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
collection1Desc.setCollectionName("collection1");
CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), "");
desc1.setCloudDescriptor(collection1Desc);
zkController.preRegisterSetup(null, desc1);
zkController.preRegisterSetup(null, desc1, false);
ids[i] = zkController.register("core" + (i + 1), desc1);
}
@@ -248,7 +248,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
final CoreDescriptor desc = new CoreDescriptor(null, coreName, "");
desc.setCloudDescriptor(collection1Desc);
try {
controllers[slot % nodeCount].preRegisterSetup(null, desc);
controllers[slot % nodeCount].preRegisterSetup(null, desc, false);
ids[slot] = controllers[slot % nodeCount]
.register(coreName, desc);
} catch (Throwable e) {

View File

@@ -235,7 +235,7 @@ public class ZkTestServer {
} catch(IllegalStateException e) {
}
if (cnt == 100) {
if (cnt == 500) {
throw new RuntimeException("Could not get the port for ZooKeeper server");
}
cnt++;
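
Finally, ZkTestServer's port polling simply tolerates more attempts before giving up (500 rather than 100), presumably to give the embedded ZooKeeper server more time to bind on slow machines. A generic standalone version of that bounded polling loop is below; the IntSupplier indirection and the 50 ms sleep are illustrative additions, not part of the test server.

import java.util.function.IntSupplier;

public class PortPollSketch {
    // keeps asking for the port, tolerating IllegalStateException while the server is still binding
    static int waitForPort(IntSupplier portSupplier) throws InterruptedException {
        int cnt = 0;
        while (true) {
            try {
                int port = portSupplier.getAsInt();
                if (port > 0) {
                    return port;
                }
            } catch (IllegalStateException e) {
                // not bound yet - fall through and retry
            }
            if (cnt == 500) {
                throw new RuntimeException("Could not get the port for ZooKeeper server");
            }
            cnt++;
            Thread.sleep(50);
        }
    }
}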