add sync tests and logging

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1366572 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-07-28 00:35:46 +00:00
parent 3b1d444672
commit 89056ffd9d
5 changed files with 199 additions and 65 deletions

View File

@ -17,11 +17,13 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
@ -63,8 +65,9 @@ public class SyncStrategy {
shardHandler = new HttpShardHandlerFactory().getShardHandler(client);
}
private static class SyncShardRequest extends ShardRequest {
private static class ShardCoreRequest extends ShardRequest {
String coreName;
public String baseUrl;
}
public boolean sync(ZkController zkController, SolrCore core,
@ -105,23 +108,19 @@ public class SyncStrategy {
if (!success
&& !areAnyOtherReplicasActive(zkController, leaderProps, collection,
shardId)) {
// System.out
// .println("wasnt a success but no on else i active! I am the leader");
log.info("Sync was not a success but no on else i active! I am the leader");
success = true;
}
if (success) {
// solrcloud_debug
// System.out.println("Sync success");
// we are the leader - tell all of our replias to sync with us
log.info("Sync Success - now sync replicas to me");
syncToMe(zkController, collection, shardId, leaderProps);
} else {
SolrException.log(log, "Sync Failed");
// solrcloud_debug
// System.out.println("Sync failure");
// lets see who seems ahead...
}
} catch (Exception e) {
@ -163,11 +162,7 @@ public class SyncStrategy {
.getReplicaProps(collection, shardId,
props.get(ZkStateReader.NODE_NAME_PROP),
props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE); // TODO:
// should
// there
// be a
// state
// filter?
// TODO should there be a state filter?
if (nodes == null) {
// I have no replicas
@ -198,19 +193,17 @@ public class SyncStrategy {
leaderProps.get(ZkStateReader.NODE_NAME_PROP),
leaderProps.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE);
if (nodes == null) {
// System.out.println("I have no replicas");
// I have no replicas
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + " has no replicas");
return;
}
//System.out.println("tell my replicas to sync");
ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps);
for (ZkCoreNodeProps node : nodes) {
try {
// System.out
// .println("try and ask " + node.getCoreUrl() + " to sync");
log.info("try and ask " + node.getCoreUrl() + " to sync");
requestSync(node.getCoreUrl(), zkLeader.getCoreUrl(), node.getCoreName());
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": try and ask " + node.getCoreUrl() + " to sync");
requestSync(node.getBaseUrl(), node.getCoreUrl(), zkLeader.getCoreUrl(), node.getCoreName());
} catch (Exception e) {
SolrException.log(log, "Error syncing replica to leader", e);
}
@ -221,27 +214,25 @@ public class SyncStrategy {
ShardResponse srsp = shardHandler.takeCompletedOrError();
if (srsp == null) break;
boolean success = handleResponse(srsp);
//System.out.println("got response:" + success);
if (srsp.getException() != null) {
SolrException.log(log, "Sync request error: " + srsp.getException());
}
if (!success) {
try {
log.info("Sync failed - asking replica to recover.");
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Sync failed - asking replica (" + srsp.getShardAddress() + ") to recover.");
// TODO: do this in background threads
RequestRecovery recoverRequestCmd = new RequestRecovery();
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(((SyncShardRequest)srsp.getShardRequest()).coreName);
HttpSolrServer server = new HttpSolrServer(srsp.getShardAddress());
server.setConnectionTimeout(45000);
server.setSoTimeout(45000);
server.request(recoverRequestCmd);
requestRecovery(((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName);
} catch (Exception e) {
log.info("Could not tell a replica to recover", e);
SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", e);
}
shardHandler.cancelAll();
break;
} else {
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": " + " sync completed with " + srsp.getShardAddress());
}
}
}
private boolean handleResponse(ShardResponse srsp) {
@ -250,14 +241,19 @@ public class SyncStrategy {
if (response == null) {
return false;
}
boolean success = (Boolean) response.get("sync");
Boolean success = (Boolean) response.get("sync");
if (success == null) {
success = false;
}
return success;
}
private void requestSync(String replica, String leaderUrl, String coreName) {
SyncShardRequest sreq = new SyncShardRequest();
private void requestSync(String baseUrl, String replica, String leaderUrl, String coreName) {
ShardCoreRequest sreq = new ShardCoreRequest();
sreq.coreName = coreName;
sreq.baseUrl = baseUrl;
sreq.purpose = 1;
// TODO: this sucks
if (replica.startsWith("http://"))
@ -273,6 +269,18 @@ public class SyncStrategy {
shardHandler.submit(sreq, replica, sreq.params);
}
private void requestRecovery(String baseUrl, String coreName) throws SolrServerException, IOException {
// TODO: do this in background threads
RequestRecovery recoverRequestCmd = new RequestRecovery();
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(coreName);
HttpSolrServer server = new HttpSolrServer(baseUrl);
server.setConnectionTimeout(45000);
server.setSoTimeout(45000);
server.request(recoverRequestCmd);
}
public static ModifiableSolrParams params(String... params) {
ModifiableSolrParams msp = new ModifiableSolrParams();
for (int i = 0; i < params.length; i += 2) {

View File

@ -19,13 +19,12 @@ package org.apache.solr.handler.admin;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.index.DirectoryReader;
@ -666,7 +665,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
protected void handleRequestRecoveryAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws IOException {
final SolrParams params = req.getParams();
log.info("The leader requested that we recover");
log.info("It has been requested that we recover");
String cname = params.get(CoreAdminParams.CORE);
if (cname == null) {
cname = "";
@ -675,6 +674,15 @@ public class CoreAdminHandler extends RequestHandlerBase {
try {
core = coreContainer.getCore(cname);
if (core != null) {
// try to publish as recovering right away
try {
coreContainer.getZkController().publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
} catch (KeeperException e) {
SolrException.log(log, "", e);
} catch (InterruptedException e) {
SolrException.log(log, "", e);
}
core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, cname);
} else {
SolrException.log(log, "Cound not find core to call recovery:" + cname);

View File

@ -28,7 +28,7 @@
adminPath: RequestHandler path to manage cores.
If 'null' (or absent), cores will not be manageable via request handler
-->
<cores adminPath="/admin/cores" defaultCoreName="collection1" host="127.0.0.1" hostPort="${hostPort:8983}" hostContext="solr" zkClientTimeout="12000" numShards="${numShards:3}">
<cores adminPath="/admin/cores" defaultCoreName="collection1" host="127.0.0.1" hostPort="${hostPort:8983}" hostContext="solr" zkClientTimeout="8000" numShards="${numShards:3}">
<core name="collection1" instanceDir="collection1" shard="${shard:}" collection="${collection:collection1}" config="${solrconfig:solrconfig.xml}" schema="${schema:schema.xml}"/>
</cores>
</solr>

View File

@ -114,6 +114,24 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
String url;
CloudSolrServerClient client;
public ZkNodeProps info;
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((url == null) ? 0 : url.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
CloudJettyRunner other = (CloudJettyRunner) obj;
if (url == null) {
if (other.url != null) return false;
} else if (!url.equals(other.url)) return false;
return true;
}
}
static class CloudSolrServerClient {
@ -418,7 +436,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
List<CloudJettyRunner> jetties = shardToJetty.get(slice.getKey());
assertNotNull("Test setup problem: We found no jetties for shard: " + slice.getKey()
+ " just:" + shardToJetty.keySet(), jetties);
assertTrue(jetties.size() > 0);
assertEquals(slice.getValue().getShards().size(), jetties.size());
}
}

View File

@ -31,7 +31,6 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.servlet.SolrDispatchFilter;
@ -77,7 +76,7 @@ public class SyncSliceTest extends FullSolrCloudTest {
public SyncSliceTest() {
super();
sliceCount = 1;
shardCount = 3;
shardCount = TEST_NIGHTLY ? 7 : 3;
}
@Override
@ -91,25 +90,31 @@ public class SyncSliceTest extends FullSolrCloudTest {
del("*:*");
List<String> skipServers = new ArrayList<String>();
indexDoc(skipServers, id, 0, i1, 50, tlong, 50, t1,
int docId = 0;
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
"to come to the aid of their country.");
indexDoc(skipServers, id, 1, i1, 50, tlong, 50, t1,
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
"old haven was blue.");
skipServers.add(shardToJetty.get("shard1").get(1).url + "/");
indexDoc(skipServers, id, 2, i1, 50, tlong, 50, t1,
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
"but the song was fancy.");
skipServers.add(shardToJetty.get("shard1").get(2).url + "/");
indexDoc(skipServers, id, 3, i1, 50, tlong, 50, t1,
indexDoc(skipServers, id,docId++, i1, 50, tlong, 50, t1,
"under the moon and over the lake");
commit();
waitForRecoveriesToFinish(false);
// shard should be inconsistent
String shardFailMessage = checkShardConsistency("shard1", true);
assertNotNull(shardFailMessage);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.SYNCSHARD.toString());
params.set("collection", "collection1");
@ -131,32 +136,28 @@ public class SyncSliceTest extends FullSolrCloudTest {
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals(4, cloudClientDocs);
skipServers = new ArrayList<String>();
skipServers.add(shardToJetty.get("shard1").get(random().nextInt(shardCount)).url + "/");
// this doc won't be on one node
indexDoc(skipServers, id, 4, i1, 50, tlong, 50, t1,
"to come to the aid of their country.");
// kill the leader - new leader could have all the docs or be missing one
CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1");
skipServers = getRandomOtherJetty(leaderJetty, null); // but not the leader
// this doc won't be on one node
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
"to come to the aid of their country.");
Set<CloudJettyRunner> jetties = new HashSet<CloudJettyRunner>();
jetties.addAll(shardToJetty.get("shard1"));
jetties.remove(leaderJetty);
assertEquals(shardCount - 1, jetties.size());
chaosMonkey.killJetty(leaderJetty);
// we are careful to make sure the downed node is no longer in the state,
// because on some systems (especially freebsd w/ blackhole enabled), trying
// to talk to a downed node causes grief
for (CloudJettyRunner cjetty : jetties) {
waitToSeeNotLive(((SolrDispatchFilter) cjetty.jetty.getDispatchFilter()
.getFilter()).getCores().getZkController().getZkStateReader(),
leaderJetty);
}
waitToSeeNotLive(cloudClient.getZkStateReader(), leaderJetty);
waitToSeeDownInCloudState(leaderJetty, jetties);
waitForThingsToLevelOut();
@ -164,6 +165,105 @@ public class SyncSliceTest extends FullSolrCloudTest {
cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals(5, cloudClientDocs);
CloudJettyRunner deadJetty = leaderJetty;
// let's get the latest leader
while (deadJetty == leaderJetty) {
updateMappingsFromZk(this.jettys, this.clients);
leaderJetty = shardToLeaderJetty.get("shard1");
}
// bring back dead node
ChaosMonkey.start(deadJetty.jetty); // he is not the leader anymore
// give a moment to be sure it has started recovering
Thread.sleep(2000);
waitForThingsToLevelOut();
waitForRecoveriesToFinish(false);
skipServers = getRandomOtherJetty(leaderJetty, null);
// skip list should be
//System.out.println("leader:" + leaderJetty.url);
//System.out.println("skip list:" + skipServers);
// we are skipping the leader and one node
assertEquals(1, skipServers.size());
// more docs than can peer sync
for (int i = 0; i < 300; i++) {
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
"to come to the aid of their country.");
}
commit();
waitForRecoveriesToFinish(false);
// shard should be inconsistent
shardFailMessage = checkShardConsistency("shard1", true);
assertNotNull(shardFailMessage);
jetties = new HashSet<CloudJettyRunner>();
jetties.addAll(shardToJetty.get("shard1"));
jetties.remove(leaderJetty);
assertEquals(shardCount - 1, jetties.size());
// kill the current leader
chaosMonkey.killJetty(leaderJetty);
waitToSeeDownInCloudState(leaderJetty, jetties);
Thread.sleep(4000);
waitForRecoveriesToFinish(false);
// TODO: for now, we just check consistency -
// there will be 305 or 5 docs depending on who
// becomes the leader - eventually we want that to
// always be the 305
//checkShardConsistency(true, true);
checkShardConsistency(false, true);
}
private List<String> getRandomJetty() {
return getRandomOtherJetty(null, null);
}
private List<String> getRandomOtherJetty(CloudJettyRunner leader, CloudJettyRunner down) {
List<String> skipServers = new ArrayList<String>();
List<CloudJettyRunner> candidates = new ArrayList<CloudJettyRunner>();
candidates.addAll(shardToJetty.get("shard1"));
if (leader != null) {
candidates.remove(leader);
}
if (down != null) {
candidates.remove(down);
}
CloudJettyRunner cjetty = candidates.get(random().nextInt(candidates.size()));
skipServers.add(cjetty.url + "/");
return skipServers;
}
private void waitToSeeDownInCloudState(CloudJettyRunner leaderJetty,
Set<CloudJettyRunner> jetties) throws InterruptedException {
for (CloudJettyRunner cjetty : jetties) {
waitToSeeNotLive(((SolrDispatchFilter) cjetty.jetty.getDispatchFilter()
.getFilter()).getCores().getZkController().getZkStateReader(),
leaderJetty);
}
waitToSeeNotLive(cloudClient.getZkStateReader(), leaderJetty);
}
private void waitForThingsToLevelOut() throws Exception {