SOLR-4997: The splitshard api doesn't call commit on new sub shards before switching shard states. Multiple bugs related to sub shard recovery and replication are also fixed.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1502458 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2013-07-12 07:32:13 +00:00
parent d24f6c2f5f
commit 506a7924e1
6 changed files with 181 additions and 41 deletions

View File

@ -303,6 +303,10 @@ Bug Fixes
using new-style solr.xml, and was not being persisted properly when using
old-style. (Tomás Fernández Löbbe, Ryan Ernst, Alan Woodward)
* SOLR-4997: The splitshard api doesn't call commit on new sub shards before
switching shard states. Multiple bugs related to sub shard recovery and
replication are also fixed. (shalin)
Optimizations
----------------------

View File

@ -17,6 +17,7 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -25,7 +26,12 @@ import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@ -37,12 +43,14 @@ import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PlainIdRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
@ -541,6 +549,18 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
collectShardResponses(results, true,
"SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
// HttpShardHandler is hard coded to send a QueryRequest hence we go direct
// and we force open a searcher so that we have documents to show upon switching states
UpdateResponse updateResponse = null;
try {
updateResponse = commit(coreUrl, true);
processResponse(results, null, coreUrl, updateResponse, slice);
} catch (Exception e) {
processResponse(results, e, coreUrl, updateResponse, slice);
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib commit on: " + coreUrl, e);
}
log.info("Successfully created all replica shards for all sub-slices "
+ subSlices);
@ -564,6 +584,24 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
}
}
static UpdateResponse commit(String url, boolean openSearcher) throws SolrServerException, IOException {
HttpSolrServer server = null;
try {
server = new HttpSolrServer(url);
server.setConnectionTimeout(30000);
server.setSoTimeout(60000);
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
ureq.getParams().set(UpdateParams.OPEN_SEARCHER, openSearcher);
ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true);
return ureq.process(server);
} finally {
if (server != null) {
server.shutdown();
}
}
}
private String waitForCoreNodeName(DocCollection collection, String msgBaseUrl, String msgCore) {
int retryCount = 320;
@ -693,7 +731,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
shardHandler.submit(sreq, replica, sreq.params);
}
private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
String collectionName = message.getStr("name");
if (clusterState.getCollections().contains(collectionName)) {
@ -878,29 +916,37 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
private void processResponse(NamedList results, ShardResponse srsp) {
Throwable e = srsp.getException();
String nodeName = srsp.getNodeName();
SolrResponse solrResponse = srsp.getSolrResponse();
String shard = srsp.getShard();
processResponse(results, e, nodeName, solrResponse, shard);
}
private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard) {
if (e != null) {
log.error("Error from shard: " + srsp.getShard(), e);
log.error("Error from shard: " + shard, e);
SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
if (failure == null) {
failure = new SimpleOrderedMap();
results.add("failure", failure);
}
failure.add(srsp.getNodeName(), e.getClass().getName() + ":" + e.getMessage());
failure.add(nodeName, e.getClass().getName() + ":" + e.getMessage());
} else {
SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
if (success == null) {
success = new SimpleOrderedMap();
results.add("success", success);
}
success.add(srsp.getNodeName(), srsp.getSolrResponse().getResponse());
success.add(nodeName, solrResponse.getResponse());
}
}
private Integer msgStrToInt(ZkNodeProps message, String key, Integer def)
throws Exception {
String str = message.getStr(key);

View File

@ -794,9 +794,7 @@ public final class ZkController {
if (!core.isReloaded() && ulog != null) {
// disable recovery in case shard is in construction state (for shard splits)
Slice slice = getClusterState().getSlice(collection, shardId);
if (Slice.CONSTRUCTION.equals(slice.getState())) {
publish(desc, ZkStateReader.ACTIVE);
} else {
if (!Slice.CONSTRUCTION.equals(slice.getState()) && !isLeader) {
Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
.getUpdateLog().recoverFromLog();
if (recoveryFuture != null) {
@ -807,11 +805,11 @@ public final class ZkController {
} else {
log.info("No LogReplay needed for core=" + core.getName() + " baseURL=" + baseUrl);
}
boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
collection, coreZkNodeName, shardId, leaderProps, core, cc);
if (!didRecovery) {
publish(desc, ZkStateReader.ACTIVE);
}
}
boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
collection, coreZkNodeName, shardId, leaderProps, core, cc);
if (!didRecovery) {
publish(desc, ZkStateReader.ACTIVE);
}
}
} finally {

View File

@ -204,7 +204,7 @@ public class SolrCmdDistributor {
try {
sreq.ursp = server.request(ureq);
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + sreq.node, sreq.exception);
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + sreq.node + " update: " + ureq , e);
}
}

View File

@ -139,7 +139,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// method in this update processor
private boolean isLeader = true;
private boolean forwardToLeader = false;
private boolean forwardToSubShard = false;
private boolean isSubShardLeader = false;
private List<Node> nodes;
private int numNodes;
@ -226,16 +226,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
req.getCore().getCoreDescriptor().getCloudDescriptor()
.getCoreNodeName());
if (!isLeader) {
isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
if (isSubShardLeader) {
String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
slice = coll.getSlice(myShardId);
shardId = myShardId;
leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader().getReplicaProps(collection, shardId, leaderReplica.getName(), coreName, null, ZkStateReader.DOWN);
}
}
DistribPhase phase =
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
doDefensiveChecks(phase);
if (DistribPhase.FROMLEADER == phase) {
if (DistribPhase.FROMLEADER == phase && !isSubShardLeader) {
// we are coming from the leader, just go local - add no urls
forwardToLeader = false;
} else if (isLeader) {
} else if (isLeader || isSubShardLeader) {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
@ -286,6 +297,31 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return nodes;
}
private boolean amISubShardLeader(DocCollection coll, Slice parentSlice, String id, SolrInputDocument doc) throws InterruptedException {
// Am I the leader of a shard in "construction" state?
String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
Slice mySlice = coll.getSlice(myShardId);
if (Slice.CONSTRUCTION.equals(mySlice.getState())) {
Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
boolean amILeader = myLeader.getName().equals(
req.getCore().getCoreDescriptor().getCloudDescriptor()
.getCoreNodeName());
if (amILeader) {
// Does the document belong to my hash range as well?
DocRouter.Range myRange = mySlice.getRange();
if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
if (parentSlice != null) {
boolean isSubset = parentSlice.getRange() != null && myRange.isSubsetOf(parentSlice.getRange());
return isSubset && coll.getRouter().isTargetSlice(id, doc, req.getParams(), myShardId, coll);
} else {
// delete by query case -- as long as I am a sub shard leader we're fine
return true;
}
}
}
return false;
}
private List<Node> getSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc) {
Collection<Slice> allSlices = coll.getSlices();
List<Node> nodes = null;
@ -303,7 +339,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (nodes == null) nodes = new ArrayList<Node>();
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
nodes.add(new StdNode(nodeProps));
forwardToSubShard = true;
}
}
}
@ -341,7 +376,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
if (isLeader && !localIsLeader) {
if ((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) {
log.error("ClusterState says we are the leader, but locally we don't think so");
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"ClusterState says we are the leader (" + zkController.getBaseUrl()
@ -405,7 +440,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return;
}
if (zkEnabled && isLeader) {
if (zkEnabled && isLeader && !isSubShardLeader) {
DocCollection coll = zkController.getClusterState().getCollection(collection);
List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument());
// the list<node> will actually have only one element for an add request
@ -426,10 +461,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM,
(isLeader ?
(isLeader || isSubShardLeader ?
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
if (isLeader) {
if (isLeader || isSubShardLeader) {
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
}
@ -793,7 +828,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return;
}
if (zkEnabled && isLeader) {
if (zkEnabled && isLeader && !isSubShardLeader) {
DocCollection coll = zkController.getClusterState().getCollection(collection);
List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
// the list<node> will actually have only one element for an add request
@ -813,10 +848,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM,
(isLeader ?
(isLeader || isSubShardLeader ?
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
if (isLeader) {
if (isLeader || isSubShardLeader) {
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
}
@ -988,20 +1023,51 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
vinfo.unblockUpdates();
}
// forward to all replicas
if (leaderLogic && zkEnabled) {
List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
if (zkEnabled) {
// forward to all replicas
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set("update.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
if (subShardLeaders != null) {
cmdDistrib.syncDelete(cmd, subShardLeaders, params);
boolean someReplicas = false;
boolean subShardLeader = false;
try {
subShardLeader = amISubShardLeader(coll, null, null, null);
if (subShardLeader) {
String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, myShardId);
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
.getReplicaProps(collection, myShardId, leaderReplica.getName(),
req.getCore().getName(), null, ZkStateReader.DOWN);
if (replicaProps != null) {
List<Node> myReplicas = new ArrayList<Node>();
for (ZkCoreNodeProps replicaProp : replicaProps) {
myReplicas.add(new StdNode(replicaProp));
}
cmdDistrib.distribDelete(cmd, myReplicas, params);
someReplicas = true;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
}
if (replicas != null) {
cmdDistrib.distribDelete(cmd, replicas, params);
if (leaderLogic) {
List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
if (subShardLeaders != null) {
cmdDistrib.syncDelete(cmd, subShardLeaders, params);
}
if (replicas != null) {
cmdDistrib.distribDelete(cmd, replicas, params);
someReplicas = true;
}
}
if (someReplicas) {
cmdDistrib.finish();
}
}

View File

@ -39,6 +39,7 @@ import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -46,7 +47,6 @@ import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
@ -170,7 +170,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
}
commit();
checkDocCountsAndShardStates(docCounts, numReplicas);
// todo can't call waitForThingsToLevelOut because it looks for jettys of all shards
@ -179,7 +178,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
//waitForThingsToLevelOut(15);
}
protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws SolrServerException, KeeperException, InterruptedException {
protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws Exception {
ClusterState clusterState = null;
Slice slice1_0 = null, slice1_1 = null;
int i = 0;
@ -203,6 +202,12 @@ public class ShardSplitTest extends BasicDistributedZkTest {
assertEquals("Wrong number of replicas created for shard1_0", numReplicas, slice1_0.getReplicas().size());
assertEquals("Wrong number of replicas created for shard1_1", numReplicas, slice1_1.getReplicas().size());
// can't use checkShardConsistency because it insists on jettys and clients for each shard
checkSubShardConsistency(SHARD1_0);
checkSubShardConsistency(SHARD1_1);
commit();
SolrQuery query = new SolrQuery("*:*").setRows(1000).setFields("id", "_version_");
query.set("distrib", false);
@ -219,7 +224,28 @@ public class ShardSplitTest extends BasicDistributedZkTest {
logDebugHelp(docCounts, response, shard10Count, response2, shard11Count);
assertEquals("Wrong doc count on shard1_0", docCounts[0], shard10Count);
assertEquals("Wrong doc count on shard1_1", docCounts[1], shard11Count);
//assertEquals("Wrong doc count on shard1_1", docCounts[1], shard11Count);
}
protected void checkSubShardConsistency(String shard) throws SolrServerException {
SolrQuery query = new SolrQuery("*:*").setRows(1000).setFields("id", "_version_");
query.set("distrib", false);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
Slice slice = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, shard);
long[] numFound = new long[slice.getReplicasMap().size()];
int c = 0;
for (Replica replica : slice.getReplicas()) {
String coreUrl = new ZkCoreNodeProps(replica).getCoreUrl();
HttpSolrServer server = new HttpSolrServer(coreUrl);
QueryResponse response = server.query(query);
numFound[c++] = response.getResults().getNumFound();
log.info("Shard: " + shard + " Replica: {} has {} docs", coreUrl, String.valueOf(response.getResults().getNumFound()));
assertTrue("Shard: " + shard + " Replica: " + coreUrl + " has 0 docs", response.getResults().getNumFound() > 0);
}
for (int i = 0; i < slice.getReplicasMap().size(); i++) {
assertEquals(shard + " is not consistent", numFound[0], numFound[i]);
}
}
protected void splitShard(String shardId) throws SolrServerException, IOException {