SOLR-7577: Add support for rules in CREATESHARD and ADDREPLICA

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1680870 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2015-05-21 13:24:32 +00:00
parent 961960f1c2
commit e996532668
7 changed files with 129 additions and 56 deletions

View File

@ -182,7 +182,8 @@ New Features
* SOLR-7231: DIH-TikaEntityprocessor, create lat-lon field from Metadata
(Tim Allison via Noble Paul)
* SOLR-6220: Rule Based Replica Assignment during collection creation (Noble Paul)
* SOLR-6220: Rule Based Replica Assignment during collection, shard creation
and replica creation (Noble Paul)
* SOLR-6968: New 'cardinality' option for stats.field, uses HyperLogLog to efficiently
estimate the cardinality of a field w/bounded RAM. (hossman)

View File

@ -17,6 +17,8 @@ package org.apache.solr.cloud;
* the License.
*/
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@ -24,6 +26,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,6 +35,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -41,6 +45,7 @@ import java.util.regex.Pattern;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
public class Assign {
@ -129,10 +134,14 @@ public class Assign {
}
}
public static ArrayList<Node> getNodesForNewShard(ClusterState clusterState, String collectionName, int numSlices, int maxShardsPerNode, int repFactor, String createNodeSetStr) {
public static List<Node> getNodesForNewShard(ClusterState clusterState, String collectionName,String shard,int numberOfNodes,
String createNodeSetStr, CoreContainer cc) {
DocCollection coll = clusterState.getCollection(collectionName);
Integer maxShardsPerNode = coll.getInt(MAX_SHARDS_PER_NODE, 1);
Integer repFactor = coll.getInt(REPLICATION_FACTOR, 1);
int numSlices = coll.getSlices().size();
List<String> createNodeList = createNodeSetStr == null ? null: StrUtils.splitSmart(createNodeSetStr, ",", true);
Set<String> nodes = clusterState.getLiveNodes();
List<String> nodeList = new ArrayList<>(nodes.size());
@ -191,6 +200,37 @@ public class Assign {
+ " shards to be created (higher than the allowed number)");
}
List l = (List) coll.get(DocCollection.RULE);
if(l != null) {
ArrayList<Rule> rules = new ArrayList<>();
for (Object o : l) rules.add(new Rule((Map) o));
Map<String, Map<String,Integer>> shardVsNodes = new LinkedHashMap<>();
for (Slice slice : coll.getSlices()) {
LinkedHashMap<String, Integer> n = new LinkedHashMap<>();
shardVsNodes.put(slice.getName(), n);
for (Replica replica : slice.getReplicas()) {
Integer count = n.get(replica.getNodeName());
if(count == null) count = 0;
n.put(replica.getNodeName(),++count);
}
}
List snitches = (List) coll.get(DocCollection.SNITCH);
List<String> nodesList = createNodeList == null ?
new ArrayList<>(clusterState.getLiveNodes()) :
createNodeList ;
Map<ReplicaAssigner.Position, String> positions = new ReplicaAssigner(
rules,
Collections.singletonMap(shard, numberOfNodes),
snitches,
shardVsNodes,
nodesList, cc, clusterState).getNodeMappings();
List<Node> n = new ArrayList<>();
for (String s : positions.values()) n.add(new Node(s));
return n;
}else {
ArrayList<Node> sortedNodeList = new ArrayList<>(nodeNameVsShardCount.values());
Collections.sort(sortedNodeList, new Comparator<Node>() {
@Override
@ -200,5 +240,6 @@ public class Assign {
});
return sortedNodeList;
}
}
}

View File

@ -18,6 +18,7 @@ package org.apache.solr.cloud;
*/
import static org.apache.solr.cloud.Assign.*;
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.*;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
@ -142,7 +143,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
ZkStateReader.MAX_SHARDS_PER_NODE, "1",
ZkStateReader.AUTO_ADD_REPLICAS, "false",
DocCollection.RULE, null,
DocCollection.SNITCH, null));
SNITCH, null));
static final Random RANDOM;
static {
@ -1296,11 +1297,10 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
DocCollection collection = clusterState.getCollection(collectionName);
int maxShardsPerNode = collection.getInt(MAX_SHARDS_PER_NODE, 1);
int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
String createNodeSetStr = message.getStr(CREATE_NODE_SET);
ArrayList<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr);
List<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, sliceName, repFactor,
createNodeSetStr, overseer.getZkController().getCoreContainer());
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
// wait for a while until we see the shard
@ -2483,31 +2483,15 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
Map<String, Integer> sharVsReplicaCount = new HashMap<>();
for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor);
maps = (List<Map>) message.get("snitch");
List snitchList = maps == null? Collections.emptyList(): maps;
ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
sharVsReplicaCount,
snitchList,
(List<Map>) message.get(SNITCH),
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
nodeList,
overseer.getZkController().getCoreContainer(),
clusterState);
Map<Position, String> nodeMappings = replicaAssigner.getNodeMappings();
if(nodeMappings == null){
String msg = "Could not identify nodes matching the rules " + rules ;
if(!replicaAssigner.failedNodes.isEmpty()){
Map<String, String> failedNodes = new HashMap<>();
for (Map.Entry<String, SnitchContext> e : replicaAssigner.failedNodes.entrySet()) {
failedNodes.put(e.getKey(), e.getValue().getErrMsg());
}
msg+=" Some nodes where excluded from assigning replicas because tags could not be obtained from them "+ failedNodes;
}
msg+= ZkStateReader.toJSONString(replicaAssigner.getNodeVsTags());
throw new SolrException(ErrorCode.BAD_REQUEST, msg);
}
return nodeMappings;
return replicaAssigner.getNodeMappings();
}
private Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
@ -2560,7 +2544,9 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
if (node == null) {
node = getNodesForNewShard(clusterState, collection, coll.getSlices().size(), coll.getInt(MAX_SHARDS_PER_NODE, 1), coll.getInt(REPLICATION_FACTOR, 1), null).get(0).nodeName;
node = getNodesForNewShard(clusterState, collection, shard, 1,
null, overseer.getZkController().getCoreContainer()).get(0).nodeName;
log.info("Node not provided, Identified {} for creating new replica", node);
}

View File

@ -36,6 +36,7 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
@ -55,7 +56,7 @@ public class ReplicaAssigner {
List<Rule> rules;
Map<String, Integer> shardVsReplicaCount;
Map<String, Map<String, Object>> nodeVsTags;
Map<String, Set<String>> shardVsNodes;
Map<String, HashMap<String, Integer>> shardVsNodes;
List<String> liveNodes;
Set<String> tagNames = new HashSet<>();
private Map<String, AtomicInteger> nodeVsCores = new HashMap<>();
@ -93,7 +94,7 @@ public class ReplicaAssigner {
public ReplicaAssigner(List<Rule> rules,
Map<String, Integer> shardVsReplicaCount,
List snitches,
Map<String, Set<String>> shardVsNodes,
Map<String, Map<String, Integer>> shardVsNodes,
List<String> liveNodes,
CoreContainer cc, ClusterState clusterState) {
this.rules = rules;
@ -129,6 +130,27 @@ public class ReplicaAssigner {
* the specified rule
*/
public Map<Position, String> getNodeMappings() {
Map<Position, String> result = getNodeMappings0();
if (result == null) {
String msg = "Could not identify nodes matching the rules " + rules;
if (!failedNodes.isEmpty()) {
Map<String, String> failedNodes = new HashMap<>();
for (Map.Entry<String, SnitchContext> e : this.failedNodes.entrySet()) {
failedNodes.put(e.getKey(), e.getValue().getErrMsg());
}
msg += " Some nodes where excluded from assigning replicas because tags could not be obtained from them " + failedNodes;
}
msg += "\n tag values" + ZkStateReader.toJSONString(getNodeVsTags());
if (!shardVsNodes.isEmpty()) {
msg += "\nInitial state for the coll : " + ZkStateReader.toJSONString(shardVsNodes);
}
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, msg);
}
return result;
}
Map<Position, String> getNodeMappings0() {
List<String> shardNames = new ArrayList<>(shardVsReplicaCount.keySet());
int[] shardOrder = new int[shardNames.size()];
for (int i = 0; i < shardNames.size(); i++) shardOrder[i] = i;
@ -152,7 +174,6 @@ public class ReplicaAssigner {
result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, true);
}
return result;
}
private Map<Position, String> tryAllPermutations(List<String> shardNames,
@ -191,7 +212,7 @@ public class ReplicaAssigner {
Map<String, Map<String, Object>> nodeVsTagsCopy = getDeepCopy(nodeVsTags, 2);
Map<Position, String> result = new LinkedHashMap<>();
int startPosition = 0;
Map<String, Set<String>> copyOfCurrentState = getDeepCopy(shardVsNodes, 2);
Map<String, Map<String, Integer>> copyOfCurrentState = getDeepCopy(shardVsNodes, 2);
List<String> sortedLiveNodes = new ArrayList<>(this.liveNodes);
Collections.sort(sortedLiveNodes, new Comparator<String>() {
@Override
@ -235,9 +256,11 @@ public class ReplicaAssigner {
//We have reached this far means this node can be applied to this position
//and all rules are fine. So let us change the currentState
result.put(position, liveNode);
Set<String> nodeNames = copyOfCurrentState.get(position.shard);
if (nodeNames == null) copyOfCurrentState.put(position.shard, nodeNames = new HashSet<>());
nodeNames.add(liveNode);
Map<String, Integer> nodeNames = copyOfCurrentState.get(position.shard);
if (nodeNames == null) copyOfCurrentState.put(position.shard, nodeNames = new HashMap<>());
Integer n = nodeNames.get(liveNode);
n = n == null ? 1 : n + 1;
nodeNames.put(liveNode, n);
Number coreCount = (Number) nodeVsTagsCopy.get(liveNode).get(ImplicitSnitch.CORES);
if (coreCount != null) {
nodeVsTagsCopy.get(liveNode).put(ImplicitSnitch.CORES, coreCount.intValue() + 1);
@ -431,8 +454,8 @@ public class ReplicaAssigner {
static Map<Class, SnitchInfoImpl> getSnitchInfos(CoreContainer cc, List snitchConf) {
if (snitchConf == null) snitchConf = Collections.emptyList();
Map<Class, SnitchInfoImpl> snitches = new LinkedHashMap<>();
if (snitchConf == null) return snitches;
for (Object o : snitchConf) {
//instantiating explicitly specified snitches
String klas = null;

View File

@ -119,7 +119,7 @@ public class Rule {
* @return MatchStatus
*/
MatchStatus tryAssignNodeToShard(String testNode,
Map<String, Set<String>> shardVsNodeSet,
Map<String, Map<String,Integer>> shardVsNodeSet,
Map<String, Map<String, Object>> nodeVsTags,
String shardName, Phase phase) {
@ -153,21 +153,21 @@ public class Rule {
private int getNumberOfNodesWithSameTagVal(Condition shardCondition,
Map<String, Map<String, Object>> nodeVsTags,
Map<String, Set<String>> shardVsNodeSet,
Map<String, Map<String,Integer>> shardVsNodeSet,
String shardName,
Condition tagCondition,
Phase phase) {
int countMatchingThisTagValue = 0;
for (Map.Entry<String, Set<String>> entry : shardVsNodeSet.entrySet()) {
for (Map.Entry<String, Map<String,Integer>> entry : shardVsNodeSet.entrySet()) {
//check if this shard is relevant. either it is a ANY Wild card (**)
// or this shard is same as the shard in question
if (shardCondition.val.equals(WILD_WILD_CARD) || entry.getKey().equals(shardName)) {
Set<String> nodesInThisShard = shardVsNodeSet.get(shardCondition.val.equals(WILD_WILD_CARD) ? entry.getKey() : shardName);
Map<String,Integer> nodesInThisShard = shardVsNodeSet.get(shardCondition.val.equals(WILD_WILD_CARD) ? entry.getKey() : shardName);
if (nodesInThisShard != null) {
for (String aNode : nodesInThisShard) {
Object obj = nodeVsTags.get(aNode).get(tag.name);
if (tagCondition.canMatch(obj, phase)) countMatchingThisTagValue++;
for (Map.Entry<String,Integer> aNode : nodesInThisShard.entrySet()) {
Object obj = nodeVsTags.get(aNode.getKey()).get(tag.name);
if (tagCondition.canMatch(obj, phase)) countMatchingThisTagValue += aNode.getValue();
}
}
}
@ -177,7 +177,7 @@ public class Rule {
public int compare(String n1, String n2,
Map<String, Map<String, Object>> nodeVsTags,
Map<String, Set<String>> currentState) {
Map<String, Map<String,Integer>> currentState) {
return tag.compare(n1, n2, nodeVsTags);
}

View File

@ -29,6 +29,7 @@ import java.util.Set;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.junit.Test;
@ -129,7 +130,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
mapping = new ReplicaAssigner(
rules,
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0();
assertNull(mapping);
@ -141,7 +142,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
mapping = new ReplicaAssigner(
rules,
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0();
assertNotNull(mapping);
assertFalse(mapping.containsValue("127.0.0.2:49958_"));
@ -164,7 +165,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
mapping = new ReplicaAssigner(
rules,
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0();
assertNull(mapping);
rules = parseRules(
@ -206,7 +207,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
Map<Position, String> mapping = new ReplicaAssigner(
rules,
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null ,null).getNodeMappings();
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0();
assertNull(mapping);
rulesStr = "rack:*,replica:<2~";
rules = parse(Arrays.asList(rulesStr));

View File

@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.CoreContainer;
@ -43,15 +44,17 @@ public class RulesTest extends AbstractFullDistribZkTestBase {
static final Logger log = LoggerFactory.getLogger(RulesTest.class);
@Test
@ShardsFixed(num = 5)
public void doIntegrationTest() throws Exception {
String rulesColl = "rulesColl";
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
CollectionAdminResponse rsp;
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create();
create.setCollectionName(rulesColl);
create.setNumShards(1);
create.setShards("shard1");
create.setRouterName(ImplicitDocRouter.NAME);
create.setReplicationFactor(2);
create.setRule("cores:<4", "node:*,replica:1", "freedisk:>1");
create.setRule("cores:<4", "node:*,replica:<2", "freedisk:>1");
create.setSnitch("class:ImplicitSnitch");
rsp = create.process(client);
assertEquals(0, rsp.getStatus());
@ -63,12 +66,30 @@ public class RulesTest extends AbstractFullDistribZkTestBase {
List list = (List) rulesCollection.get("rule");
assertEquals(3, list.size());
assertEquals ( "<4", ((Map)list.get(0)).get("cores"));
assertEquals("1", ((Map) list.get(1)).get("replica"));
assertEquals("<2", ((Map) list.get(1)).get("replica"));
assertEquals(">1", ((Map) list.get(2)).get("freedisk"));
list = (List) rulesCollection.get("snitch");
assertEquals(1, list.size());
assertEquals ( "ImplicitSnitch", ((Map)list.get(0)).get("class"));
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
CollectionAdminResponse rsp;
CollectionAdminRequest.CreateShard createShard = new CollectionAdminRequest.CreateShard();
createShard.setCollectionName(rulesColl);
createShard.setShardName("shard2");
rsp = createShard.process(client);
assertEquals(0, rsp.getStatus());
assertTrue(rsp.isSuccess());
CollectionAdminRequest.AddReplica addReplica = new CollectionAdminRequest.AddReplica();
addReplica.setCollectionName(rulesColl);
addReplica.setShardName("shard2");
rsp = createShard.process(client);
assertEquals(0, rsp.getStatus());
assertTrue(rsp.isSuccess());
}
}
@Test