mirror of https://github.com/apache/lucene.git
SOLR-1095: Refactor code to standardize replica assignment
This commit is contained in:
parent
196d84b9e0
commit
15118d40c5
|
@ -19,7 +19,6 @@ package org.apache.solr.cloud;
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -37,16 +36,12 @@ import com.google.common.collect.ImmutableMap;
|
|||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
|
||||
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.UpdateResponse;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.cloud.rule.ReplicaAssigner;
|
||||
import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
|
||||
import org.apache.solr.cloud.rule.Rule;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
|
@ -93,7 +88,6 @@ import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
|
|||
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
@ -632,30 +626,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
}
|
||||
|
||||
static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
|
||||
// TODO: add smarter options that look at the current number of cores per
|
||||
// node?
|
||||
// for now we just go random (except when createNodeSet and createNodeSet.shuffle=false are passed in)
|
||||
|
||||
List<String> nodeList;
|
||||
|
||||
final String createNodeSetStr = message.getStr(CREATE_NODE_SET);
|
||||
final List<String> createNodeList = (createNodeSetStr == null)?null:StrUtils.splitSmart((CREATE_NODE_SET_EMPTY.equals(createNodeSetStr)?"":createNodeSetStr), ",", true);
|
||||
|
||||
if (createNodeList != null) {
|
||||
nodeList = new ArrayList<>(createNodeList);
|
||||
nodeList.retainAll(liveNodes);
|
||||
if (message.getBool(CREATE_NODE_SET_SHUFFLE, CREATE_NODE_SET_SHUFFLE_DEFAULT)) {
|
||||
Collections.shuffle(nodeList, random);
|
||||
}
|
||||
} else {
|
||||
nodeList = new ArrayList<>(liveNodes);
|
||||
Collections.shuffle(nodeList, random);
|
||||
}
|
||||
|
||||
return nodeList;
|
||||
}
|
||||
|
||||
|
||||
private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results)
|
||||
throws KeeperException, InterruptedException {
|
||||
|
@ -704,66 +674,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
|
||||
}
|
||||
|
||||
Map<Position, String> identifyNodes(ClusterState clusterState,
|
||||
List<String> nodeList,
|
||||
String collectionName,
|
||||
ZkNodeProps message,
|
||||
List<String> shardNames,
|
||||
int numNrtReplicas,
|
||||
int numTlogReplicas,
|
||||
int numPullReplicas) throws KeeperException, InterruptedException {
|
||||
List<Map> rulesMap = (List) message.get("rule");
|
||||
String policyName = message.getStr(POLICY);
|
||||
Map autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
|
||||
|
||||
if (rulesMap == null && policyName == null) {
|
||||
int i = 0;
|
||||
Map<Position, String> result = new HashMap<>();
|
||||
for (String aShard : shardNames) {
|
||||
for (int j = 0; j < numNrtReplicas; j++){
|
||||
result.put(new Position(aShard, j, Replica.Type.NRT), nodeList.get(i % nodeList.size()));
|
||||
i++;
|
||||
}
|
||||
for (int j = 0; j < numTlogReplicas; j++){
|
||||
result.put(new Position(aShard, j, Replica.Type.TLOG), nodeList.get(i % nodeList.size()));
|
||||
i++;
|
||||
}
|
||||
for (int j = 0; j < numPullReplicas; j++){
|
||||
result.put(new Position(aShard, j, Replica.Type.PULL), nodeList.get(i % nodeList.size()));
|
||||
i++;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
} else {
|
||||
if (numTlogReplicas + numPullReplicas != 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies");
|
||||
}
|
||||
}
|
||||
|
||||
if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
|
||||
return Assign.getPositionsUsingPolicy(collectionName,
|
||||
shardNames, numNrtReplicas, policyName, zkStateReader, nodeList);
|
||||
|
||||
} else {
|
||||
List<Rule> rules = new ArrayList<>();
|
||||
for (Object map : rulesMap) rules.add(new Rule((Map) map));
|
||||
|
||||
Map<String, Integer> sharVsReplicaCount = new HashMap<>();
|
||||
|
||||
for (String shard : shardNames) sharVsReplicaCount.put(shard, numNrtReplicas);
|
||||
ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
|
||||
sharVsReplicaCount,
|
||||
(List<Map>) message.get(SNITCH),
|
||||
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
|
||||
nodeList,
|
||||
overseer.getZkController().getCoreContainer(),
|
||||
clusterState);
|
||||
|
||||
return replicaAssigner.getNodeMappings();
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
|
||||
Map<String, Replica> result = new HashMap<>();
|
||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
|
||||
|
|
Loading…
Reference in New Issue