SOLR-3663: There are a couple of bugs in the sync process when a leader goes down and a new leader is elected.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1364429 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-07-22 20:30:37 +00:00
parent 19aa69b0cd
commit 6578d7f512
17 changed files with 445 additions and 95 deletions

View File

@ -128,6 +128,9 @@ Bug Fixes
was changed to synchronize on core state since more than on update handler
can coexist for a single index during a reload. (yonik)
* SOLR-3663: There are a couple of bugs in the sync process when a leader goes down and a
new leader is elected. (Mark Miller)
Other Changes
----------------------

View File

@ -69,6 +69,7 @@ public class SyncStrategy {
public boolean sync(ZkController zkController, SolrCore core,
ZkNodeProps leaderProps) {
log.info("Sync replicas to " + ZkCoreNodeProps.getCoreUrl(leaderProps));
// TODO: look at our state usage of sync
// zkController.publish(core, ZkStateReader.SYNC);
@ -208,7 +209,7 @@ public class SyncStrategy {
// System.out
// .println("try and ask " + node.getCoreUrl() + " to sync");
log.info("try and ask " + node.getCoreUrl() + " to sync");
requestSync(zkLeader.getCoreUrl(), node.getCoreName());
requestSync(node.getCoreUrl(), zkLeader.getCoreUrl(), node.getCoreName());
} catch (Exception e) {
SolrException.log(log, "Error syncing replica to leader", e);
@ -224,12 +225,15 @@ public class SyncStrategy {
if (!success) {
try {
log.info("Sync failed - asking replica to recover.");
//System.out.println("Sync failed - asking replica to recover.");
// TODO: do this in background threads
RequestRecovery recoverRequestCmd = new RequestRecovery();
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(((SyncShardRequest)srsp.getShardRequest()).coreName);
HttpSolrServer server = new HttpSolrServer(zkLeader.getBaseUrl());
HttpSolrServer server = new HttpSolrServer(srsp.getShardAddress());
server.setConnectionTimeout(45000);
server.setSoTimeout(45000);
server.request(recoverRequestCmd);
} catch (Exception e) {
log.info("Could not tell a replica to recover", e);
@ -251,7 +255,7 @@ public class SyncStrategy {
return success;
}
private void requestSync(String replica, String coreName) {
private void requestSync(String replica, String leaderUrl, String coreName) {
SyncShardRequest sreq = new SyncShardRequest();
sreq.coreName = coreName;
sreq.purpose = 1;
@ -264,7 +268,7 @@ public class SyncStrategy {
sreq.params.set("qt","/get");
sreq.params.set("distrib",false);
sreq.params.set("getVersions",Integer.toString(100));
sreq.params.set("sync",replica);
sreq.params.set("sync",leaderUrl);
shardHandler.submit(sreq, replica, sreq.params);
}

View File

@ -17,9 +17,17 @@ package org.apache.solr.handler.admin;
* limitations under the License.
*/
import java.io.IOException;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
@ -103,6 +111,10 @@ public class CollectionsHandler extends RequestHandlerBase {
this.handleReloadAction(req, rsp);
break;
}
case SYNCSHARD: {
this.handleSyncShardAction(req, rsp);
break;
}
default: {
throw new RuntimeException("Unknown action: " + action);
@ -124,6 +136,24 @@ public class CollectionsHandler extends RequestHandlerBase {
coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
}
private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException {
log.info("Syncing shard : " + req.getParamString());
String collection = req.getParams().required().get("collection");
String shard = req.getParams().required().get("shard");
CloudState cloudState = coreContainer.getZkController().getCloudState();
ZkNodeProps leaderProps = cloudState.getLeader(collection, shard);
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
HttpSolrServer server = new HttpSolrServer(nodeProps.getBaseUrl());
RequestSyncShard reqSyncShard = new CoreAdminRequest.RequestSyncShard();
reqSyncShard.setCollection(collection);
reqSyncShard.setShard(shard);
reqSyncShard.setCoreName(nodeProps.getCoreName());
server.request(reqSyncShard);
}
private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Deleting Collection : " + req.getParamString());

View File

@ -20,7 +20,9 @@ package org.apache.solr.handler.admin;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
@ -28,6 +30,8 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.SyncStrategy;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.CloudState;
@ -47,8 +51,6 @@ import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@ -69,8 +71,6 @@ import org.slf4j.LoggerFactory;
public class CoreAdminHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(CoreAdminHandler.class);
protected final CoreContainer coreContainer;
private ShardHandlerFactory shardHandlerFactory;
private ShardHandler shardHandler;
public CoreAdminHandler() {
super();
@ -87,8 +87,6 @@ public class CoreAdminHandler extends RequestHandlerBase {
*/
public CoreAdminHandler(final CoreContainer coreContainer) {
this.coreContainer = coreContainer;
shardHandlerFactory = coreContainer.getShardHandlerFactory();
shardHandler = shardHandlerFactory.getShardHandler();
}
@ -182,6 +180,11 @@ public class CoreAdminHandler extends RequestHandlerBase {
break;
}
case REQUESTSYNCSHARD: {
this.handleRequestSyncAction(req, rsp);
break;
}
default: {
doPersist = this.handleCustomAction(req, rsp);
break;
@ -676,6 +679,48 @@ public class CoreAdminHandler extends RequestHandlerBase {
}
}
protected void handleRequestSyncAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws IOException {
final SolrParams params = req.getParams();
log.info("I have been requested to sync up my shard");
ZkController zkController = coreContainer.getZkController();
if (zkController == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Only valid for SolrCloud");
}
String cname = params.get(CoreAdminParams.CORE);
if (cname == null) {
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
}
SolrCore core = null;
try {
core = coreContainer.getCore(cname);
if (core != null) {
SyncStrategy syncStrategy = new SyncStrategy();
Map<String,String> props = new HashMap<String,String>();
props.put(ZkStateReader.BASE_URL_PROP, zkController.getBaseUrl());
props.put(ZkStateReader.CORE_NAME_PROP, cname);
props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName());
boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props));
if (!success) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Sync Failed");
}
} else {
SolrException.log(log, "Cound not find core to call sync:" + cname);
}
} finally {
// no recoveryStrat close for now
if (core != null) {
core.close();
}
}
}
protected void handleWaitForStateAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws IOException, InterruptedException {
final SolrParams params = req.getParams();

View File

@ -165,7 +165,7 @@ public class PeerSync {
String myURL = "";
if (zkController != null) {
myURL = zkController.getZkServerAddress();
myURL = zkController.getBaseUrl();
}
// TODO: core name turns up blank in many tests - find URL if cloud enabled?

View File

@ -19,9 +19,12 @@ package org.apache.solr.update.processor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
@ -209,9 +212,23 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
coreName, null, ZkStateReader.DOWN);
if (replicaProps != null) {
nodes = new ArrayList<Node>(replicaProps.size());
// check for test param that lets us miss replicas
String[] skipList = req.getParams().getParams("test.distrib.skip.servers");
Set<String> skipListSet = null;
if (skipList != null) {
skipListSet = new HashSet<String>(skipList.length);
skipListSet.addAll(Arrays.asList(skipList));
}
for (ZkCoreNodeProps props : replicaProps) {
if (skipList != null) {
if (!skipListSet.contains(props.getCoreUrl())) {
nodes.add(new StdNode(props));
}
} else {
nodes.add(new StdNode(props));
}
}
}
} else {

View File

@ -110,9 +110,15 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
waitForRecoveriesToFinish(collection, zkStateReader, verbose, true);
}
protected void waitForRecoveriesToFinish(String collection,
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
throws Exception {
waitForRecoveriesToFinish(collection, zkStateReader, verbose, failOnTimeout, 120 * (TEST_NIGHTLY ? 2 : 1) * RANDOM_MULTIPLIER);
}
protected void waitForRecoveriesToFinish(String collection,
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, int timeoutSeconds)
throws Exception {
log.info("Wait for recoveries to finish - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
boolean cont = true;
int cnt = 0;
@ -139,7 +145,7 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
}
}
}
if (!sawLiveRecovering || cnt == 520) {
if (!sawLiveRecovering || cnt == timeoutSeconds) {
if (!sawLiveRecovering) {
if (verbose) System.out.println("no one is recoverying");
} else {

View File

@ -36,6 +36,8 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.zookeeper.KeeperException;
import org.eclipse.jetty.servlet.FilterHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The monkey can stop random or specific jetties used with SolrCloud.
@ -45,6 +47,7 @@ import org.eclipse.jetty.servlet.FilterHolder;
*
*/
public class ChaosMonkey {
private static Logger log = LoggerFactory.getLogger(ChaosMonkey.class);
private static final int CONLOSS_PERCENT = 3; //30%
private static final int EXPIRE_PERCENT = 4; //40%
@ -82,9 +85,12 @@ public class ChaosMonkey {
Random random = LuceneTestCase.random();
expireSessions = random.nextBoolean();
causeConnectionLoss = random.nextBoolean();
monkeyLog("init - expire sessions:" + expireSessions
+ " cause connection loss:" + causeConnectionLoss);
}
public void expireSession(JettySolrRunner jetty) {
monkeyLog("expire session for " + jetty.getLocalPort() + " !");
SolrDispatchFilter solrDispatchFilter = (SolrDispatchFilter) jetty.getDispatchFilter().getFilter();
if (solrDispatchFilter != null) {
CoreContainer cores = solrDispatchFilter.getCores();
@ -106,8 +112,9 @@ public class ChaosMonkey {
}
public void randomConnectionLoss() throws KeeperException, InterruptedException {
String sliceName = getRandomSlice();
monkeyLog("cause connection loss!");
String sliceName = getRandomSlice();
JettySolrRunner jetty = getRandomJetty(sliceName, aggressivelyKillLeaders);
if (jetty != null) {
causeConnectionLoss(jetty);
@ -145,7 +152,7 @@ public class ChaosMonkey {
}
public static void stop(JettySolrRunner jetty) throws Exception {
monkeyLog("stop shard! " + jetty.getLocalPort());
// get a clean shutdown so that no dirs are left open...
FilterHolder fh = jetty.getDispatchFilter();
if (fh != null) {
@ -162,6 +169,7 @@ public class ChaosMonkey {
}
public static void kill(JettySolrRunner jetty) throws Exception {
monkeyLog("kill shard! " + jetty.getLocalPort());
FilterHolder fh = jetty.getDispatchFilter();
SolrDispatchFilter sdf = null;
if (fh != null) {
@ -288,6 +296,7 @@ public class ChaosMonkey {
if (numActive < 2) {
// we cannot kill anyone
monkeyLog("only one active node in shard - monkey cannot kill :(");
return null;
}
Random random = LuceneTestCase.random();
@ -306,17 +315,19 @@ public class ChaosMonkey {
boolean isLeader = leader.get(ZkStateReader.NODE_NAME_PROP).equals(jetties.get(index).nodeName);
if (!aggressivelyKillLeaders && isLeader) {
// we don't kill leaders...
monkeyLog("abort! I don't kill leaders");
return null;
}
}
if (jetty.getLocalPort() == -1) {
// we can't kill the dead
monkeyLog("abort! This guy is already dead");
return null;
}
//System.out.println("num active:" + numActive + " for " + slice + " sac:" + jetty.getLocalPort());
monkeyLog("chose a victim! " + jetty.getLocalPort());
return jetty;
}
@ -335,6 +346,7 @@ public class ChaosMonkey {
// synchronously starts and stops shards randomly, unless there is only one
// active shard up for a slice or if there is one active and others recovering
public void startTheMonkey(boolean killLeaders, final int roundPause) {
monkeyLog("starting");
this.aggressivelyKillLeaders = killLeaders;
startTime = System.currentTimeMillis();
// TODO: when kill leaders is on, lets kill a higher percentage of leaders
@ -409,14 +421,18 @@ public class ChaosMonkey {
e.printStackTrace();
}
}
System.out.println("I ran for " + (System.currentTimeMillis() - startTime)/1000.0f + "sec. I stopped " + stops + " and I started " + starts
monkeyLog("finished");
monkeyLog("I ran for " + (System.currentTimeMillis() - startTime)/1000.0f + "sec. I stopped " + stops + " and I started " + starts
+ ". I also expired " + expires.get() + " and caused " + connloss
+ " connection losses");
}
}.start();
}
public static void monkeyLog(String msg) {
log.info("monkey: " + msg);
}
public void stopTheMonkey() {
stop = true;
}

View File

@ -20,7 +20,6 @@ package org.apache.solr.cloud;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.http.client.HttpClient;
import org.apache.lucene.util.LuceneTestCase.Slow;
@ -35,10 +34,15 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slow
@Ignore("ignore while investigating jenkins fails")
public class ChaosMonkeyNothingIsSafeTest extends FullSolrCloudTest {
public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class);
private static final int BASE_RUN_LENGTH = 180000;
@BeforeClass
public static void beforeSuperClass() {
@ -53,7 +57,7 @@ public class ChaosMonkeyNothingIsSafeTest extends FullSolrCloudTest {
public void setUp() throws Exception {
super.setUp();
// TODO use @Noisy annotation as we expect lots of exceptions
ignoreException(".*");
//ignoreException(".*");
System.setProperty("numShards", Integer.toString(sliceCount));
}
@ -67,8 +71,8 @@ public class ChaosMonkeyNothingIsSafeTest extends FullSolrCloudTest {
public ChaosMonkeyNothingIsSafeTest() {
super();
sliceCount = atLeast(2);
shardCount = atLeast(sliceCount * 2);
sliceCount = 2;
shardCount = 6;
}
@Override
@ -99,8 +103,9 @@ public class ChaosMonkeyNothingIsSafeTest extends FullSolrCloudTest {
ftIndexThread.start();
chaosMonkey.startTheMonkey(true, 1500);
int runLength = atLeast(BASE_RUN_LENGTH);
try {
Thread.sleep(180000);
Thread.sleep(runLength);
} finally {
chaosMonkey.stopTheMonkey();
}
@ -124,7 +129,7 @@ public class ChaosMonkeyNothingIsSafeTest extends FullSolrCloudTest {
Thread.sleep(2000);
// wait until there are no recoveries...
waitForThingsToLevelOut();
waitForThingsToLevelOut(Math.round((runLength / 1000.0f / 5.0f)));
// make sure we again have leaders for each shard
for (int j = 1; j < sliceCount; j++) {
@ -157,35 +162,6 @@ public class ChaosMonkeyNothingIsSafeTest extends FullSolrCloudTest {
}
}
private void waitForThingsToLevelOut() throws Exception {
int cnt = 0;
boolean retry = false;
do {
waitForRecoveriesToFinish(VERBOSE);
try {
commit();
} catch (Exception e) {
// we don't care if this commit fails on some nodes
}
updateMappingsFromZk(jettys, clients);
Set<String> theShards = shardToClient.keySet();
String failMessage = null;
for (String shard : theShards) {
failMessage = checkShardConsistency(shard, false);
}
if (failMessage != null) {
retry = true;
}
cnt++;
if (cnt > 10) break;
Thread.sleep(4000);
} while (retry);
}
// skip the randoms - they can deadlock...
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = getDoc(fields);

View File

@ -19,7 +19,6 @@ package org.apache.solr.cloud;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.common.SolrInputDocument;
@ -32,6 +31,8 @@ import org.junit.Ignore;
@Ignore("SOLR-3126")
public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
private static final int BASE_RUN_LENGTH = 120000;
@BeforeClass
public static void beforeSuperClass() {
@ -66,7 +67,7 @@ public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
public ChaosMonkeySafeLeaderTest() {
super();
sliceCount = atLeast(2);
shardCount = atLeast(sliceCount);
shardCount = atLeast(sliceCount*2);
}
@Override
@ -89,8 +90,8 @@ public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
}
chaosMonkey.startTheMonkey(false, 500);
Thread.sleep(atLeast(8000));
int runLength = atLeast(BASE_RUN_LENGTH);
Thread.sleep(runLength);
chaosMonkey.stopTheMonkey();
@ -109,41 +110,13 @@ public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
// try and wait for any replications and what not to finish...
waitForThingsToLevelOut();
waitForThingsToLevelOut(Math.round((runLength / 1000.0f / 5.0f)));
checkShardConsistency(true, true);
if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
}
private void waitForThingsToLevelOut() throws Exception {
int cnt = 0;
boolean retry = false;
do {
waitForRecoveriesToFinish(false);
commit();
updateMappingsFromZk(jettys, clients);
Set<String> theShards = shardToClient.keySet();
String failMessage = null;
for (String shard : theShards) {
failMessage = checkShardConsistency(shard, false);
}
if (failMessage != null) {
retry = true;
} else {
retry = false;
}
cnt++;
if (cnt > 10) break;
Thread.sleep(2000);
} while (retry);
}
// skip the randoms - they can deadlock...
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = new SolrInputDocument();

View File

@ -55,6 +55,8 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
@ -64,6 +66,8 @@ import org.junit.BeforeClass;
*/
@Slow
public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
private static Logger log = LoggerFactory.getLogger(FullSolrCloudTest.class);
@BeforeClass
public static void beforeFullSolrCloudTest() {
// shorten the log output more for this test type
@ -103,12 +107,13 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
protected volatile ZkStateReader zkStateReader;
private Map<String,SolrServer> shardToLeaderClient = new HashMap<String,SolrServer>();
private Map<String,CloudJettyRunner> shardToLeaderJetty = new HashMap<String,CloudJettyRunner>();
protected Map<String,CloudJettyRunner> shardToLeaderJetty = new HashMap<String,CloudJettyRunner>();
class CloudJettyRunner {
JettySolrRunner jetty;
String nodeName;
String coreNodeName;
String url;
}
static class CloudSolrServerClient {
@ -403,6 +408,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
cjr.jetty = jetty;
cjr.nodeName = shard.getValue().get(ZkStateReader.NODE_NAME_PROP);
cjr.coreNodeName = shard.getKey();
cjr.url = shard.getValue().get(ZkStateReader.BASE_URL_PROP) + "/" + shard.getValue().get(ZkStateReader.CORE_NAME_PROP);
list.add(cjr);
if (isLeader) {
shardToLeaderJetty.put(slice.getKey(), cjr);
@ -653,6 +659,11 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose);
}
protected void waitForRecoveriesToFinish(boolean verbose, int timeoutSeconds)
throws Exception {
super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose, true, timeoutSeconds);
}
private void brindDownShardIndexSomeDocsAndRecover() throws Exception {
SolrQuery query = new SolrQuery("*:*");
query.set("distrib", false);
@ -660,7 +671,6 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
commit();
long deadShardCount = shardToClient.get(SHARD2).get(0).query(query).getResults().getNumFound();
System.err.println("dsc:" + deadShardCount);
query("q", "*:*", "sort", "n_tl1 desc");
@ -1320,6 +1330,36 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
};
protected void waitForThingsToLevelOut(int waitForRecTimeSeconds) throws Exception {
log.info("Wait for recoveries to finish - wait " + waitForRecTimeSeconds + " for each attempt");
int cnt = 0;
boolean retry = false;
do {
waitForRecoveriesToFinish(VERBOSE, waitForRecTimeSeconds);
try {
commit();
} catch (Exception e) {
// we don't care if this commit fails on some nodes
}
updateMappingsFromZk(jettys, clients);
Set<String> theShards = shardToClient.keySet();
String failMessage = null;
for (String shard : theShards) {
failMessage = checkShardConsistency(shard, false);
}
if (failMessage != null) {
retry = true;
}
cnt++;
if (cnt > 2) break;
Thread.sleep(4000);
} while (retry);
}
@Override
@After
public void tearDown() throws Exception {

View File

@ -0,0 +1,198 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
/**
* Test sync phase that occurs when Leader goes down and a new Leader is
* elected.
*/
@Slow
public class SyncSliceTest extends FullSolrCloudTest {
@BeforeClass
public static void beforeSuperClass() {
}
@AfterClass
public static void afterSuperClass() {
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
// we expect this time of exception as shards go up and down...
//ignoreException(".*");
System.setProperty("numShards", Integer.toString(sliceCount));
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
resetExceptionIgnores();
}
public SyncSliceTest() {
super();
sliceCount = 1;
shardCount = 3;
}
@Override
public void doTest() throws Exception {
handle.clear();
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
waitForThingsToLevelOut();
del("*:*");
List<String> skipServers = new ArrayList<String>();
indexDoc(skipServers, id, 0, i1, 50, tlong, 50, t1,
"to come to the aid of their country.");
indexDoc(skipServers, id, 1, i1, 50, tlong, 50, t1,
"old haven was blue.");
skipServers.add(shardToJetty.get("shard1").get(1).url + "/");
indexDoc(skipServers, id, 2, i1, 50, tlong, 50, t1,
"but the song was fancy.");
skipServers.add(shardToJetty.get("shard1").get(2).url + "/");
indexDoc(skipServers, id, 3, i1, 50, tlong, 50, t1,
"under the moon and over the lake");
commit();
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.SYNCSHARD.toString());
params.set("collection", "collection1");
params.set("shard", "shard1");
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
String baseUrl = ((HttpSolrServer) shardToClient.get("shard1").get(2)).getBaseURL();
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
baseServer.request(request);
waitForThingsToLevelOut();
checkShardConsistency(false, true);
skipServers = new ArrayList<String>();
skipServers.add(shardToJetty.get("shard1").get(random().nextInt(shardCount)).url + "/");
// this doc won't be on one node
indexDoc(skipServers, id, 4, i1, 50, tlong, 50, t1,
"to come to the aid of their country.");
// kill the leader - new leader could have all the docs or be missing one
chaosMonkey.killJetty(shardToLeaderJetty.get("shard1").jetty);
waitForThingsToLevelOut();
checkShardConsistency(false, true);
}
private void waitForThingsToLevelOut() throws Exception {
int cnt = 0;
boolean retry = false;
do {
waitForRecoveriesToFinish(false);
commit();
updateMappingsFromZk(jettys, clients);
Set<String> theShards = shardToClient.keySet();
String failMessage = null;
for (String shard : theShards) {
failMessage = checkShardConsistency(shard, false);
}
if (failMessage != null) {
retry = true;
} else {
retry = false;
}
cnt++;
if (cnt > 10) break;
Thread.sleep(2000);
} while (retry);
}
protected void indexDoc(List<String> skipServers, Object... fields) throws IOException,
SolrServerException {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
addFields(doc, "rnd_b", true);
controlClient.add(doc);
UpdateRequest ureq = new UpdateRequest();
ureq.add(doc);
ModifiableSolrParams params = new ModifiableSolrParams();
for (String skip : skipServers) {
params.add("test.distrib.skip.servers", skip);
}
ureq.setParams(params);
ureq.process(cloudClient);
}
// skip the randoms - they can deadlock...
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
addFields(doc, "rnd_b", true);
indexDoc(doc);
}
}

View File

@ -221,6 +221,44 @@ public class CoreAdminRequest extends SolrRequest
}
}
public static class RequestSyncShard extends CoreAdminRequest {
private String shard;
private String collection;
public RequestSyncShard() {
action = CoreAdminAction.REQUESTSYNCSHARD;
}
@Override
public SolrParams getParams() {
if( action == null ) {
throw new RuntimeException( "no action specified!" );
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, action.toString());
params.set("shard", shard);
params.set("collection", collection);
params.set(CoreAdminParams.CORE, core);
return params;
}
public String getShard() {
return shard;
}
public void setShard(String shard) {
this.shard = shard;
}
public String getCollection() {
return collection;
}
public void setCollection(String collection) {
this.collection = collection;
}
}
//a persist core request
public static class Persist extends CoreAdminRequest {
protected String fileName = null;

View File

@ -45,6 +45,10 @@ public class ZkCoreNodeProps {
return nodeProps.get(ZkStateReader.CORE_NAME_PROP);
}
public static String getCoreUrl(ZkNodeProps nodeProps) {
return getCoreUrl(nodeProps.get(ZkStateReader.BASE_URL_PROP), nodeProps.get(ZkStateReader.CORE_NAME_PROP));
}
public static String getCoreUrl(String baseUrl, String coreName) {
StringBuilder sb = new StringBuilder();
if (baseUrl == null) return null;

View File

@ -277,7 +277,6 @@ public class ZkStateReader {
private synchronized void updateCloudState(boolean immediate,
final boolean onlyLiveNodes) throws KeeperException,
InterruptedException {
log.info("Manual update of cluster state initiated");
// build immutable CloudInfo
if (immediate) {

View File

@ -28,7 +28,7 @@ public interface CollectionParams
public enum CollectionAction {
CREATE, DELETE, RELOAD;
CREATE, DELETE, RELOAD, SYNCSHARD;
public static CollectionAction get( String p )
{

View File

@ -93,7 +93,8 @@ public interface CoreAdminParams
RENAME,
MERGEINDEXES,
PREPRECOVERY,
REQUESTRECOVERY;
REQUESTRECOVERY,
REQUESTSYNCSHARD;
public static CoreAdminAction get( String p )
{