From 89056ffd9d9fc60ddd6a21230e1a752f21f81647 Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Sat, 28 Jul 2012 00:35:46 +0000 Subject: [PATCH] add sync tests and logging git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1366572 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/solr/cloud/SyncStrategy.java | 86 ++++++----- .../solr/handler/admin/CoreAdminHandler.java | 14 +- solr/core/src/test-files/solr/solr.xml | 2 +- .../apache/solr/cloud/FullSolrCloudTest.java | 20 ++- .../org/apache/solr/cloud/SyncSliceTest.java | 142 +++++++++++++++--- 5 files changed, 199 insertions(+), 65 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java index b9b3dc4fe93..c2f60fb314a 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java @@ -17,11 +17,13 @@ package org.apache.solr.cloud; * limitations under the License. */ +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.http.client.HttpClient; +import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery; @@ -63,8 +65,9 @@ public class SyncStrategy { shardHandler = new HttpShardHandlerFactory().getShardHandler(client); } - private static class SyncShardRequest extends ShardRequest { + private static class ShardCoreRequest extends ShardRequest { String coreName; + public String baseUrl; } public boolean sync(ZkController zkController, SolrCore core, @@ -105,23 +108,19 @@ public class SyncStrategy { if (!success && !areAnyOtherReplicasActive(zkController, leaderProps, collection, shardId)) { -// System.out -// .println("wasnt a success but no on else i active! I am the leader"); - + log.info("Sync was not a success but no on else i active! I am the leader"); success = true; } if (success) { - // solrcloud_debug - // System.out.println("Sync success"); - // we are the leader - tell all of our replias to sync with us + log.info("Sync Success - now sync replicas to me"); syncToMe(zkController, collection, shardId, leaderProps); } else { + SolrException.log(log, "Sync Failed"); - // solrcloud_debug - // System.out.println("Sync failure"); + // lets see who seems ahead... } } catch (Exception e) { @@ -163,11 +162,7 @@ public class SyncStrategy { .getReplicaProps(collection, shardId, props.get(ZkStateReader.NODE_NAME_PROP), props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE); // TODO: - // should - // there - // be a - // state - // filter? + // TODO should there be a state filter? if (nodes == null) { // I have no replicas @@ -198,19 +193,17 @@ public class SyncStrategy { leaderProps.get(ZkStateReader.NODE_NAME_PROP), leaderProps.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE); if (nodes == null) { - // System.out.println("I have no replicas"); - // I have no replicas + log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + " has no replicas"); return; } - //System.out.println("tell my replicas to sync"); + ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps); for (ZkCoreNodeProps node : nodes) { try { -// System.out -// .println("try and ask " + node.getCoreUrl() + " to sync"); - log.info("try and ask " + node.getCoreUrl() + " to sync"); - requestSync(node.getCoreUrl(), zkLeader.getCoreUrl(), node.getCoreName()); - + log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": try and ask " + node.getCoreUrl() + " to sync"); + + requestSync(node.getBaseUrl(), node.getCoreUrl(), zkLeader.getCoreUrl(), node.getCoreName()); + } catch (Exception e) { SolrException.log(log, "Error syncing replica to leader", e); } @@ -221,27 +214,25 @@ public class SyncStrategy { ShardResponse srsp = shardHandler.takeCompletedOrError(); if (srsp == null) break; boolean success = handleResponse(srsp); - //System.out.println("got response:" + success); + if (srsp.getException() != null) { + SolrException.log(log, "Sync request error: " + srsp.getException()); + } + if (!success) { try { - log.info("Sync failed - asking replica to recover."); + log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Sync failed - asking replica (" + srsp.getShardAddress() + ") to recover."); - // TODO: do this in background threads - RequestRecovery recoverRequestCmd = new RequestRecovery(); - recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY); - recoverRequestCmd.setCoreName(((SyncShardRequest)srsp.getShardRequest()).coreName); - - HttpSolrServer server = new HttpSolrServer(srsp.getShardAddress()); - server.setConnectionTimeout(45000); - server.setSoTimeout(45000); - server.request(recoverRequestCmd); + requestRecovery(((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName); + } catch (Exception e) { - log.info("Could not tell a replica to recover", e); + SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", e); } - shardHandler.cancelAll(); - break; + } else { + log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": " + " sync completed with " + srsp.getShardAddress()); } } + + } private boolean handleResponse(ShardResponse srsp) { @@ -250,14 +241,19 @@ public class SyncStrategy { if (response == null) { return false; } - boolean success = (Boolean) response.get("sync"); + Boolean success = (Boolean) response.get("sync"); + + if (success == null) { + success = false; + } return success; } - private void requestSync(String replica, String leaderUrl, String coreName) { - SyncShardRequest sreq = new SyncShardRequest(); + private void requestSync(String baseUrl, String replica, String leaderUrl, String coreName) { + ShardCoreRequest sreq = new ShardCoreRequest(); sreq.coreName = coreName; + sreq.baseUrl = baseUrl; sreq.purpose = 1; // TODO: this sucks if (replica.startsWith("http://")) @@ -273,6 +269,18 @@ public class SyncStrategy { shardHandler.submit(sreq, replica, sreq.params); } + private void requestRecovery(String baseUrl, String coreName) throws SolrServerException, IOException { + // TODO: do this in background threads + RequestRecovery recoverRequestCmd = new RequestRecovery(); + recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY); + recoverRequestCmd.setCoreName(coreName); + + HttpSolrServer server = new HttpSolrServer(baseUrl); + server.setConnectionTimeout(45000); + server.setSoTimeout(45000); + server.request(recoverRequestCmd); + } + public static ModifiableSolrParams params(String... params) { ModifiableSolrParams msp = new ModifiableSolrParams(); for (int i = 0; i < params.length; i += 2) { diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index 5fc81ad8894..92f3cfaad04 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -19,13 +19,12 @@ package org.apache.solr.handler.admin; import java.io.File; import java.io.IOException; -import java.util.Map; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; -import java.util.Collections; import org.apache.commons.io.FileUtils; import org.apache.lucene.index.DirectoryReader; @@ -666,7 +665,7 @@ public class CoreAdminHandler extends RequestHandlerBase { protected void handleRequestRecoveryAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException { final SolrParams params = req.getParams(); - log.info("The leader requested that we recover"); + log.info("It has been requested that we recover"); String cname = params.get(CoreAdminParams.CORE); if (cname == null) { cname = ""; @@ -675,6 +674,15 @@ public class CoreAdminHandler extends RequestHandlerBase { try { core = coreContainer.getCore(cname); if (core != null) { + // try to publish as recovering right away + try { + coreContainer.getZkController().publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING); + } catch (KeeperException e) { + SolrException.log(log, "", e); + } catch (InterruptedException e) { + SolrException.log(log, "", e); + } + core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, cname); } else { SolrException.log(log, "Cound not find core to call recovery:" + cname); diff --git a/solr/core/src/test-files/solr/solr.xml b/solr/core/src/test-files/solr/solr.xml index ccfb224841e..3045891fe29 100644 --- a/solr/core/src/test-files/solr/solr.xml +++ b/solr/core/src/test-files/solr/solr.xml @@ -28,7 +28,7 @@ adminPath: RequestHandler path to manage cores. If 'null' (or absent), cores will not be manageable via request handler --> - + diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java index 342c310262a..23b82b173d8 100644 --- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java @@ -114,6 +114,24 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase { String url; CloudSolrServerClient client; public ZkNodeProps info; + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((url == null) ? 0 : url.hashCode()); + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + CloudJettyRunner other = (CloudJettyRunner) obj; + if (url == null) { + if (other.url != null) return false; + } else if (!url.equals(other.url)) return false; + return true; + } } static class CloudSolrServerClient { @@ -418,7 +436,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase { List jetties = shardToJetty.get(slice.getKey()); assertNotNull("Test setup problem: We found no jetties for shard: " + slice.getKey() + " just:" + shardToJetty.keySet(), jetties); - assertTrue(jetties.size() > 0); + assertEquals(slice.getValue().getShards().size(), jetties.size()); } } diff --git a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java index efc5bf91ea5..c2de9b9f53c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java @@ -31,7 +31,6 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.servlet.SolrDispatchFilter; @@ -77,7 +76,7 @@ public class SyncSliceTest extends FullSolrCloudTest { public SyncSliceTest() { super(); sliceCount = 1; - shardCount = 3; + shardCount = TEST_NIGHTLY ? 7 : 3; } @Override @@ -91,25 +90,31 @@ public class SyncSliceTest extends FullSolrCloudTest { del("*:*"); List skipServers = new ArrayList(); - - indexDoc(skipServers, id, 0, i1, 50, tlong, 50, t1, + int docId = 0; + indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, "to come to the aid of their country."); - indexDoc(skipServers, id, 1, i1, 50, tlong, 50, t1, + indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, "old haven was blue."); skipServers.add(shardToJetty.get("shard1").get(1).url + "/"); - indexDoc(skipServers, id, 2, i1, 50, tlong, 50, t1, + indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, "but the song was fancy."); skipServers.add(shardToJetty.get("shard1").get(2).url + "/"); - indexDoc(skipServers, id, 3, i1, 50, tlong, 50, t1, + indexDoc(skipServers, id,docId++, i1, 50, tlong, 50, t1, "under the moon and over the lake"); commit(); + + waitForRecoveriesToFinish(false); + // shard should be inconsistent + String shardFailMessage = checkShardConsistency("shard1", true); + assertNotNull(shardFailMessage); + ModifiableSolrParams params = new ModifiableSolrParams(); params.set("action", CollectionAction.SYNCSHARD.toString()); params.set("collection", "collection1"); @@ -131,32 +136,28 @@ public class SyncSliceTest extends FullSolrCloudTest { long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound(); assertEquals(4, cloudClientDocs); - skipServers = new ArrayList(); - - skipServers.add(shardToJetty.get("shard1").get(random().nextInt(shardCount)).url + "/"); - - // this doc won't be on one node - indexDoc(skipServers, id, 4, i1, 50, tlong, 50, t1, - "to come to the aid of their country."); // kill the leader - new leader could have all the docs or be missing one CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1"); - + + skipServers = getRandomOtherJetty(leaderJetty, null); // but not the leader + + // this doc won't be on one node + indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, + "to come to the aid of their country."); + + Set jetties = new HashSet(); jetties.addAll(shardToJetty.get("shard1")); jetties.remove(leaderJetty); + assertEquals(shardCount - 1, jetties.size()); chaosMonkey.killJetty(leaderJetty); // we are careful to make sure the downed node is no longer in the state, // because on some systems (especially freebsd w/ blackhole enabled), trying // to talk to a downed node causes grief - for (CloudJettyRunner cjetty : jetties) { - waitToSeeNotLive(((SolrDispatchFilter) cjetty.jetty.getDispatchFilter() - .getFilter()).getCores().getZkController().getZkStateReader(), - leaderJetty); - } - waitToSeeNotLive(cloudClient.getZkStateReader(), leaderJetty); + waitToSeeDownInCloudState(leaderJetty, jetties); waitForThingsToLevelOut(); @@ -164,6 +165,105 @@ public class SyncSliceTest extends FullSolrCloudTest { cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound(); assertEquals(5, cloudClientDocs); + + CloudJettyRunner deadJetty = leaderJetty; + + // let's get the latest leader + while (deadJetty == leaderJetty) { + updateMappingsFromZk(this.jettys, this.clients); + leaderJetty = shardToLeaderJetty.get("shard1"); + } + + // bring back dead node + ChaosMonkey.start(deadJetty.jetty); // he is not the leader anymore + + // give a moment to be sure it has started recovering + Thread.sleep(2000); + + waitForThingsToLevelOut(); + waitForRecoveriesToFinish(false); + + skipServers = getRandomOtherJetty(leaderJetty, null); + + // skip list should be + + //System.out.println("leader:" + leaderJetty.url); + //System.out.println("skip list:" + skipServers); + + // we are skipping the leader and one node + assertEquals(1, skipServers.size()); + + // more docs than can peer sync + for (int i = 0; i < 300; i++) { + indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, + "to come to the aid of their country."); + } + + commit(); + + waitForRecoveriesToFinish(false); + + // shard should be inconsistent + shardFailMessage = checkShardConsistency("shard1", true); + assertNotNull(shardFailMessage); + + + jetties = new HashSet(); + jetties.addAll(shardToJetty.get("shard1")); + jetties.remove(leaderJetty); + assertEquals(shardCount - 1, jetties.size()); + + + // kill the current leader + chaosMonkey.killJetty(leaderJetty); + + waitToSeeDownInCloudState(leaderJetty, jetties); + + Thread.sleep(4000); + + waitForRecoveriesToFinish(false); + + + // TODO: for now, we just check consistency - + // there will be 305 or 5 docs depending on who + // becomes the leader - eventually we want that to + // always be the 305 + //checkShardConsistency(true, true); + checkShardConsistency(false, true); + + } + + private List getRandomJetty() { + return getRandomOtherJetty(null, null); + } + + private List getRandomOtherJetty(CloudJettyRunner leader, CloudJettyRunner down) { + List skipServers = new ArrayList(); + List candidates = new ArrayList(); + candidates.addAll(shardToJetty.get("shard1")); + + if (leader != null) { + candidates.remove(leader); + } + + if (down != null) { + candidates.remove(down); + } + + CloudJettyRunner cjetty = candidates.get(random().nextInt(candidates.size())); + skipServers.add(cjetty.url + "/"); + return skipServers; + } + + private void waitToSeeDownInCloudState(CloudJettyRunner leaderJetty, + Set jetties) throws InterruptedException { + + for (CloudJettyRunner cjetty : jetties) { + waitToSeeNotLive(((SolrDispatchFilter) cjetty.jetty.getDispatchFilter() + .getFilter()).getCores().getZkController().getZkStateReader(), + leaderJetty); + } + waitToSeeNotLive(cloudClient.getZkStateReader(), leaderJetty); } private void waitForThingsToLevelOut() throws Exception {