mirror of https://github.com/apache/lucene.git
SOLR-11665: Improve error handling of shard splitting. Fix splitting of mixed replica types.
This commit is contained in:
parent
7fb36c5906
commit
1023b839ae
|
@ -119,6 +119,8 @@ Bug Fixes
|
||||||
|
|
||||||
* SOLR-12513: Reproducing TestCodecSupport.testMixedCompressionMode failure (Erick Erickson)
|
* SOLR-12513: Reproducing TestCodecSupport.testMixedCompressionMode failure (Erick Erickson)
|
||||||
|
|
||||||
|
* SOLR-11665: Improve error handling of shard splitting. Fix splitting of mixed replica types. (ab)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -172,7 +172,7 @@ public class Assign {
|
||||||
return returnShardId;
|
return returnShardId;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String buildSolrCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
|
public static String buildSolrCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
|
||||||
// TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
|
// TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
|
||||||
return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
|
return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,11 +26,15 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.cloud.DistributedQueue;
|
import org.apache.solr.client.solrj.cloud.DistributedQueue;
|
||||||
|
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
|
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
|
||||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
|
||||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||||
import org.apache.solr.cloud.Overseer;
|
import org.apache.solr.cloud.Overseer;
|
||||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||||
|
@ -45,6 +49,7 @@ import org.apache.solr.common.cloud.ReplicaPosition;
|
||||||
import org.apache.solr.common.cloud.Slice;
|
import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
||||||
import org.apache.solr.common.params.CommonAdminParams;
|
import org.apache.solr.common.params.CommonAdminParams;
|
||||||
import org.apache.solr.common.params.CoreAdminParams;
|
import org.apache.solr.common.params.CoreAdminParams;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
@ -52,11 +57,13 @@ import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.handler.component.ShardHandler;
|
import org.apache.solr.handler.component.ShardHandler;
|
||||||
import org.apache.solr.util.TestInjection;
|
import org.apache.solr.util.TestInjection;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||||
|
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
|
||||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
|
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
|
||||||
|
@ -101,8 +108,11 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice.get(), 10000);
|
parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice.get(), 10000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkDiskSpace(collectionName, slice.get(), parentShardLeader);
|
||||||
|
|
||||||
// let's record the ephemeralOwner of the parent leader node
|
// let's record the ephemeralOwner of the parent leader node
|
||||||
Stat leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
|
Stat leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
|
||||||
if (leaderZnodeStat == null) {
|
if (leaderZnodeStat == null) {
|
||||||
|
@ -114,7 +124,36 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
List<String> subSlices = new ArrayList<>();
|
List<String> subSlices = new ArrayList<>();
|
||||||
List<String> subShardNames = new ArrayList<>();
|
List<String> subShardNames = new ArrayList<>();
|
||||||
|
|
||||||
String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames);
|
// reproduce the currently existing number of replicas per type
|
||||||
|
AtomicInteger numNrt = new AtomicInteger();
|
||||||
|
AtomicInteger numTlog = new AtomicInteger();
|
||||||
|
AtomicInteger numPull = new AtomicInteger();
|
||||||
|
parentSlice.getReplicas().forEach(r -> {
|
||||||
|
switch (r.getType()) {
|
||||||
|
case NRT:
|
||||||
|
numNrt.incrementAndGet();
|
||||||
|
break;
|
||||||
|
case TLOG:
|
||||||
|
numTlog.incrementAndGet();
|
||||||
|
break;
|
||||||
|
case PULL:
|
||||||
|
numPull.incrementAndGet();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
int repFactor = numNrt.get() + numTlog.get() + numPull.get();
|
||||||
|
|
||||||
|
// type of the first subreplica will be the same as leader
|
||||||
|
boolean firstNrtReplica = parentShardLeader.getType() == Replica.Type.NRT;
|
||||||
|
// verify that we indeed have the right number of correct replica types
|
||||||
|
if ((firstNrtReplica && numNrt.get() < 1) || (!firstNrtReplica && numTlog.get() < 1)) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "aborting split - inconsistent replica types in collection " + collectionName +
|
||||||
|
": nrt=" + numNrt.get() + ", tlog=" + numTlog.get() + ", pull=" + numPull.get() + ", shard leader type is " +
|
||||||
|
parentShardLeader.getType());
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
|
||||||
|
|
||||||
|
String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, firstNrtReplica);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -126,7 +165,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
if (state == Slice.State.ACTIVE) {
|
if (state == Slice.State.ACTIVE) {
|
||||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||||
"Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
|
"Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
|
||||||
} else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
|
} else {
|
||||||
// delete the shards
|
// delete the shards
|
||||||
log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
|
log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
|
||||||
Map<String, Object> propMap = new HashMap<>();
|
Map<String, Object> propMap = new HashMap<>();
|
||||||
|
@ -188,6 +227,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
|
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
|
||||||
propMap.put(COLLECTION_PROP, collectionName);
|
propMap.put(COLLECTION_PROP, collectionName);
|
||||||
propMap.put(SHARD_ID_PROP, subSlice);
|
propMap.put(SHARD_ID_PROP, subSlice);
|
||||||
|
propMap.put(REPLICA_TYPE, firstNrtReplica ? Replica.Type.NRT.toString() : Replica.Type.TLOG.toString());
|
||||||
propMap.put("node", nodeName);
|
propMap.put("node", nodeName);
|
||||||
propMap.put(CoreAdminParams.NAME, subShardName);
|
propMap.put(CoreAdminParams.NAME, subShardName);
|
||||||
propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
|
propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
|
||||||
|
@ -268,21 +308,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
log.info("Successfully applied buffered updates on : " + subShardNames);
|
log.info("Successfully applied buffered updates on : " + subShardNames);
|
||||||
|
|
||||||
// Replica creation for the new Slices
|
// Replica creation for the new Slices
|
||||||
|
// replica placement is controlled by the autoscaling policy framework
|
||||||
|
|
||||||
// look at the replication factor and see if it matches reality
|
|
||||||
// if it does not, find best nodes to create more cores
|
|
||||||
|
|
||||||
// TODO: Have replication factor decided in some other way instead of numShards for the parent
|
|
||||||
|
|
||||||
int repFactor = parentSlice.getReplicas().size();
|
|
||||||
|
|
||||||
// we need to look at every node and see how many cores it serves
|
|
||||||
// add our new cores to existing nodes serving the least number of cores
|
|
||||||
// but (for now) require that each core goes on a distinct node.
|
|
||||||
|
|
||||||
// TODO: add smarter options that look at the current number of cores per
|
|
||||||
// node?
|
|
||||||
// for now we just go random
|
|
||||||
Set<String> nodes = clusterState.getLiveNodes();
|
Set<String> nodes = clusterState.getLiveNodes();
|
||||||
List<String> nodeList = new ArrayList<>(nodes.size());
|
List<String> nodeList = new ArrayList<>(nodes.size());
|
||||||
nodeList.addAll(nodes);
|
nodeList.addAll(nodes);
|
||||||
|
@ -294,28 +321,39 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
|
|
||||||
// TODO: change this to handle sharding a slice into > 2 sub-shards.
|
// TODO: change this to handle sharding a slice into > 2 sub-shards.
|
||||||
|
|
||||||
|
// we have already created one subReplica for each subShard on the parent node.
|
||||||
|
// identify locations for the remaining replicas
|
||||||
|
if (firstNrtReplica) {
|
||||||
|
numNrt.decrementAndGet();
|
||||||
|
} else {
|
||||||
|
numTlog.decrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh.cloudManager,
|
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh.cloudManager,
|
||||||
clusterState,
|
clusterState,
|
||||||
new ArrayList<>(clusterState.getLiveNodes()),
|
new ArrayList<>(clusterState.getLiveNodes()),
|
||||||
collectionName,
|
collectionName,
|
||||||
new ZkNodeProps(collection.getProperties()),
|
new ZkNodeProps(collection.getProperties()),
|
||||||
subSlices, repFactor - 1, 0, 0);
|
subSlices, numNrt.get(), numTlog.get(), numPull.get());
|
||||||
sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
|
sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
|
||||||
|
|
||||||
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
|
|
||||||
|
|
||||||
for (ReplicaPosition replicaPosition : replicaPositions) {
|
for (ReplicaPosition replicaPosition : replicaPositions) {
|
||||||
String sliceName = replicaPosition.shard;
|
String sliceName = replicaPosition.shard;
|
||||||
String subShardNodeName = replicaPosition.node;
|
String subShardNodeName = replicaPosition.node;
|
||||||
String solrCoreName = collectionName + "_" + sliceName + "_replica" + (replicaPosition.index);
|
String solrCoreName = Assign.buildSolrCoreName(collectionName, sliceName, replicaPosition.type, replicaPosition.index);
|
||||||
|
|
||||||
log.info("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
|
log.info("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
|
||||||
+ collectionName + " on " + subShardNodeName);
|
+ collectionName + " on " + subShardNodeName);
|
||||||
|
|
||||||
|
// we first create all replicas in DOWN state without actually creating their cores in order to
|
||||||
|
// avoid a race condition where Overseer may prematurely activate the new sub-slices (and deactivate
|
||||||
|
// the parent slice) before all new replicas are added. This situation may lead to a loss of performance
|
||||||
|
// because the new shards will be activated with possibly many fewer replicas.
|
||||||
ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
|
ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
|
||||||
ZkStateReader.COLLECTION_PROP, collectionName,
|
ZkStateReader.COLLECTION_PROP, collectionName,
|
||||||
ZkStateReader.SHARD_ID_PROP, sliceName,
|
ZkStateReader.SHARD_ID_PROP, sliceName,
|
||||||
ZkStateReader.CORE_NAME_PROP, solrCoreName,
|
ZkStateReader.CORE_NAME_PROP, solrCoreName,
|
||||||
|
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
|
||||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||||
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
|
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
|
||||||
ZkStateReader.NODE_NAME_PROP, subShardNodeName,
|
ZkStateReader.NODE_NAME_PROP, subShardNodeName,
|
||||||
|
@ -326,6 +364,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
|
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
|
||||||
propMap.put(COLLECTION_PROP, collectionName);
|
propMap.put(COLLECTION_PROP, collectionName);
|
||||||
propMap.put(SHARD_ID_PROP, sliceName);
|
propMap.put(SHARD_ID_PROP, sliceName);
|
||||||
|
propMap.put(REPLICA_TYPE, replicaPosition.type.name());
|
||||||
propMap.put("node", subShardNodeName);
|
propMap.put("node", subShardNodeName);
|
||||||
propMap.put(CoreAdminParams.NAME, solrCoreName);
|
propMap.put(CoreAdminParams.NAME, solrCoreName);
|
||||||
// copy over property params:
|
// copy over property params:
|
||||||
|
@ -409,6 +448,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
ocmh.addReplica(clusterState, new ZkNodeProps(replica), results, null);
|
ocmh.addReplica(clusterState, new ZkNodeProps(replica), results, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert TestInjection.injectSplitFailureAfterReplicaCreation();
|
||||||
|
|
||||||
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
|
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
|
||||||
|
|
||||||
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
|
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
|
||||||
|
@ -417,6 +458,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} catch (SolrException e) {
|
} catch (SolrException e) {
|
||||||
|
cleanupAfterFailure(zkStateReader, collectionName, parentSlice.getName(), subSlices);
|
||||||
throw e;
|
throw e;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
|
log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
|
||||||
|
@ -426,6 +468,106 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkDiskSpace(String collection, String shard, Replica parentShardLeader) throws SolrException {
|
||||||
|
// check that enough disk space is available on the parent leader node
|
||||||
|
// otherwise the actual index splitting will always fail
|
||||||
|
NodeStateProvider nodeStateProvider = ocmh.cloudManager.getNodeStateProvider();
|
||||||
|
Map<String, Object> nodeValues = nodeStateProvider.getNodeValues(parentShardLeader.getNodeName(),
|
||||||
|
Collections.singletonList(ImplicitSnitch.DISK));
|
||||||
|
Map<String, Map<String, List<ReplicaInfo>>> infos = nodeStateProvider.getReplicaInfo(parentShardLeader.getNodeName(),
|
||||||
|
Collections.singletonList(Suggestion.ConditionType.CORE_IDX.metricsAttribute));
|
||||||
|
if (infos.get(collection) == null || infos.get(collection).get(shard) == null) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "missing replica information for parent shard leader");
|
||||||
|
}
|
||||||
|
// find the leader
|
||||||
|
List<ReplicaInfo> lst = infos.get(collection).get(shard);
|
||||||
|
Double indexSize = null;
|
||||||
|
for (ReplicaInfo info : lst) {
|
||||||
|
if (info.getCore().equals(parentShardLeader.getCoreName())) {
|
||||||
|
Number size = (Number)info.getVariable(Suggestion.ConditionType.CORE_IDX.metricsAttribute);
|
||||||
|
if (size == null) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "missing index size information for parent shard leader");
|
||||||
|
}
|
||||||
|
indexSize = (Double)Suggestion.ConditionType.CORE_IDX.convertVal(size);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (indexSize == null) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "missing replica information for parent shard leader");
|
||||||
|
}
|
||||||
|
Number freeSize = (Number)nodeValues.get(ImplicitSnitch.DISK);
|
||||||
|
if (freeSize == null) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "missing node disk space information for parent shard leader");
|
||||||
|
}
|
||||||
|
if (freeSize.doubleValue() < 2.0 * indexSize) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "not enough free disk space to perform index split on node " +
|
||||||
|
parentShardLeader.getNodeName() + ", required: " + (2 * indexSize) + ", available: " + freeSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanupAfterFailure(ZkStateReader zkStateReader, String collectionName, String parentShard, List<String> subSlices) {
|
||||||
|
log.debug("- cleanup after failed split of " + collectionName + "/" + parentShard);
|
||||||
|
// get the latest state
|
||||||
|
try {
|
||||||
|
zkStateReader.forceUpdateCollection(collectionName);
|
||||||
|
} catch (KeeperException | InterruptedException e) {
|
||||||
|
log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (force update collection)", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ClusterState clusterState = zkStateReader.getClusterState();
|
||||||
|
DocCollection coll = clusterState.getCollectionOrNull(collectionName);
|
||||||
|
|
||||||
|
if (coll == null) { // may have been deleted
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// set already created sub shards states to CONSTRUCTION - this prevents them
|
||||||
|
// from entering into RECOVERY or ACTIVE (SOLR-9455)
|
||||||
|
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
|
||||||
|
Map<String, Object> propMap = new HashMap<>();
|
||||||
|
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
|
||||||
|
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
|
||||||
|
for (Slice s : coll.getSlices()) {
|
||||||
|
if (!subSlices.contains(s.getName())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
propMap.put(s.getName(), Slice.State.CONSTRUCTION.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
// if parent is inactive activate it again
|
||||||
|
Slice parentSlice = coll.getSlice(parentShard);
|
||||||
|
if (parentSlice.getState() == Slice.State.INACTIVE) {
|
||||||
|
propMap.put(parentShard, Slice.State.ACTIVE.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
ZkNodeProps m = new ZkNodeProps(propMap);
|
||||||
|
inQueue.offer(Utils.toJSON(m));
|
||||||
|
} catch (Exception e) {
|
||||||
|
// don't give up yet - just log the error, we may still be able to clean up
|
||||||
|
log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (slice state changes)", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete existing subShards
|
||||||
|
for (String subSlice : subSlices) {
|
||||||
|
Slice s = coll.getSlice(subSlice);
|
||||||
|
if (s == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
|
||||||
|
propMap = new HashMap<>();
|
||||||
|
propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
|
||||||
|
propMap.put(COLLECTION_PROP, collectionName);
|
||||||
|
propMap.put(SHARD_ID_PROP, subSlice);
|
||||||
|
ZkNodeProps m = new ZkNodeProps(propMap);
|
||||||
|
try {
|
||||||
|
ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static Slice getParentSlice(ClusterState clusterState, String collectionName, AtomicReference<String> slice, String splitKey) {
|
public static Slice getParentSlice(ClusterState clusterState, String collectionName, AtomicReference<String> slice, String splitKey) {
|
||||||
DocCollection collection = clusterState.getCollection(collectionName);
|
DocCollection collection = clusterState.getCollection(collectionName);
|
||||||
DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
|
DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
|
||||||
|
@ -463,7 +605,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String fillRanges(SolrCloudManager cloudManager, ZkNodeProps message, DocCollection collection, Slice parentSlice,
|
public static String fillRanges(SolrCloudManager cloudManager, ZkNodeProps message, DocCollection collection, Slice parentSlice,
|
||||||
List<DocRouter.Range> subRanges, List<String> subSlices, List<String> subShardNames) {
|
List<DocRouter.Range> subRanges, List<String> subSlices, List<String> subShardNames,
|
||||||
|
boolean firstReplicaNrt) {
|
||||||
String splitKey = message.getStr("split.key");
|
String splitKey = message.getStr("split.key");
|
||||||
DocRouter.Range range = parentSlice.getRange();
|
DocRouter.Range range = parentSlice.getRange();
|
||||||
if (range == null) {
|
if (range == null) {
|
||||||
|
@ -532,7 +675,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
for (int i = 0; i < subRanges.size(); i++) {
|
for (int i = 0; i < subRanges.size(); i++) {
|
||||||
String subSlice = parentSlice.getName() + "_" + i;
|
String subSlice = parentSlice.getName() + "_" + i;
|
||||||
subSlices.add(subSlice);
|
subSlices.add(subSlice);
|
||||||
String subShardName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), collection, subSlice, Replica.Type.NRT);
|
String subShardName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), collection, subSlice,
|
||||||
|
firstReplicaNrt ? Replica.Type.NRT : Replica.Type.TLOG);
|
||||||
subShardNames.add(subShardName);
|
subShardNames.add(subShardName);
|
||||||
}
|
}
|
||||||
return rangesStr;
|
return rangesStr;
|
||||||
|
|
|
@ -133,6 +133,8 @@ public class TestInjection {
|
||||||
|
|
||||||
public static String splitFailureBeforeReplicaCreation = null;
|
public static String splitFailureBeforeReplicaCreation = null;
|
||||||
|
|
||||||
|
public static String splitFailureAfterReplicaCreation = null;
|
||||||
|
|
||||||
public static String waitForReplicasInSync = "true:60";
|
public static String waitForReplicasInSync = "true:60";
|
||||||
|
|
||||||
public static String failIndexFingerprintRequests = null;
|
public static String failIndexFingerprintRequests = null;
|
||||||
|
@ -156,6 +158,7 @@ public class TestInjection {
|
||||||
updateRandomPause = null;
|
updateRandomPause = null;
|
||||||
randomDelayInCoreCreation = null;
|
randomDelayInCoreCreation = null;
|
||||||
splitFailureBeforeReplicaCreation = null;
|
splitFailureBeforeReplicaCreation = null;
|
||||||
|
splitFailureAfterReplicaCreation = null;
|
||||||
prepRecoveryOpPauseForever = null;
|
prepRecoveryOpPauseForever = null;
|
||||||
countPrepRecoveryOpPauseForever = new AtomicInteger(0);
|
countPrepRecoveryOpPauseForever = new AtomicInteger(0);
|
||||||
waitForReplicasInSync = "true:60";
|
waitForReplicasInSync = "true:60";
|
||||||
|
@ -386,21 +389,28 @@ public class TestInjection {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean injectSplitFailureBeforeReplicaCreation() {
|
private static boolean injectSplitFailure(String probability, String label) {
|
||||||
if (splitFailureBeforeReplicaCreation != null) {
|
if (probability != null) {
|
||||||
Random rand = random();
|
Random rand = random();
|
||||||
if (null == rand) return true;
|
if (null == rand) return true;
|
||||||
|
|
||||||
Pair<Boolean,Integer> pair = parseValue(splitFailureBeforeReplicaCreation);
|
Pair<Boolean,Integer> pair = parseValue(probability);
|
||||||
boolean enabled = pair.first();
|
boolean enabled = pair.first();
|
||||||
int chanceIn100 = pair.second();
|
int chanceIn100 = pair.second();
|
||||||
if (enabled && rand.nextInt(100) >= (100 - chanceIn100)) {
|
if (enabled && rand.nextInt(100) >= (100 - chanceIn100)) {
|
||||||
log.info("Injecting failure in creating replica for sub-shard");
|
log.info("Injecting failure: " + label);
|
||||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to create replica");
|
throw new SolrException(ErrorCode.SERVER_ERROR, "Error: " + label);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
public static boolean injectSplitFailureBeforeReplicaCreation() {
|
||||||
|
return injectSplitFailure(splitFailureBeforeReplicaCreation, "before creating replica for sub-shard");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean injectSplitFailureAfterReplicaCreation() {
|
||||||
|
return injectSplitFailure(splitFailureAfterReplicaCreation, "after creating replica for sub-shard");
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressForbidden(reason = "Need currentTimeMillis, because COMMIT_TIME_MSEC_KEY use currentTimeMillis as value")
|
@SuppressForbidden(reason = "Need currentTimeMillis, because COMMIT_TIME_MSEC_KEY use currentTimeMillis as value")
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||||
import org.apache.solr.client.solrj.SolrClient;
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
@ -279,13 +280,12 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
||||||
ClusterState state = zkStateReader.getClusterState();
|
ClusterState state = zkStateReader.getClusterState();
|
||||||
DocCollection collection = state.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
|
DocCollection collection = state.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
|
||||||
|
|
||||||
|
// should be cleaned up
|
||||||
Slice shard10 = collection.getSlice(SHARD1_0);
|
Slice shard10 = collection.getSlice(SHARD1_0);
|
||||||
assertEquals(Slice.State.CONSTRUCTION, shard10.getState());
|
assertNull(shard10);
|
||||||
assertEquals(1, shard10.getReplicas().size());
|
|
||||||
|
|
||||||
Slice shard11 = collection.getSlice(SHARD1_1);
|
Slice shard11 = collection.getSlice(SHARD1_1);
|
||||||
assertEquals(Slice.State.CONSTRUCTION, shard11.getState());
|
assertNull(shard11);
|
||||||
assertEquals(1, shard11.getReplicas().size());
|
|
||||||
|
|
||||||
// lets retry the split
|
// lets retry the split
|
||||||
TestInjection.reset(); // let the split succeed
|
TestInjection.reset(); // let the split succeed
|
||||||
|
@ -304,6 +304,95 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplitAfterFailedSplit2() throws Exception {
|
||||||
|
waitForThingsToLevelOut(15);
|
||||||
|
|
||||||
|
TestInjection.splitFailureAfterReplicaCreation = "true:100"; // we definitely want split to fail
|
||||||
|
try {
|
||||||
|
try {
|
||||||
|
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
|
||||||
|
splitShard.setShardName(SHARD1);
|
||||||
|
splitShard.process(cloudClient);
|
||||||
|
fail("Shard split was not supposed to succeed after failure injection!");
|
||||||
|
} catch (Exception e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
// assert that sub-shards cores exist and sub-shard is in construction state
|
||||||
|
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||||
|
zkStateReader.forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
|
||||||
|
ClusterState state = zkStateReader.getClusterState();
|
||||||
|
DocCollection collection = state.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
|
||||||
|
|
||||||
|
// should be cleaned up
|
||||||
|
Slice shard10 = collection.getSlice(SHARD1_0);
|
||||||
|
assertNull(shard10);
|
||||||
|
|
||||||
|
Slice shard11 = collection.getSlice(SHARD1_1);
|
||||||
|
assertNull(shard11);
|
||||||
|
|
||||||
|
// lets retry the split
|
||||||
|
TestInjection.reset(); // let the split succeed
|
||||||
|
try {
|
||||||
|
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
|
||||||
|
splitShard.setShardName(SHARD1);
|
||||||
|
splitShard.process(cloudClient);
|
||||||
|
// Yay!
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Shard split failed", e);
|
||||||
|
fail("Shard split did not succeed after a previous failed split attempt left sub-shards in construction state");
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
TestInjection.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplitMixedReplicaTypes() throws Exception {
|
||||||
|
waitForThingsToLevelOut(15);
|
||||||
|
String collectionName = "testSplitMixedReplicaTypes";
|
||||||
|
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2, 2, 2);
|
||||||
|
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
|
||||||
|
create.process(cloudClient);
|
||||||
|
waitForRecoveriesToFinish(collectionName, false);
|
||||||
|
|
||||||
|
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
|
||||||
|
splitShard.setShardName(SHARD1);
|
||||||
|
splitShard.process(cloudClient);
|
||||||
|
waitForThingsToLevelOut(15);
|
||||||
|
|
||||||
|
cloudClient.getZkStateReader().forceUpdateCollection(collectionName);
|
||||||
|
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
|
||||||
|
DocCollection coll = clusterState.getCollection(collectionName);
|
||||||
|
log.info("coll: " + coll);
|
||||||
|
|
||||||
|
// verify the original shard
|
||||||
|
verifyShard(coll, SHARD1, Slice.State.INACTIVE, 2, 2, 2);
|
||||||
|
// verify new sub-shards
|
||||||
|
verifyShard(coll, SHARD1_0, Slice.State.ACTIVE, 2, 2, 2);
|
||||||
|
verifyShard(coll, SHARD1_1, Slice.State.ACTIVE, 2, 2, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyShard(DocCollection coll, String shard, Slice.State expectedState, int numNrt, int numTlog, int numPull) throws Exception {
|
||||||
|
Slice s = coll.getSlice(shard);
|
||||||
|
assertEquals("unexpected shard state", expectedState, s.getState());
|
||||||
|
AtomicInteger actualNrt = new AtomicInteger();
|
||||||
|
AtomicInteger actualTlog = new AtomicInteger();
|
||||||
|
AtomicInteger actualPull = new AtomicInteger();
|
||||||
|
s.getReplicas().forEach(r -> {
|
||||||
|
switch (r.getType()) {
|
||||||
|
case NRT: actualNrt.incrementAndGet(); break;
|
||||||
|
case TLOG: actualTlog.incrementAndGet(); break;
|
||||||
|
case PULL: actualPull.incrementAndGet(); break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertEquals("actual NRT", numNrt, actualNrt.get());
|
||||||
|
assertEquals("actual TLOG", numTlog, actualTlog.get());
|
||||||
|
assertEquals("actual PULL", numPull, actualPull.get());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
|
||||||
public void testSplitWithChaosMonkey() throws Exception {
|
public void testSplitWithChaosMonkey() throws Exception {
|
||||||
|
|
|
@ -969,7 +969,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
||||||
|
|
||||||
opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
|
opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
|
||||||
|
|
||||||
SplitShardCmd.fillRanges(cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames);
|
SplitShardCmd.fillRanges(cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, true);
|
||||||
// add replicas for new subShards
|
// add replicas for new subShards
|
||||||
int repFactor = parentSlice.getReplicas().size();
|
int repFactor = parentSlice.getReplicas().size();
|
||||||
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(cloudManager,
|
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(cloudManager,
|
||||||
|
|
Binary file not shown.
After Width: | Height: | Size: 97 KiB |
|
@ -0,0 +1,115 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
@startuml
|
||||||
|
|
||||||
|
' Note: in TEOZ mode som skinparams don't work
|
||||||
|
' and currently notes on messages can't be customized -
|
||||||
|
' but this mode creates more compact layout, enable if needed
|
||||||
|
|
||||||
|
'!pragma teoz true
|
||||||
|
|
||||||
|
skinparam noteFontSize 10
|
||||||
|
skinparam ParticipantPadding 5
|
||||||
|
skinparam SequenceGroupBorderThickness 1
|
||||||
|
skinparam SequenceGroupBorderColor #blue
|
||||||
|
skinparam SequenceDividerBorderThickness 1
|
||||||
|
skinparam SequenceDividerFontStyle plain
|
||||||
|
skinparam sequence {
|
||||||
|
MessageAlign center
|
||||||
|
}
|
||||||
|
|
||||||
|
title Split shard into N parts with RF replicas
|
||||||
|
|
||||||
|
actor User
|
||||||
|
control Overseer
|
||||||
|
entity ShardLeader
|
||||||
|
collections OtherNodes
|
||||||
|
|
||||||
|
User -> Overseer : SPLITSHARD
|
||||||
|
activate Overseer
|
||||||
|
== Prepare ==
|
||||||
|
Overseer -> Overseer : checkLeaderDiskSpace
|
||||||
|
Overseer -> Overseer : fillRanges(N)
|
||||||
|
loop 1..N
|
||||||
|
Overseer -> Overseer : deleteOldSubShard
|
||||||
|
rnote right: left-overs from a\nprevious failed split
|
||||||
|
end
|
||||||
|
== Sub shards construction ==
|
||||||
|
loop 1..N
|
||||||
|
Overseer -> Overseer : createSubShard
|
||||||
|
rnote right : subShard state\nCONSTRUCTION
|
||||||
|
Overseer -> Overseer : waitForSubShard
|
||||||
|
Overseer --> Overseer : addSubLeader
|
||||||
|
rnote right : placeholder, no core,\nsame node as parent
|
||||||
|
end
|
||||||
|
Overseer -> Overseer : waitForSubLeadersVisible
|
||||||
|
loop 1..N
|
||||||
|
Overseer -> Overseer : waitForCoreNodeVisible
|
||||||
|
Overseer --> Overseer : setSubLeaderActive
|
||||||
|
end
|
||||||
|
Overseer -> Overseer : waitForSubLeadersActive
|
||||||
|
== Split parent shard leader ==
|
||||||
|
Overseer --> ShardLeader : splitParentCore
|
||||||
|
activate ShardLeader
|
||||||
|
ShardLeader --> Overseer : splitComplete
|
||||||
|
deactivate ShardLeader
|
||||||
|
loop 1..N
|
||||||
|
Overseer --> ShardLeader : applyBufferedUpdates
|
||||||
|
activate ShardLeader
|
||||||
|
end
|
||||||
|
ShardLeader --> Overseer : updatesComplete
|
||||||
|
deactivate ShardLeader
|
||||||
|
== Create sub replicas ==
|
||||||
|
loop N * (RF-1)
|
||||||
|
Overseer --> Overseer : addSubReplica
|
||||||
|
rnote right : placeholder, no core,\nstate DOWN, other node
|
||||||
|
end
|
||||||
|
alt
|
||||||
|
Overseer -> Overseer : checkParentStillLeader
|
||||||
|
else
|
||||||
|
Overseer -> User : changed / error
|
||||||
|
end
|
||||||
|
Overseer -> Overseer : updateSubShardStates
|
||||||
|
rnote right : subShards in RECOVERY or\nACTIVE if RF=1
|
||||||
|
loop N * (RF-1)
|
||||||
|
Overseer --> OtherNodes : createSubReplicaCore
|
||||||
|
activate OtherNodes
|
||||||
|
end
|
||||||
|
OtherNodes --> Overseer : createComplete
|
||||||
|
Overseer -> User : success
|
||||||
|
deactivate Overseer
|
||||||
|
...
|
||||||
|
OtherNodes ---> Overseer : replicasRecovering
|
||||||
|
...
|
||||||
|
alt
|
||||||
|
OtherNodes --> Overseer : allReplicasActive
|
||||||
|
activate Overseer
|
||||||
|
Overseer -> Overseer : switchShards
|
||||||
|
rnote right : parent shard INACTIVE\nsub shards ACTIVE
|
||||||
|
else
|
||||||
|
alt
|
||||||
|
OtherNodes --> Overseer : someReplicasFailed
|
||||||
|
else
|
||||||
|
Overseer --> Overseer : parentShardLeaderChanged
|
||||||
|
end
|
||||||
|
deactivate OtherNodes
|
||||||
|
Overseer -> Overseer : shardRecoveryFailed
|
||||||
|
rnote right : parent shard ACTIVE\nsub shards RECOVERY_FAILED
|
||||||
|
end
|
||||||
|
deactivate Overseer
|
||||||
|
@enduml
|
Loading…
Reference in New Issue