From e9965326683ed814929a1de7a37d6d478d0eca41 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Thu, 21 May 2015 13:24:32 +0000 Subject: [PATCH] SOLR-7577: Add support for rules in CREATESHARD and ADDREPLICA git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1680870 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 +- .../java/org/apache/solr/cloud/Assign.java | 59 ++++++++++++++++--- .../cloud/OverseerCollectionProcessor.java | 32 +++------- .../solr/cloud/rule/ReplicaAssigner.java | 39 +++++++++--- .../java/org/apache/solr/cloud/rule/Rule.java | 16 ++--- .../solr/cloud/rule/RuleEngineTest.java | 9 +-- .../org/apache/solr/cloud/rule/RulesTest.java | 27 ++++++++- 7 files changed, 129 insertions(+), 56 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 503668504d3..f19fe77fa34 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -182,7 +182,8 @@ New Features * SOLR-7231: DIH-TikaEntityprocessor, create lat-lon field from Metadata (Tim Allison via Noble Paul) -* SOLR-6220: Rule Based Replica Assignment during collection creation (Noble Paul) +* SOLR-6220: Rule Based Replica Assignment during collection, shard creation + and replica creation (Noble Paul) * SOLR-6968: New 'cardinality' option for stats.field, uses HyperLogLog to efficiently estimate the cardinality of a field w/bounded RAM. (hossman) diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java index a544852ebdb..64a8602d553 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Assign.java +++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java @@ -17,6 +17,8 @@ package org.apache.solr.cloud; * the License. */ +import org.apache.solr.cloud.rule.ReplicaAssigner; +import org.apache.solr.cloud.rule.Rule; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; @@ -24,6 +26,7 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.CoreContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,6 +45,7 @@ import java.util.regex.Pattern; import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET; import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES; import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; public class Assign { @@ -129,10 +134,14 @@ public class Assign { } } - public static ArrayList getNodesForNewShard(ClusterState clusterState, String collectionName, int numSlices, int maxShardsPerNode, int repFactor, String createNodeSetStr) { + public static List getNodesForNewShard(ClusterState clusterState, String collectionName,String shard,int numberOfNodes, + String createNodeSetStr, CoreContainer cc) { + DocCollection coll = clusterState.getCollection(collectionName); + Integer maxShardsPerNode = coll.getInt(MAX_SHARDS_PER_NODE, 1); + Integer repFactor = coll.getInt(REPLICATION_FACTOR, 1); + int numSlices = coll.getSlices().size(); List createNodeList = createNodeSetStr == null ? null: StrUtils.splitSmart(createNodeSetStr, ",", true); - Set nodes = clusterState.getLiveNodes(); List nodeList = new ArrayList<>(nodes.size()); @@ -191,14 +200,46 @@ public class Assign { + " shards to be created (higher than the allowed number)"); } - ArrayList sortedNodeList = new ArrayList<>(nodeNameVsShardCount.values()); - Collections.sort(sortedNodeList, new Comparator() { - @Override - public int compare(Node x, Node y) { - return (x.weight() < y.weight()) ? -1 : ((x.weight() == y.weight()) ? 0 : 1); + List l = (List) coll.get(DocCollection.RULE); + if(l != null) { + ArrayList rules = new ArrayList<>(); + for (Object o : l) rules.add(new Rule((Map) o)); + Map> shardVsNodes = new LinkedHashMap<>(); + for (Slice slice : coll.getSlices()) { + LinkedHashMap n = new LinkedHashMap<>(); + shardVsNodes.put(slice.getName(), n); + for (Replica replica : slice.getReplicas()) { + Integer count = n.get(replica.getNodeName()); + if(count == null) count = 0; + n.put(replica.getNodeName(),++count); + } } - }); - return sortedNodeList; + List snitches = (List) coll.get(DocCollection.SNITCH); + List nodesList = createNodeList == null ? + new ArrayList<>(clusterState.getLiveNodes()) : + createNodeList ; + Map positions = new ReplicaAssigner( + rules, + Collections.singletonMap(shard, numberOfNodes), + snitches, + shardVsNodes, + nodesList, cc, clusterState).getNodeMappings(); + + List n = new ArrayList<>(); + for (String s : positions.values()) n.add(new Node(s)); + return n; + + }else { + + ArrayList sortedNodeList = new ArrayList<>(nodeNameVsShardCount.values()); + Collections.sort(sortedNodeList, new Comparator() { + @Override + public int compare(Node x, Node y) { + return (x.weight() < y.weight()) ? -1 : ((x.weight() == y.weight()) ? 0 : 1); + } + }); + return sortedNodeList; + } } } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 9f00eb2b638..917ea56ba32 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -18,6 +18,7 @@ package org.apache.solr.cloud; */ import static org.apache.solr.cloud.Assign.*; +import static org.apache.solr.common.cloud.DocCollection.SNITCH; import static org.apache.solr.common.cloud.ZkNodeProps.makeMap; import static org.apache.solr.common.cloud.ZkStateReader.*; import static org.apache.solr.common.params.CollectionParams.CollectionAction.*; @@ -142,7 +143,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { ZkStateReader.MAX_SHARDS_PER_NODE, "1", ZkStateReader.AUTO_ADD_REPLICAS, "false", DocCollection.RULE, null, - DocCollection.SNITCH, null)); + SNITCH, null)); static final Random RANDOM; static { @@ -1296,11 +1297,10 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); DocCollection collection = clusterState.getCollection(collectionName); - int maxShardsPerNode = collection.getInt(MAX_SHARDS_PER_NODE, 1); int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1)); String createNodeSetStr = message.getStr(CREATE_NODE_SET); - - ArrayList sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr); + List sortedNodeList = getNodesForNewShard(clusterState, collectionName, sliceName, repFactor, + createNodeSetStr, overseer.getZkController().getCoreContainer()); Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message)); // wait for a while until we see the shard @@ -2483,31 +2483,15 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { Map sharVsReplicaCount = new HashMap<>(); for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor); - maps = (List) message.get("snitch"); - List snitchList = maps == null? Collections.emptyList(): maps; ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules, sharVsReplicaCount, - snitchList, + (List) message.get(SNITCH), new HashMap<>(),//this is a new collection. So, there are no nodes in any shard nodeList, overseer.getZkController().getCoreContainer(), clusterState); - Map nodeMappings = replicaAssigner.getNodeMappings(); - if(nodeMappings == null){ - String msg = "Could not identify nodes matching the rules " + rules ; - if(!replicaAssigner.failedNodes.isEmpty()){ - Map failedNodes = new HashMap<>(); - for (Map.Entry e : replicaAssigner.failedNodes.entrySet()) { - failedNodes.put(e.getKey(), e.getValue().getErrMsg()); - } - msg+=" Some nodes where excluded from assigning replicas because tags could not be obtained from them "+ failedNodes; - } - msg+= ZkStateReader.toJSONString(replicaAssigner.getNodeVsTags()); - - throw new SolrException(ErrorCode.BAD_REQUEST, msg); - } - return nodeMappings; + return replicaAssigner.getNodeMappings(); } private Map waitToSeeReplicasInState(String collectionName, Collection coreNames) throws InterruptedException { @@ -2560,7 +2544,9 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); if (node == null) { - node = getNodesForNewShard(clusterState, collection, coll.getSlices().size(), coll.getInt(MAX_SHARDS_PER_NODE, 1), coll.getInt(REPLICATION_FACTOR, 1), null).get(0).nodeName; + + node = getNodesForNewShard(clusterState, collection, shard, 1, + null, overseer.getZkController().getCoreContainer()).get(0).nodeName; log.info("Node not provided, Identified {} for creating new replica", node); } diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java index b05b9d460b4..c6781274d6c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java +++ b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java @@ -36,6 +36,7 @@ import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.StrUtils; import org.apache.solr.core.CoreContainer; import org.slf4j.Logger; @@ -55,7 +56,7 @@ public class ReplicaAssigner { List rules; Map shardVsReplicaCount; Map> nodeVsTags; - Map> shardVsNodes; + Map> shardVsNodes; List liveNodes; Set tagNames = new HashSet<>(); private Map nodeVsCores = new HashMap<>(); @@ -93,7 +94,7 @@ public class ReplicaAssigner { public ReplicaAssigner(List rules, Map shardVsReplicaCount, List snitches, - Map> shardVsNodes, + Map> shardVsNodes, List liveNodes, CoreContainer cc, ClusterState clusterState) { this.rules = rules; @@ -129,6 +130,27 @@ public class ReplicaAssigner { * the specified rule */ public Map getNodeMappings() { + Map result = getNodeMappings0(); + if (result == null) { + String msg = "Could not identify nodes matching the rules " + rules; + if (!failedNodes.isEmpty()) { + Map failedNodes = new HashMap<>(); + for (Map.Entry e : this.failedNodes.entrySet()) { + failedNodes.put(e.getKey(), e.getValue().getErrMsg()); + } + msg += " Some nodes where excluded from assigning replicas because tags could not be obtained from them " + failedNodes; + } + msg += "\n tag values" + ZkStateReader.toJSONString(getNodeVsTags()); + if (!shardVsNodes.isEmpty()) { + msg += "\nInitial state for the coll : " + ZkStateReader.toJSONString(shardVsNodes); + } + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, msg); + } + return result; + + } + + Map getNodeMappings0() { List shardNames = new ArrayList<>(shardVsReplicaCount.keySet()); int[] shardOrder = new int[shardNames.size()]; for (int i = 0; i < shardNames.size(); i++) shardOrder[i] = i; @@ -152,7 +174,6 @@ public class ReplicaAssigner { result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, true); } return result; - } private Map tryAllPermutations(List shardNames, @@ -191,7 +212,7 @@ public class ReplicaAssigner { Map> nodeVsTagsCopy = getDeepCopy(nodeVsTags, 2); Map result = new LinkedHashMap<>(); int startPosition = 0; - Map> copyOfCurrentState = getDeepCopy(shardVsNodes, 2); + Map> copyOfCurrentState = getDeepCopy(shardVsNodes, 2); List sortedLiveNodes = new ArrayList<>(this.liveNodes); Collections.sort(sortedLiveNodes, new Comparator() { @Override @@ -235,9 +256,11 @@ public class ReplicaAssigner { //We have reached this far means this node can be applied to this position //and all rules are fine. So let us change the currentState result.put(position, liveNode); - Set nodeNames = copyOfCurrentState.get(position.shard); - if (nodeNames == null) copyOfCurrentState.put(position.shard, nodeNames = new HashSet<>()); - nodeNames.add(liveNode); + Map nodeNames = copyOfCurrentState.get(position.shard); + if (nodeNames == null) copyOfCurrentState.put(position.shard, nodeNames = new HashMap<>()); + Integer n = nodeNames.get(liveNode); + n = n == null ? 1 : n + 1; + nodeNames.put(liveNode, n); Number coreCount = (Number) nodeVsTagsCopy.get(liveNode).get(ImplicitSnitch.CORES); if (coreCount != null) { nodeVsTagsCopy.get(liveNode).put(ImplicitSnitch.CORES, coreCount.intValue() + 1); @@ -431,8 +454,8 @@ public class ReplicaAssigner { static Map getSnitchInfos(CoreContainer cc, List snitchConf) { + if (snitchConf == null) snitchConf = Collections.emptyList(); Map snitches = new LinkedHashMap<>(); - if (snitchConf == null) return snitches; for (Object o : snitchConf) { //instantiating explicitly specified snitches String klas = null; diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java b/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java index b5a34c102b6..f6e7ecb01a1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java +++ b/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java @@ -119,7 +119,7 @@ public class Rule { * @return MatchStatus */ MatchStatus tryAssignNodeToShard(String testNode, - Map> shardVsNodeSet, + Map> shardVsNodeSet, Map> nodeVsTags, String shardName, Phase phase) { @@ -153,21 +153,21 @@ public class Rule { private int getNumberOfNodesWithSameTagVal(Condition shardCondition, Map> nodeVsTags, - Map> shardVsNodeSet, + Map> shardVsNodeSet, String shardName, Condition tagCondition, Phase phase) { int countMatchingThisTagValue = 0; - for (Map.Entry> entry : shardVsNodeSet.entrySet()) { + for (Map.Entry> entry : shardVsNodeSet.entrySet()) { //check if this shard is relevant. either it is a ANY Wild card (**) // or this shard is same as the shard in question if (shardCondition.val.equals(WILD_WILD_CARD) || entry.getKey().equals(shardName)) { - Set nodesInThisShard = shardVsNodeSet.get(shardCondition.val.equals(WILD_WILD_CARD) ? entry.getKey() : shardName); + Map nodesInThisShard = shardVsNodeSet.get(shardCondition.val.equals(WILD_WILD_CARD) ? entry.getKey() : shardName); if (nodesInThisShard != null) { - for (String aNode : nodesInThisShard) { - Object obj = nodeVsTags.get(aNode).get(tag.name); - if (tagCondition.canMatch(obj, phase)) countMatchingThisTagValue++; + for (Map.Entry aNode : nodesInThisShard.entrySet()) { + Object obj = nodeVsTags.get(aNode.getKey()).get(tag.name); + if (tagCondition.canMatch(obj, phase)) countMatchingThisTagValue += aNode.getValue(); } } } @@ -177,7 +177,7 @@ public class Rule { public int compare(String n1, String n2, Map> nodeVsTags, - Map> currentState) { + Map> currentState) { return tag.compare(n1, n2, nodeVsTags); } diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java index c469ee4f607..f147a6019cf 100644 --- a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.cloud.rule.ReplicaAssigner.Position; +import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkStateReader; import org.junit.Test; @@ -129,7 +130,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{ mapping = new ReplicaAssigner( rules, shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), - new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0(); assertNull(mapping); @@ -141,7 +142,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{ mapping = new ReplicaAssigner( rules, shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), - new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0(); assertNotNull(mapping); assertFalse(mapping.containsValue("127.0.0.2:49958_")); @@ -164,7 +165,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{ mapping = new ReplicaAssigner( rules, shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), - new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0(); assertNull(mapping); rules = parseRules( @@ -206,7 +207,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{ Map mapping = new ReplicaAssigner( rules, shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), - new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null ,null).getNodeMappings(); + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0(); assertNull(mapping); rulesStr = "rack:*,replica:<2~"; rules = parse(Arrays.asList(rulesStr)); diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RulesTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RulesTest.java index e32baa3acff..405e6625267 100644 --- a/solr/core/src/test/org/apache/solr/cloud/rule/RulesTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/rule/RulesTest.java @@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.cloud.AbstractFullDistribZkTestBase; import org.apache.solr.cloud.OverseerCollectionProcessor; import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.core.CoreContainer; @@ -43,15 +44,17 @@ public class RulesTest extends AbstractFullDistribZkTestBase { static final Logger log = LoggerFactory.getLogger(RulesTest.class); @Test + @ShardsFixed(num = 5) public void doIntegrationTest() throws Exception { String rulesColl = "rulesColl"; try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) { CollectionAdminResponse rsp; CollectionAdminRequest.Create create = new CollectionAdminRequest.Create(); create.setCollectionName(rulesColl); - create.setNumShards(1); + create.setShards("shard1"); + create.setRouterName(ImplicitDocRouter.NAME); create.setReplicationFactor(2); - create.setRule("cores:<4", "node:*,replica:1", "freedisk:>1"); + create.setRule("cores:<4", "node:*,replica:<2", "freedisk:>1"); create.setSnitch("class:ImplicitSnitch"); rsp = create.process(client); assertEquals(0, rsp.getStatus()); @@ -63,12 +66,30 @@ public class RulesTest extends AbstractFullDistribZkTestBase { List list = (List) rulesCollection.get("rule"); assertEquals(3, list.size()); assertEquals ( "<4", ((Map)list.get(0)).get("cores")); - assertEquals("1", ((Map) list.get(1)).get("replica")); + assertEquals("<2", ((Map) list.get(1)).get("replica")); assertEquals(">1", ((Map) list.get(2)).get("freedisk")); list = (List) rulesCollection.get("snitch"); assertEquals(1, list.size()); assertEquals ( "ImplicitSnitch", ((Map)list.get(0)).get("class")); + try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) { + CollectionAdminResponse rsp; + CollectionAdminRequest.CreateShard createShard = new CollectionAdminRequest.CreateShard(); + createShard.setCollectionName(rulesColl); + createShard.setShardName("shard2"); + rsp = createShard.process(client); + assertEquals(0, rsp.getStatus()); + assertTrue(rsp.isSuccess()); + + CollectionAdminRequest.AddReplica addReplica = new CollectionAdminRequest.AddReplica(); + addReplica.setCollectionName(rulesColl); + addReplica.setShardName("shard2"); + rsp = createShard.process(client); + assertEquals(0, rsp.getStatus()); + assertTrue(rsp.isSuccess()); + } + + } @Test