mirror of https://github.com/apache/lucene.git
SOLR-7673: Race condition in shard splitting can cause operation to hang indefinitely or sub-shards to never become active
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1688396 13f79535-47bb-0310-9956-ffa450edef68
parent a7d1ee7002
commit a32822991a
@@ -189,6 +189,9 @@ Bug Fixes
 * SOLR-6686: facet.threads can return wrong results when using facet.prefix multiple
   times on same field. (Michael Ryan, Tim Underwood via shalin)
 
+* SOLR-7673: Race condition in shard splitting can cause operation to hang indefinitely
+  or sub-shards to never become active. (shalin)
+
 Optimizations
 ----------------------
 
@@ -17,6 +17,18 @@ package org.apache.solr.cloud;
  * the License.
  */
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.solr.cloud.rule.ReplicaAssigner;
 import org.apache.solr.cloud.rule.Rule;
 import org.apache.solr.common.SolrException;

@@ -30,20 +42,9 @@ import org.apache.solr.core.CoreContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
@@ -120,16 +121,34 @@ public class Assign {
     return returnShardId;
   }
 
-  static class Node {
-    public final String nodeName;
-    public int thisCollectionNodes=0;
-    public int totalNodes=0;
+  static String buildCoreName(DocCollection collection, String shard) {
+    Slice slice = collection.getSlice(shard);
+    int replicaNum = slice.getReplicas().size();
+    for (;;) {
+      String replicaName = collection.getName() + "_" + shard + "_replica" + replicaNum;
+      boolean exists = false;
+      for (Replica replica : slice.getReplicas()) {
+        if (replicaName.equals(replica.getStr(CORE_NAME_PROP))) {
+          exists = true;
+          break;
+        }
+      }
+      if (exists) replicaNum++;
+      else break;
+    }
+    return collection.getName() + "_" + shard + "_replica" + replicaNum;
+  }
+
+  static class Node {
+    public final String nodeName;
+    public int thisCollectionNodes = 0;
+    public int totalNodes = 0;
 
     Node(String nodeName) {
       this.nodeName = nodeName;
     }
 
-    public int weight(){
+    public int weight() {
       return (thisCollectionNodes * 100) + totalNodes;
     }
   }

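The extracted Assign.buildCoreName helper keeps probing replica numbers until it finds a core name that no existing replica in the slice already uses, which is the same job the inline block removed from addReplica further down used to do. A minimal, self-contained sketch of that probing loop, using a plain set of names instead of Solr's DocCollection/Slice/Replica types (the collection, shard, and core names here are invented):

import java.util.HashSet;
import java.util.Set;

public class CoreNameProbe {
  // Mirrors the shape of Assign.buildCoreName: start at the current replica count
  // and bump the suffix until the candidate collides with no existing core name.
  static String buildCoreName(String collection, String shard, Set<String> existingCoreNames) {
    int replicaNum = existingCoreNames.size();
    while (true) {
      String candidate = collection + "_" + shard + "_replica" + replicaNum;
      if (!existingCoreNames.contains(candidate)) {
        return candidate;
      }
      replicaNum++;
    }
  }

  public static void main(String[] args) {
    Set<String> existing = new HashSet<>();
    existing.add("coll1_shard1_replica1");
    existing.add("coll1_shard1_replica2");
    // Probing starts at 2 (the current replica count); "replica2" collides, so this prints "..._replica3".
    System.out.println(buildCoreName("coll1", "shard1", existing));
  }
}
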
@@ -150,6 +150,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
 
   public static final String ONLY_ACTIVE_NODES = "onlyactivenodes";
 
+  private static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";
+
   public int maxParallelThreads = 10;
 
   public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(

@@ -1494,7 +1496,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
     collectShardResponses(results, false, null, shardHandler);
 
     final String asyncId = message.getStr(ASYNC);
-    HashMap<String,String> requestMap = new HashMap<String,String>();
+    HashMap<String,String> requestMap = new HashMap<>();
 
     for (int i = 0; i < subRanges.size(); i++) {
       String subSlice = subSlices.get(i);

@@ -1627,25 +1629,33 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
     List<String> nodeList = new ArrayList<>(nodes.size());
     nodeList.addAll(nodes);
 
     Collections.shuffle(nodeList, RANDOM);
 
     // TODO: Have maxShardsPerNode param for this operation?
 
     // Remove the node that hosts the parent shard for replica creation.
     nodeList.remove(nodeName);
 
     // TODO: change this to handle sharding a slice into > 2 sub-shards.
 
+    List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
     for (int i = 1; i <= subSlices.size(); i++) {
       Collections.shuffle(nodeList, RANDOM);
       String sliceName = subSlices.get(i - 1);
       for (int j = 2; j <= repFactor; j++) {
         String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
         String shardName = collectionName + "_" + sliceName + "_replica" + (j);
 
         log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
             + collectionName + " on " + subShardNodeName);
 
+        ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
+            ZkStateReader.COLLECTION_PROP, collectionName,
+            ZkStateReader.SHARD_ID_PROP, sliceName,
+            ZkStateReader.CORE_NAME_PROP, shardName,
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+            ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
+            ZkStateReader.NODE_NAME_PROP, subShardNodeName);
+        Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
+
         HashMap<String,Object> propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
         propMap.put(COLLECTION_PROP, collectionName);

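Within the loop above, the extra sub-shard replicas are spread over the shuffled node list with the index expression (repFactor * (i - 1) + (j - 2)) % nodeList.size(), wrapping around a list from which the parent shard's own node has already been removed. A standalone sketch of just that placement arithmetic (node names, repFactor, and sub-slice names are invented, and the per-iteration reshuffle is omitted so the wrap-around stays visible):

import java.util.Arrays;
import java.util.List;

public class SubShardPlacement {
  public static void main(String[] args) {
    // Invented, already-shuffled node list; the node hosting the parent shard has been removed.
    List<String> nodeList = Arrays.asList("nodeA:8983_solr", "nodeB:8983_solr", "nodeC:8983_solr");
    List<String> subSlices = Arrays.asList("shard1_0", "shard1_1");
    int repFactor = 3; // pretend the parent shard had three replicas

    for (int i = 1; i <= subSlices.size(); i++) {
      String sliceName = subSlices.get(i - 1);
      // j starts at 2: replica 1 of each sub-shard is its leader, created on the parent shard's node.
      for (int j = 2; j <= repFactor; j++) {
        int idx = (repFactor * (i - 1) + (j - 2)) % nodeList.size();
        System.out.println(sliceName + "_replica" + j + " -> " + nodeList.get(idx));
      }
    }
  }
}
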
@@ -1662,34 +1672,17 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
         if (asyncId != null) {
           propMap.put(ASYNC, asyncId);
         }
-        addReplica(clusterState, new ZkNodeProps(propMap), results);
-
-        String coreNodeName = waitForCoreNodeName(collectionName, subShardNodeName, shardName);
-        // wait for the replicas to be seen as active on sub shard leader
-        log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
-        CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
-        cmd.setCoreName(subShardNames.get(i - 1));
-        cmd.setNodeName(subShardNodeName);
-        cmd.setCoreNodeName(coreNodeName);
-        cmd.setState(Replica.State.RECOVERING);
-        cmd.setCheckLive(true);
-        cmd.setOnlyIfLeader(true);
-        ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
-
-        sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
-
+        // special flag param to instruct addReplica not to create the replica in cluster state again
+        propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");
+
+        replicas.add(propMap);
       }
     }
 
-    collectShardResponses(results, true,
-        "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up", shardHandler);
-
-    completeAsyncRequest(asyncId, requestMap, results);
-
-    log.info("Successfully created all replica shards for all sub-slices " + subSlices);
-
-    commit(results, slice, parentShardLeader);
-
+    // we must set the slice state into recovery before actually creating the replica cores
+    // this ensures that the logic inside Overseer to update sub-shard state to 'active'
+    // always gets a chance to execute. See SOLR-7673
+
     if (repFactor == 1) {
       // switch sub shard states to 'active'
       log.info("Replication factor is 1 so switching shard states");

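This hunk is the heart of the SOLR-7673 fix: the loop now only records each replica's ADDREPLICA properties (with the flag telling addReplica not to touch cluster state again), and the sub-shard slices are moved into recovery before any replica core is actually created, so the Overseer logic that later flips the sub-shards to 'active' always gets a chance to run. A schematic, non-Solr sketch of that "collect first, change coordination state, then do the slow work" ordering (the interface and names below are invented for illustration):

import java.util.List;
import java.util.Map;

// Schematic stand-in for the work splitShard defers; not a Solr interface.
interface SubShardCoordinator {
  void setSubShardStatesToRecovery();                 // cheap cluster-state change
  void createReplicaCore(Map<String, Object> props);  // slow, talks to remote nodes
}

public class DeferredReplicaCreation {
  static void finishSplit(SubShardCoordinator coordinator, List<Map<String, Object>> replicas) {
    // The replica property maps were only collected inside the loop, not acted on.
    // Flipping the slice state first means the Overseer's "switch sub-shards to active"
    // logic always sees the recovery state before any replica can report in (SOLR-7673).
    coordinator.setSubShardStatesToRecovery();
    for (Map<String, Object> props : replicas) {
      coordinator.createReplicaCore(props); // only now is the slow core creation started
    }
  }
}

The deferred core creation itself is executed in the next hunk.
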
@@ -1715,6 +1708,20 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
       ZkNodeProps m = new ZkNodeProps(propMap);
       inQueue.offer(ZkStateReader.toJSON(m));
     }
 
+    // now actually create replica cores on sub shard nodes
+    for (Map<String, Object> replica : replicas) {
+      addReplica(clusterState, new ZkNodeProps(replica), results);
+    }
+
+    collectShardResponses(results, true,
+        "SPLITSHARD failed to create subshard replicas", shardHandler);
+
+    completeAsyncRequest(asyncId, requestMap, results);
+
+    log.info("Successfully created all replica shards for all sub-slices " + subSlices);
+
+    commit(results, slice, parentShardLeader);
+
     return true;
   } catch (SolrException e) {

@@ -2456,8 +2463,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
     Map<String, Replica> result = new HashMap<>();
     long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
     while (true) {
-      DocCollection coll = zkStateReader.getClusterState().getCollection(
-          collectionName);
+      DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
       for (String coreName : coreNames) {
         if (result.containsKey(coreName)) continue;
         for (Slice slice : coll.getSlices()) {

@@ -2474,7 +2480,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
         return result;
       }
       if (System.nanoTime() > endTime) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas in cluster state.");
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state.");
      }
 
      Thread.sleep(100);

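waitToSeeReplicasInState, shown in these two hunks, polls the cluster state every 100 ms against a 30-second deadline computed from System.nanoTime(), and the reworded error now names the cores it was still waiting for. A self-contained sketch of the same deadline-and-poll pattern; the predicate, the IllegalStateException standing in for SolrException, and the names are placeholders rather than Solr API:

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class PollWithDeadline {
  // Polls `done` every 100 ms until it returns true or 30 seconds elapse.
  static void waitFor(String what, Supplier<Boolean> done) throws InterruptedException {
    long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
    while (!done.get()) {
      if (System.nanoTime() > endTime) {
        // Name what we were still waiting for, as the reworded exception above does.
        throw new IllegalStateException("Timed out waiting to see " + what + " in cluster state.");
      }
      Thread.sleep(100);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.nanoTime();
    // Toy predicate that becomes true after roughly half a second.
    waitFor("[coll1_shard1_0_replica2]",
        () -> System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(500));
    System.out.println("replica appeared in cluster state");
  }
}
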
@@ -2511,23 +2517,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
     }
     if (coreName == null) {
-      // assign a name to this core
-      Slice slice = coll.getSlice(shard);
-      int replicaNum = slice.getReplicas().size();
-      for (;;) {
-        String replicaName = collection + "_" + shard + "_replica" + replicaNum;
-        boolean exists = false;
-        for (Replica replica : slice.getReplicas()) {
-          if (replicaName.equals(replica.getStr(CORE_NAME_PROP))) {
-            exists = true;
-            break;
-          }
-        }
-        if (exists) replicaNum++;
-        else break;
-      }
-      coreName = collection + "_" + shard + "_replica" + replicaNum;
-    } else {
+      coreName = Assign.buildCoreName(coll, shard);
+    } else if (!message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false)) {
       //Validate that the core name is unique in that collection
       for (Slice slice : coll.getSlices()) {
         for (Replica replica : slice.getReplicas()) {

@@ -2542,11 +2533,13 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
     ModifiableSolrParams params = new ModifiableSolrParams();
 
     if (!Overseer.isLegacy(zkStateReader.getClusterProps())) {
-      ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP,
-          collection, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName,
-          ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.BASE_URL_PROP,
-          zkStateReader.getBaseUrlForNodeName(node));
-      Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
+      if (!message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false)) {
+        ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP,
+            collection, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName,
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.BASE_URL_PROP,
+            zkStateReader.getBaseUrlForNodeName(node), ZkStateReader.NODE_NAME_PROP, node);
+        Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
+      }
       params.set(CoreAdminParams.CORE_NODE_NAME,
           waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
     }

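With the new skipCreateReplicaInClusterState check, addReplica serves two callers: a plain ADDREPLICA request still enqueues the replica's cluster-state entry itself, while the split-shard path sets the flag because it already registered the replica in state 'down' together with its node name before creating the core. A reduced sketch of that guard, with the Overseer queue and the message type replaced by stand-ins:

import java.util.HashMap;
import java.util.Map;

public class AddReplicaGuard {
  static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";

  // Stand-in for offering the ADDREPLICA entry to the Overseer's state queue.
  static void offerClusterStateUpdate(String coreName) {
    System.out.println("cluster state entry created for " + coreName);
  }

  static void addReplica(Map<String, Object> message, String coreName) {
    boolean skip = Boolean.parseBoolean(
        String.valueOf(message.getOrDefault(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "false")));
    if (!skip) {
      // Plain ADDREPLICA: the entry does not exist yet, so create it first.
      offerClusterStateUpdate(coreName);
    }
    // ...then create the core itself (omitted). Split-shard callers set the flag because
    // they already registered the replica in state 'down' together with its node name.
  }

  public static void main(String[] args) {
    Map<String, Object> splitShardMessage = new HashMap<>();
    splitShardMessage.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");
    addReplica(splitShardMessage, "coll1_shard1_0_replica2"); // no cluster-state offer
    addReplica(new HashMap<>(), "coll1_shard2_replica3");     // offers the entry
  }
}
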
@@ -72,7 +72,8 @@ public class SliceMutator {
         makeMap(
             ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP),
             ZkStateReader.BASE_URL_PROP, message.getStr(ZkStateReader.BASE_URL_PROP),
-            ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP)));
+            ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
+            ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP)));
     return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
   }
 
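Recording node_name on the replica as soon as it is added to cluster state means the entry can be matched to its target node before the core ever starts up and publishes itself. A toy lookup over plain property maps illustrating that kind of match (this is not the ZkStateReader API; keys and values are invented to resemble cluster-state entries):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FindReplicaByNode {
  static Map<String, String> replica(String core, String nodeName, String state) {
    Map<String, String> props = new HashMap<>();
    props.put("core", core);
    props.put("node_name", nodeName);
    props.put("state", state);
    return props;
  }

  // Returns the first replica whose node_name matches, or null if none does.
  static Map<String, String> findOnNode(List<Map<String, String>> replicas, String nodeName) {
    for (Map<String, String> r : replicas) {
      if (nodeName.equals(r.get("node_name"))) return r;
    }
    return null;
  }

  public static void main(String[] args) {
    // Hypothetical replica property maps as they would appear right after ADDREPLICA.
    List<Map<String, String>> replicas = new ArrayList<>();
    replicas.add(replica("coll1_shard1_0_replica1", "nodeA:8983_solr", "down"));
    replicas.add(replica("coll1_shard1_0_replica2", "nodeB:8983_solr", "down"));
    System.out.println(findOnNode(replicas, "nodeB:8983_solr").get("core")); // coll1_shard1_0_replica2
  }
}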