SOLR-5324: Make sub shard replica recovery and shard state switch asynchronous

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1530994 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2013-10-10 14:26:29 +00:00
parent 1c4c891e8e
commit 2ab0051551
12 changed files with 157 additions and 42 deletions

View File

@ -92,6 +92,9 @@ New Features
* SOLR-5226: Add Lucene index heap usage to the Solr admin UI.
(Areek Zillur via Robert Muir)
* SOLR-5324: Make sub shard replica recovery and shard state switch asynchronous.
(Yago Riveiro, shalin)
Bug Fixes
----------------------
* SOLR-4590: Collections API should return a nice error when not in SolrCloud mode.

View File

@ -38,6 +38,7 @@ public class CloudDescriptor {
* Use the values from {@link Slice} instead */
volatile String shardRange = null;
volatile String shardState = Slice.ACTIVE;
volatile String shardParent = null;
volatile boolean isLeader = false;
volatile String lastPublished = ZkStateReader.ACTIVE;
@ -45,6 +46,7 @@ public class CloudDescriptor {
public static final String SHARD_STATE = "shardState";
public static final String NUM_SHARDS = "numShards";
public static final String SHARD_RANGE = "shardRange";
public static final String SHARD_PARENT = "shardParent";
public CloudDescriptor(String coreName, Properties props) {
this.shardId = props.getProperty(CoreDescriptor.CORE_SHARD, null);
@ -55,6 +57,7 @@ public class CloudDescriptor {
this.shardState = props.getProperty(CloudDescriptor.SHARD_STATE, Slice.ACTIVE);
this.numShards = PropertiesUtil.toInteger(props.getProperty(CloudDescriptor.NUM_SHARDS), null);
this.shardRange = props.getProperty(CloudDescriptor.SHARD_RANGE, null);
this.shardParent = props.getProperty(CloudDescriptor.SHARD_PARENT, null);
}
public String getLastPublished() {
@ -134,4 +137,12 @@ public class CloudDescriptor {
/**
 * Sets the shard state recorded in this descriptor.
 *
 * @param shardState the new shard state; may be {@code null}
 */
public void setShardState(String shardState) {
this.shardState = shardState;
}
/**
 * Returns the parent shard recorded in this descriptor.
 *
 * @return the value of the {@code shardParent} property, or {@code null} when none has been set
 */
public String getShardParent() {
  return this.shardParent;
}
/**
 * Sets the parent shard recorded in this descriptor.
 *
 * @param shardParent the parent shard name; may be {@code null}
 */
public void setShardParent(String shardParent) {
this.shardParent = shardParent;
}
}

View File

@ -234,7 +234,7 @@ public class Overseer {
private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
log.info("Update shard state invoked for collection: " + collection);
log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
for (String key : message.keySet()) {
if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
if (QUEUE_OPERATION.equals(key)) continue;
@ -245,6 +245,9 @@ public class Overseer {
}
log.info("Update shard state " + key + " to " + message.getStr(key));
Map<String, Object> props = slice.shallowCopy();
if (Slice.RECOVERY.equals(props.get(Slice.STATE)) && Slice.ACTIVE.equals(message.getStr(key))) {
props.remove(Slice.PARENT);
}
props.put(Slice.STATE, message.getStr(key));
Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
clusterState = updateSlice(clusterState, collection, newSlice);
@ -344,14 +347,6 @@ public class Overseer {
replicaProps.putAll(message.getProperties());
// System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message));
if (slice != null) {
String sliceState = slice.getState();
// throw an exception if the slice is not yet active.
//if(!sliceState.equals(Slice.ACTIVE)) {
// throw new SolrException(ErrorCode.BAD_REQUEST, "Can not assign core to a non-active slice [" + slice.getName() + "]");
//}
Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
if (oldReplica != null && oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@ -380,6 +375,7 @@ public class Overseer {
// remove shard specific properties
String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP);
String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP);
String shardParent = (String) replicaProps.remove(ZkStateReader.SHARD_PARENT_PROP);
Replica replica = new Replica(coreNodeName, replicaProps);
@ -390,6 +386,9 @@ public class Overseer {
Map<String,Replica> replicas;
if (slice != null) {
state = checkAndCompleteShardSplit(state, collection, coreNodeName, sliceName, replicaProps);
// get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
slice = state.getSlice(collection, sliceName);
sliceProps = slice.getProperties();
replicas = slice.getReplicasCopy();
} else {
@ -397,6 +396,7 @@ public class Overseer {
sliceProps = new HashMap<String, Object>();
sliceProps.put(Slice.RANGE, shardRange);
sliceProps.put(Slice.STATE, shardState);
sliceProps.put(Slice.PARENT, shardParent);
}
replicas.put(replica.getName(), replica);
@ -406,6 +406,71 @@ public class Overseer {
return newClusterState;
}
/**
 * Switches shard states once every replica of every sub-shard sharing this slice's parent is active.
 * <p>
 * The switch is attempted only when the slice named {@code sliceName} is in the {@link Slice#RECOVERY}
 * state, the replica being updated reports itself active in {@code replicaProps}, all other replicas of
 * the slice are active, and every other recovery-state slice with the same parent has all of its
 * replicas active. In that case the parent slice is marked {@link Slice#INACTIVE} and this slice plus
 * its fellow sub-shards are marked {@link Slice#ACTIVE} through {@code updateShardState}.
 *
 * @param state the cluster state to inspect
 * @param collection the collection that owns the slice
 * @param coreNodeName the name of the replica whose update triggered this check
 * @param sliceName the name of the slice the replica belongs to; the slice must exist in {@code state}
 * @param replicaProps the incoming properties of the replica being updated
 * @return the cluster state after the switch, or {@code state} itself when the preconditions are not met
 */
private ClusterState checkAndCompleteShardSplit(ClusterState state, String collection, String coreNodeName, String sliceName, Map<String,Object> replicaProps) {
  Slice slice = state.getSlice(collection, sliceName);
  if (!Slice.RECOVERY.equals(slice.getState())) {
    return state;
  }
  log.info("Shard: {} is in recovery state", sliceName);

  // The state of the replica being updated is taken from the incoming properties,
  // which is why that replica is skipped when the slice's replicas are scanned below.
  if (!ZkStateReader.ACTIVE.equals(replicaProps.get(ZkStateReader.STATE_PROP))) {
    return state;
  }
  log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName);

  if (!allSubShardReplicasActive(slice, coreNodeName)) {
    return state;
  }
  log.info("Shard: {} - all replicas are active. Finding status of fellow sub-shards", sliceName);

  // Read the parent without touching the slice's property map: the slice belongs to the
  // current cluster state, and updateShardState drops the parent on its own copy when a
  // slice moves from recovery to active.
  String parentSliceName = slice.getParent();
  if (parentSliceName == null) {
    // Without a parent there is nothing to deactivate, and a null key must not be put in the message.
    log.warn("Shard: {} is in recovery state but has no parent shard recorded; not switching shard states", sliceName);
    return state;
  }

  // find out about other sub shards
  List<Slice> subShardSlices = new ArrayList<Slice>();
  for (Entry<String, Slice> entry : state.getSlicesMap(collection).entrySet()) {
    if (sliceName.equals(entry.getKey())) {
      continue;
    }
    Slice otherSlice = entry.getValue();
    if (!Slice.RECOVERY.equals(otherSlice.getState()) || !parentSliceName.equals(otherSlice.getParent())) {
      continue;
    }
    log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName());
    // this is a fellow sub shard so check if all replicas are active
    if (!allSubShardReplicasActive(otherSlice, null)) {
      return state;
    }
    log.info("Shard: {} - Fellow sub-shard: {} has all replicas active", sliceName, otherSlice.getName());
    subShardSlices.add(otherSlice);
  }

  // hurray, all sub shard replicas are active
  log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE. Preparing to switch shard states.", sliceName);
  Map<String, Object> propMap = new HashMap<String, Object>();
  propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
  propMap.put(parentSliceName, Slice.INACTIVE);
  propMap.put(sliceName, Slice.ACTIVE);
  for (Slice subShardSlice : subShardSlices) {
    propMap.put(subShardSlice.getName(), Slice.ACTIVE);
  }
  propMap.put(ZkStateReader.COLLECTION_PROP, collection);
  return updateShardState(state, new ZkNodeProps(propMap));
}

/**
 * Reports whether every replica of {@code slice} is active, ignoring the replica named
 * {@code skipCoreNodeName} (pass {@code null} to check all replicas).
 */
private boolean allSubShardReplicasActive(Slice slice, String skipCoreNodeName) {
  for (Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
    if (skipCoreNodeName != null && skipCoreNodeName.equals(entry.getKey())) {
      continue;
    }
    if (!ZkStateReader.ACTIVE.equals(entry.getValue().getStr(ZkStateReader.STATE_PROP))) {
      return false;
    }
  }
  return true;
}
private ClusterState createCollection(ClusterState state, String collectionName, List<String> shards , ZkNodeProps message) {
log.info("Create collection {} with shards {}", collectionName, shards);

View File

@ -524,18 +524,19 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
if (oSlice != null) {
if (Slice.ACTIVE.equals(oSlice.getState())) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
} else if (Slice.CONSTRUCTION.equals(oSlice.getState())) {
for (Replica replica : oSlice.getReplicas()) {
if (clusterState.liveNodesContain(replica.getNodeName())) {
String core = replica.getStr("core");
log.info("Unloading core: " + core + " from node: " + replica.getNodeName());
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
params.set(CoreAdminParams.CORE, core);
params.set(CoreAdminParams.DELETE_INDEX, "true");
sendShardRequest(replica.getNodeName(), params);
} else {
log.warn("Replica {} exists in shard {} but is not live and cannot be unloaded", replica, oSlice);
} else if (Slice.CONSTRUCTION.equals(oSlice.getState()) || Slice.RECOVERY.equals(oSlice.getState())) {
// delete the shards
for (String sub : subSlices) {
log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
Map<String, Object> propMap = new HashMap<String, Object>();
propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, sub);
ZkNodeProps m = new ZkNodeProps(propMap);
try {
deleteShard(clusterState, m, new NamedList());
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub, e);
}
}
}
@ -564,6 +565,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.SHARD, subSlice);
params.set(CoreAdminParams.SHARD_RANGE, subRange.toString());
params.set(CoreAdminParams.SHARD_STATE, Slice.CONSTRUCTION);
params.set(CoreAdminParams.SHARD_PARENT, parentSlice.getName());
sendShardRequest(nodeName, params);
}
@ -689,7 +691,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
cmd.setCoreName(subShardNames.get(i-1));
cmd.setNodeName(subShardNodeName);
cmd.setCoreNodeName(coreNodeName);
cmd.setState(ZkStateReader.ACTIVE);
cmd.setState(ZkStateReader.RECOVERING);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
@ -699,6 +701,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
collectShardResponses(results, true,
"SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
log.info("Calling soft commit to make sub shard updates visible");
String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
// HttpShardHandler is hard coded to send a QueryRequest hence we go direct
@ -712,10 +716,9 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib softCommit on: " + coreUrl, e);
}
log.info("Successfully created all replica shards for all sub-slices "
+ subSlices);
log.info("Requesting update shard state");
if (repFactor == 1) {
// switch sub shard states to 'active'
log.info("Replication factor is 1 so switching shard states");
DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<String, Object>();
propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
@ -726,6 +729,18 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
inQueue.offer(ZkStateReader.toJSON(m));
} else {
log.info("Requesting shard state be set to 'recovery'");
DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<String, Object>();
propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
for (String subSlice : subSlices) {
propMap.put(subSlice, Slice.RECOVERY);
}
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
inQueue.offer(ZkStateReader.toJSON(m));
}
return true;
} catch (SolrException e) {
@ -819,7 +834,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
}
// For now, only allow for deletions of Inactive or Recovery slices, or custom hashes (range==null).
// TODO: Add check for range gaps on Slice deletion
if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE))) {
if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE) || slice.getState().equals(Slice.RECOVERY))) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"The slice: " + slice.getName() + " is currently "
+ slice.getState() + ". Only INACTIVE (or custom-hashed) slices can be deleted.");

View File

@ -1036,6 +1036,7 @@ public final class ZkController {
ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(),
ZkStateReader.SHARD_RANGE_PROP, cd.getCloudDescriptor().getShardRange(),
ZkStateReader.SHARD_STATE_PROP, cd.getCloudDescriptor().getShardState(),
ZkStateReader.SHARD_PARENT_PROP, cd.getCloudDescriptor().getShardParent(),
ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor()
.getCollectionName(),
ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString()

View File

@ -856,7 +856,7 @@ public final class SolrCore implements SolrInfoMBean {
cd.getCloudDescriptor().setShardState(null);
cd.getCloudDescriptor().setShardRange(null);
cd.getCloudDescriptor().setShardParent(null);
}
// For debugging
// numOpens.incrementAndGet();

View File

@ -423,6 +423,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
.put(CoreAdminParams.CORE_NODE_NAME, CoreDescriptor.CORE_NODE_NAME)
.put(CoreAdminParams.SHARD_STATE, CloudDescriptor.SHARD_STATE)
.put(CoreAdminParams.SHARD_RANGE, CloudDescriptor.SHARD_RANGE)
.put(CoreAdminParams.SHARD_PARENT, CloudDescriptor.SHARD_PARENT)
.put(ZkStateReader.NUM_SHARDS_PROP, CloudDescriptor.NUM_SHARDS)
.build();

View File

@ -301,7 +301,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// Am I the leader of a shard in "construction" or "recovery" state?
String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
Slice mySlice = coll.getSlice(myShardId);
if (Slice.CONSTRUCTION.equals(mySlice.getState())) {
if (Slice.CONSTRUCTION.equals(mySlice.getState()) || Slice.RECOVERY.equals(mySlice.getState())) {
Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
boolean amILeader = myLeader.getName().equals(
req.getCore().getCoreDescriptor().getCloudDescriptor()
@ -326,7 +326,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
Collection<Slice> allSlices = coll.getSlices();
List<Node> nodes = null;
for (Slice aslice : allSlices) {
if (Slice.CONSTRUCTION.equals(aslice.getState())) {
if (Slice.CONSTRUCTION.equals(aslice.getState()) || Slice.RECOVERY.equals(aslice.getState())) {
DocRouter.Range myRange = coll.getSlice(shardId).getRange();
if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange);
@ -358,9 +358,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
String fromShard = req.getParams().get("distrib.from.parent");
if (fromShard != null) {
if (!Slice.CONSTRUCTION.equals(mySlice.getState())) {
if (Slice.ACTIVE.equals(mySlice.getState())) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"Request says it is coming from parent shard leader but we are not in construction state");
"Request says it is coming from parent shard leader but we are in active state");
}
// shard splitting case -- check ranges to see if we are a sub-shard
Slice fromSlice = zkController.getClusterState().getCollection(collection).getSlice(fromShard);

View File

@ -198,6 +198,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
}
waitForRecoveriesToFinish(false);
checkDocCountsAndShardStates(docCounts, numReplicas);
}
@ -274,6 +275,8 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
}
waitForRecoveriesToFinish(collectionName, false);
assertEquals(docCounts[0], collectionClient.query(new SolrQuery("*:*").setParam("shards", "shard1_0")).getResults().getNumFound());
assertEquals(docCounts[1], collectionClient.query(new SolrQuery("*:*").setParam("shards", "shard1_1")).getResults().getNumFound());
}

View File

@ -36,6 +36,8 @@ public class Slice extends ZkNodeProps {
/** Slice state value: "active". */
public static final String ACTIVE = "active";
/** Slice state value: "inactive". */
public static final String INACTIVE = "inactive";
/** Slice state value: "construction". */
public static final String CONSTRUCTION = "construction";
/** Slice state value: "recovery". */
public static final String RECOVERY = "recovery";
/** Property key (not a state) under which a slice's parent is stored. */
public static final String PARENT = "parent";
private final String name;
private final DocRouter.Range range;
@ -43,6 +45,7 @@ public class Slice extends ZkNodeProps {
private final Map<String,Replica> replicas;
private final Replica leader;
private final String state;
private final String parent;
/**
* @param name The name of the slice
@ -75,6 +78,11 @@ public class Slice extends ZkNodeProps {
}
**/
if (propMap.containsKey(PARENT) && propMap.get(PARENT) != null)
this.parent = (String) propMap.get(PARENT);
else
this.parent = null;
replicationFactor = null; // future
// add the replicas *after* the other properties (for aesthetics, so it's easy to find slice properties in the JSON output)
@ -150,6 +158,10 @@ public class Slice extends ZkNodeProps {
return state;
}
/**
 * Returns the parent of this slice.
 *
 * @return the non-null value stored under {@link #PARENT} when this slice was constructed,
 *         otherwise {@code null}
 */
public String getParent() {
  return this.parent;
}
@Override
public String toString() {
return name + ':' + JSONUtil.toJSON(propMap);

View File

@ -59,6 +59,7 @@ public class ZkStateReader {
public static final String SHARD_ID_PROP = "shard";
public static final String SHARD_RANGE_PROP = "shard_range";
public static final String SHARD_STATE_PROP = "shard_state";
public static final String SHARD_PARENT_PROP = "shard_parent";
public static final String NUM_SHARDS_PROP = "numShards";
public static final String LEADER_PROP = "leader";

View File

@ -80,6 +80,9 @@ public abstract class CoreAdminParams
/** The shard state in solr cloud */
public final static String SHARD_STATE = "shard.state";
/** The parent shard if applicable */
public final static String SHARD_PARENT = "shard.parent";
/** The target core to which a split index should be written.
* Multiple targetCores can be specified by multiple targetCore parameters */
public final static String TARGET_CORE = "targetCore";