From d87ea6b1ccd28e0dd8e30565fe95b2e0a31f82e8 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Thu, 26 Jul 2018 21:32:45 +1000 Subject: [PATCH] SOLR-12536: autoscaling policy support to equally distribute replicas on the basis of arbitrary properties --- solr/CHANGES.txt | 2 + .../solrj/cloud/autoscaling/Clause.java | 115 +++++++-- .../solrj/cloud/autoscaling/Operand.java | 8 + .../solrj/cloud/autoscaling/Policy.java | 2 +- .../client/solrj/cloud/autoscaling/Row.java | 2 +- .../solrj/cloud/autoscaling/Suggestion.java | 93 +++++--- .../impl/SolrClientNodeStateProvider.java | 28 +-- .../solrj/cloud/autoscaling/TestPolicy.java | 99 ++++++-- .../solrj/cloud/autoscaling/TestPolicy2.java | 223 ++++++++++++++++++ 9 files changed, 478 insertions(+), 94 deletions(-) create mode 100644 solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index b35ac224140..6c43f14aa75 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -121,6 +121,8 @@ New Features * SOLR-12581: the JSON Facet 'relatedness()' aggregate function now supports a 'min_popularity' option using the extended type:func syntax (hossman) +* SOLR-12536: autoscaling policy support to equally distribute replicas on the basis of arbitrary properties (noble) + Bug Fixes ---------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java index 7bce283c3dc..87fecda1ea5 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java @@ -29,6 +29,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType; import org.apache.solr.common.MapWriter; @@ -38,6 +39,7 @@ import org.apache.solr.common.util.Utils; import static java.util.Collections.singletonMap; import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.TestStatus.PASS; +import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.EQUAL; import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.GREATER_THAN; import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.LESS_THAN; import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.NOT_EQUAL; @@ -46,6 +48,8 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.ANY; import static org.apache.solr.common.params.CoreAdminParams.COLLECTION; import static org.apache.solr.common.params.CoreAdminParams.REPLICA; import static org.apache.solr.common.params.CoreAdminParams.SHARD; +import static org.apache.solr.common.util.StrUtils.formatString; +import static org.apache.solr.common.util.Utils.toJSONString; /** * Represents a set of conditions in the policy @@ -80,7 +84,7 @@ public class Clause implements MapWriter, Comparable { if (globalTagName.isPresent()) { globalTag = parse(globalTagName.get(), m); if (m.size() > 2) { - throw new RuntimeException("Only one extra tag supported for the tag " + globalTagName.get() + " in " + Utils.toJSONString(m)); + throw new RuntimeException("Only one extra tag supported for the tag " + globalTagName.get() + " in " + toJSONString(m)); } tag = parse(m.keySet().stream() .filter(s -> (!globalTagName.get().equals(s) && !IGNORE_TAGS.contains(s))) @@ -89,18 +93,18 @@ public class Clause implements MapWriter, Comparable { collection = parse(COLLECTION, m); shard = parse(SHARD, m); if (m.get(REPLICA) == null) { - throw new RuntimeException(StrUtils.formatString("'replica' is required in {0}", Utils.toJSONString(m))); + throw new IllegalArgumentException(formatString("'replica' is required in {0}", toJSONString(m))); } this.replica = parse(REPLICA, m); - if (replica.op == WILDCARD) throw new RuntimeException("replica val cannot be null" + Utils.toJSONString(m)); + if (replica.op == WILDCARD) throw new IllegalArgumentException("replica val cannot be null" + toJSONString(m)); m.forEach(this::parseCondition); } if (tag == null) - throw new RuntimeException("Invalid op, must have one and only one tag other than collection, shard,replica " + Utils.toJSONString(m)); + throw new RuntimeException("Invalid op, must have one and only one tag other than collection, shard,replica " + toJSONString(m)); if (tag.name.startsWith(Clause.METRICS_PREFIX)) { List ss = StrUtils.splitSmart(tag.name, ':'); if (ss.size() < 3 || ss.size() > 4) { - throw new RuntimeException("Invalid metrics: param in " + Utils.toJSONString(m) + " must have at 2 or 3 segments after 'metrics:' separated by ':'"); + throw new RuntimeException("Invalid metrics: param in " + toJSONString(m) + " must have at 2 or 3 segments after 'metrics:' separated by ':'"); } } doPostValidate(collection, shard, replica, tag, globalTag); @@ -112,7 +116,7 @@ public class Clause implements MapWriter, Comparable { if (condition == null) continue; String err = condition.varType.postValidate(condition); if (err != null) { - throw new IllegalArgumentException(StrUtils.formatString("Error in clause : {0}, caused by : {1}", Utils.toJSONString(original), err)); + throw new IllegalArgumentException(formatString("Error in clause : {0}, caused by : {1}", toJSONString(original), err)); } } } @@ -241,8 +245,8 @@ public class Clause implements MapWriter, Comparable { ComputedType computedType = null; Object val = m.get(s); ConditionType varType = Suggestion.getTagType(s); - if (varType.isHidden) { - throw new IllegalArgumentException(StrUtils.formatString("''{0}'' is not allowed in a policy rule : ''{1}'' ", varType.tagName, Utils.toJSONString(m))); + if (varType.meta.isHidden()) { + throw new IllegalArgumentException(formatString("''{0}'' is not allowed in a policy rule : ''{1}'' ", varType.tagName, toJSONString(m))); } try { String conditionName = s.trim(); @@ -250,14 +254,17 @@ public class Clause implements MapWriter, Comparable { if (val == null) { operand = WILDCARD; expectedVal = Policy.ANY; + } else if (val instanceof List) { + if (!varType.meta.supportArrayVals()) { + throw new IllegalArgumentException(formatString("array values are not supported for {0} in clause {1}", + conditionName, toJSONString(m))); + } + expectedVal = readListVal(m, (List) val, varType, conditionName); + operand = Operand.IN; } else if (val instanceof String) { String strVal = ((String) val).trim(); val = strVal; - if (Policy.ANY.equals(strVal) || Policy.EACH.equals(strVal)) operand = WILDCARD; - else if (strVal.startsWith(NOT_EQUAL.operand)) operand = NOT_EQUAL; - else if (strVal.startsWith(GREATER_THAN.operand)) operand = GREATER_THAN; - else if (strVal.startsWith(LESS_THAN.operand)) operand = LESS_THAN; - else operand = Operand.EQUAL; + operand = getOperand(strVal); strVal = strVal.substring(Operand.EQUAL == operand || WILDCARD == operand ? 0 : 1); for (ComputedType t : ComputedType.values()) { String changedVal = t.match(strVal); @@ -265,15 +272,14 @@ public class Clause implements MapWriter, Comparable { computedType = t; strVal = changedVal; if (varType == null || !varType.supportedComputedTypes.contains(computedType)) { - throw new IllegalArgumentException(StrUtils.formatString("''{0}'' is not allowed for variable : ''{1}'' , in clause : ''{2}'' ", - t, conditionName, Utils.toJSONString(m))); + throw new IllegalArgumentException(formatString("''{0}'' is not allowed for variable : ''{1}'' , in clause : ''{2}'' ", + t, conditionName, toJSONString(m))); } } } if (computedType == null && ((String) val).charAt(0) == '#' && !varType.wildCards.contains(val)) { - throw new IllegalArgumentException(StrUtils.formatString("''{0}'' is not an allowed value for ''{1}'' , in clause : ''{2}'' . Supported value is : {3}", - val, conditionName, Utils.toJSONString(m), varType.wildCards)); - + throw new IllegalArgumentException(formatString("''{0}'' is not an allowed value for ''{1}'' , in clause : ''{2}'' . Supported value is : {3}", + val, conditionName, toJSONString(m), varType.wildCards)); } operand = varType == null ? operand : varType.getOperand(operand, strVal, computedType); @@ -293,6 +299,49 @@ public class Clause implements MapWriter, Comparable { } } + private List readListVal(Map m, List val, ConditionType varType, String conditionName) { + List list = val; + list = (List) list.stream() + .map(it -> varType.validate(conditionName, it, true)) + .map(it -> { + if (it instanceof String) { + String trim = ((String) it).trim(); + if (trim.isEmpty()) + throw new IllegalArgumentException(formatString("{0} cannot have an empty string value in clause : {1}", + conditionName, toJSONString(m))); + return trim; + } else return it; + }).filter(it -> it == null ? false : true) + .collect(Collectors.toList()); + if (list.isEmpty()) + throw new IllegalArgumentException(formatString("{0} cannot have an empty list value in clause : {1}", + conditionName, toJSONString(m))); + for (Object o : list) { + if (o instanceof String) { + if (getOperand((String) o) != EQUAL) { + throw new IllegalArgumentException(formatString("No operators are supported in collection values in condition : {0} in clause : {1}", + conditionName, toJSONString(m))); + } + } + } + if (list.size() < 2) { + throw new IllegalArgumentException(formatString("Array should have more than one value in condition : {0} in clause : {1}", + conditionName, toJSONString(m))); + + } + return list; + } + + private Operand getOperand(String strVal) { + Operand operand; + if (Policy.ANY.equals(strVal) || Policy.EACH.equals(strVal)) operand = WILDCARD; + else if (strVal.startsWith(NOT_EQUAL.operand)) operand = NOT_EQUAL; + else if (strVal.startsWith(GREATER_THAN.operand)) operand = GREATER_THAN; + else if (strVal.startsWith(LESS_THAN.operand)) operand = LESS_THAN; + else operand = Operand.EQUAL; + return operand; + } + public List test(Policy.Session session) { ComputedValueEvaluator computedValueEvaluator = new ComputedValueEvaluator(session); Suggestion.ViolationCtx ctx = new Suggestion.ViolationCtx(this, session.matrix, computedValueEvaluator); @@ -305,17 +354,17 @@ public class Clause implements MapWriter, Comparable { computedValueEvaluator.shardName = shardVsCount.getKey(); if (!shard.isPass(computedValueEvaluator.shardName)) continue; for (Map.Entry counts : shardVsCount.getValue().entrySet()) { - if(tag.varType.isPerNodeValue) computedValueEvaluator.node = counts.getKey(); + if (tag.varType.meta.isNodeSpecificVal()) computedValueEvaluator.node = counts.getKey(); SealedClause sealedClause = getSealedClause(computedValueEvaluator); ReplicaCount replicas = counts.getValue(); if (!sealedClause.replica.isPass(replicas)) { Violation violation = new Violation(sealedClause, computedValueEvaluator.collName, computedValueEvaluator.shardName, - tag.varType.isPerNodeValue ? computedValueEvaluator.node : null, + tag.varType.meta.isNodeSpecificVal() ? computedValueEvaluator.node : null, counts.getValue(), sealedClause.getReplica().delta(replicas), - tag.varType.isPerNodeValue? null: counts.getKey()); + tag.varType.meta.isNodeSpecificVal() ? null : counts.getKey()); tag.varType.addViolatingReplicas(ctx.reset(counts.getKey(), replicas, violation)); } } @@ -353,7 +402,7 @@ public class Clause implements MapWriter, Comparable { computedValueEvaluator.shardName = shardName; SealedClause sealedClause = getSealedClause(computedValueEvaluator); Condition t = sealedClause.getTag(); - if(t.varType.isPerNodeValue){ + if (t.varType.meta.isNodeSpecificVal()) { boolean pass = t.getOperand().match(t.val, tagVal) == TestStatus.PASS; tagVsCount.computeIfAbsent(row.node, s -> new ReplicaCount()); if(pass) { @@ -372,6 +421,23 @@ public class Clause implements MapWriter, Comparable { return collVsShardVsTagVsCount; } + public boolean isMatch(ReplicaInfo r, String collection, String shard) { + if (type != null && r.getType() != type) return false; + if (r.getCollection().equals(collection)) { + if (this.shard == null || this.shard.val.equals(Policy.ANY)) return true; + else if (this.shard.val.equals(Policy.EACH) && r.getShard().equals(shard)) return true; + else return this.shard.val.equals(r.getShard()) && r.getShard().equals(shard); + } + return false; + } + + boolean matchShard(String replicaShard, String shardInContext) { + if (shard == null || shard.val.equals(ANY)) return true; + if (shard.val.equals(Policy.EACH) && replicaShard.equals(shardInContext)) return true; + if (shard.val.equals(replicaShard)) return true; + return false; + } + enum ComputedType { NULL(), EQUAL() { @@ -555,7 +621,7 @@ public class Clause implements MapWriter, Comparable { @Override public String toString() { - return Utils.toJSONString(original); + return toJSONString(original); } @Override @@ -625,6 +691,7 @@ public class Clause implements MapWriter, Comparable { public static Double parseDouble(String name, Object val) { if (val == null) return null; + if (val instanceof RangeVal) val = ((RangeVal) val).actual; if (val instanceof Double) return (Double) val; Number num = null; if (val instanceof String) { @@ -661,7 +728,7 @@ public class Clause implements MapWriter, Comparable { } public Double delta(double v) { - if (actual != null) return v - actual.doubleValue(); +// if (actual != null) return v - actual.doubleValue(); if (v >= max.doubleValue()) return v - max.doubleValue(); if (v <= min.doubleValue()) return v - min.doubleValue(); return 0d; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Operand.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Operand.java index abd65314948..b45864605f8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Operand.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Operand.java @@ -17,6 +17,7 @@ package org.apache.solr.client.solrj.cloud.autoscaling; +import java.util.List; import java.util.Objects; import org.apache.solr.client.solrj.cloud.autoscaling.Clause.TestStatus; @@ -76,6 +77,13 @@ public enum Operand { return actual - expected; } }, + IN("", 0) { + @Override + public TestStatus match(Object ruleVal, Object testVal) { + List l = (List) ruleVal; + return (l.contains(testVal)) ? PASS: FAIL; + } + }, RANGE_NOT_EQUAL("", 2) { @Override public TestStatus match(Object ruleVal, Object testVal) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java index 9addc67ef2b..a5f57bb9483 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java @@ -539,7 +539,7 @@ public class Policy implements MapWriter { .collect(Collectors.toList()); } - Policy getPolicy() { + public Policy getPolicy() { return Policy.this; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java index 1adfdbf90b2..c00a90ebd32 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java @@ -106,7 +106,7 @@ public class Row implements MapWriter { @Override public String toString() { - return node; + return jsonStr(); } /** diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java index c61f371c7d7..a481a4070c8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java @@ -23,6 +23,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -77,6 +78,8 @@ public class Suggestion { double max() default -1d; + boolean supportArrayVals() default false; + String metricsKey() default NULL; ComputedType[] computedValues() default ComputedType.NULL; @@ -207,13 +210,20 @@ public class Suggestion { public String postValidate(Clause.Condition condition) { if (condition.computedType == ComputedType.EQUAL) { if (condition.getClause().tag != null && - condition.getClause().tag.varType == NODE && - condition.getClause().tag.op == Operand.WILDCARD) { +// condition.getClause().tag.varType == NODE && + (condition.getClause().tag.op == Operand.WILDCARD || condition.getClause().tag.op == Operand.IN)) { return null; } else { return "'replica': '#EQUAL` must be used with 'node':'#ANY'"; } } + if (condition.computedType == ComputedType.ALL) { + if (condition.getClause().tag != null && (condition.getClause().getTag().op == Operand.IN || + condition.getClause().getTag().op == Operand.WILDCARD)) { + return StrUtils.formatString("array value or wild card cannot be used for tag {0} with replica : '#ALL'", + condition.getClause().tag.getName()); + } + } return null; } @@ -223,8 +233,9 @@ public class Suggestion { return Double.valueOf(getRelevantReplicasCount(session, cv, collection, shard)); if (cv.computedType == ComputedType.EQUAL) { int relevantReplicasCount = getRelevantReplicasCount(session, cv, collection, shard); - if (relevantReplicasCount == 0) return 0; - return (double) session.matrix.size() / (double) relevantReplicasCount; + double bucketsCount = getNumBuckets(session, cv.getClause()); + if (relevantReplicasCount == 0 || bucketsCount == 0) return 0; + return (double) relevantReplicasCount / bucketsCount; } else if (cv.computedType == ComputedType.PERCENT) { return ComputedType.PERCENT.compute(getRelevantReplicasCount(session, cv, collection, shard), cv); } else { @@ -232,33 +243,60 @@ public class Suggestion { } } + + private int getNumBuckets(Policy.Session session, Clause clause) { + if (clause.getTag().getOperand() == Operand.IN) { + return ((Collection) clause.getTag().val).size(); + } else if (clause.getTag().getOperand() == Operand.WILDCARD) { + if (clause.getTag().varType == NODE) return session.matrix.size(); + Set uniqueVals = new HashSet(); + for (Row matrix : session.matrix) { + Object val = matrix.getVal(clause.getTag().name); + if (val != null) uniqueVals.add(val); + } + return uniqueVals.size(); + } else { + throw new IllegalArgumentException("Invalid operand for the tag in " + clause); + } + + } }, @Meta(name = ImplicitSnitch.PORT, type = Long.class, min = 1, - max = 65535) + max = 65535, + supportArrayVals = true, + wildCards = Policy.EACH + ) PORT(), @Meta(name = "ip_1", type = Long.class, min = 0, - max = 255) + max = 255, + supportArrayVals = true, + wildCards = Policy.EACH) IP_1(), @Meta(name = "ip_2", type = Long.class, min = 0, - max = 255) + max = 255, + supportArrayVals = true, + wildCards = Policy.EACH) IP_2(), @Meta(name = "ip_3", type = Long.class, min = 0, - max = 255) + max = 255, + supportArrayVals = true, + wildCards = Policy.EACH) IP_3(), @Meta(name = "ip_4", type = Long.class, min = 0, - max = 255) + max = 255, + supportArrayVals = true, + wildCards = Policy.EACH) IP_4(), - @Meta(name = ImplicitSnitch.DISK, type = Double.class, min = 0, @@ -452,15 +490,18 @@ public class Suggestion { type = Long.class, min = 0) NUMBER(), + @Meta(name = "STRING", type = String.class, - wildCards = Policy.EACH) + wildCards = Policy.EACH, + supportArrayVals = true) STRING(), @Meta(name = "node", type = String.class, isNodeSpecificVal = true, - wildCards = {Policy.ANY, Policy.EACH}) + wildCards = {Policy.ANY, Policy.EACH}, + supportArrayVals = true) NODE() { @Override public void getSuggestions(SuggestionCtx ctx) { @@ -495,7 +536,8 @@ public class Suggestion { @Meta(name = ImplicitSnitch.DISKTYPE, type = String.class, - enumVals = {"ssd", "rotational"}) + enumVals = {"ssd", "rotational"}, + supportArrayVals = true) DISKTYPE() { @Override public void getSuggestions(SuggestionCtx ctx) { @@ -511,12 +553,10 @@ public class Suggestion { public final Number min; public final Number max; public final Boolean additive; - public final boolean isHidden; public final Set wildCards; public final String perReplicaValue; public final Set associatedPerNodeValues; public final String metricsAttribute; - public final boolean isPerNodeValue; public final Set supportedComputedTypes; @@ -539,14 +579,16 @@ public class Suggestion { this.associatedPerNodeValues = readSet(meta.associatedPerNodeValue()); this.additive = meta.isAdditive(); this.metricsAttribute = readStr(meta.metricsKey()); - this.isPerNodeValue = meta.isNodeSpecificVal(); this.supportedComputedTypes = meta.computedValues()[0] == ComputedType.NULL ? emptySet() : unmodifiableSet(new HashSet(Arrays.asList(meta.computedValues()))); - this.isHidden = meta.isHidden(); this.wildCards = readSet(meta.wildCards()); } + public String getTagName() { + return meta.name(); + } + private String readStr(String s) { return NULL.equals(s) ? null : s; } @@ -567,7 +609,7 @@ public class Suggestion { public void addViolatingReplicas(ViolationCtx ctx) { for (Row row : ctx.allRows) { - if (ctx.clause.tag.varType.isPerNodeValue && !row.node.equals(ctx.tagKey)) continue; + if (ctx.clause.tag.varType.meta.isNodeSpecificVal() && !row.node.equals(ctx.tagKey)) continue; collectViolatingReplicas(ctx, row); } } @@ -645,7 +687,7 @@ public class Suggestion { } private static void collectViolatingReplicas(ViolationCtx ctx, Row row) { - if (ctx.clause.tag.varType.isPerNodeValue) { + if (ctx.clause.tag.varType.meta.isNodeSpecificVal()) { row.forEachReplica(replica -> { if (ctx.clause.collection.isPass(replica.getCollection()) && ctx.clause.getShard().isPass(replica.getShard())) { ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica) @@ -656,7 +698,7 @@ public class Suggestion { row.forEachReplica(replica -> { if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return; if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return; - if (!ctx.currentViolation.matchShard(replica.getShard())) return; + if(!ctx.currentViolation.getClause().matchShard(replica.getShard(), ctx.currentViolation.shard)) return; if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard)) return; ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name)))); @@ -672,12 +714,8 @@ public class Suggestion { Clause clause = cv.getClause(); for (Row row : session.matrix) { row.forEachReplica(replicaInfo -> { - if (replicaInfo.getCollection().equals(collection)) { - if (clause.getShard() ==null || clause.getShard().op == Operand.WILDCARD || replicaInfo.getShard().equals(shard)) { - if (cv.getClause().type == null || replicaInfo.getType() == cv.getClause().type) - totalReplicasOfInterest.incrementAndGet(); - } - } + if (clause.isMatch(replicaInfo, collection, shard)) + totalReplicasOfInterest.incrementAndGet(); }); } return totalReplicasOfInterest.get(); @@ -719,9 +757,6 @@ public class Suggestion { } } - /*public static final Map tagVsPerReplicaVal = Stream.of(ConditionType.values()) - .filter(tag -> tag.perReplicaValue != null) - .collect(Collectors.toMap(tag -> tag.tagName, tag -> tag.perReplicaValue));*/ static { for (Suggestion.ConditionType t : Suggestion.ConditionType.values()) Suggestion.validatetypes.put(t.tagName, t); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java index 92183b86926..a7701029572 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java @@ -72,7 +72,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter private final CloudSolrClient solrClient; - private final Map>>> nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>(); + protected final Map>>> nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>(); private Map snitchSession = new HashMap<>(); private Map nodeVsTags = new HashMap<>(); @@ -140,28 +140,26 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter public Map>> getReplicaInfo(String node, Collection keys) { Map>> result = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(node, s -> emptyMap()); if (!keys.isEmpty()) { - Map> keyVsReplica = new HashMap<>(); + Map> metricsKeyVsTagReplica = new HashMap<>(); Row.forEachReplica(result, r -> { for (String key : keys) { - if (r.getVariables().containsKey(key)) continue; - String perReplicaAttrKeyPrefix = "solr.core." + r.getCollection() + "." + r.getShard() + "." + Utils.parseMetricsReplicaName(r.getCollection(), r.getCore()) + ":"; + if (r.getVariables().containsKey(key)) continue;// it's already collected + String perReplicaMetricsKey = "solr.core." + r.getCollection() + "." + r.getShard() + "." + Utils.parseMetricsReplicaName(r.getCollection(), r.getCore()) + ":"; Suggestion.ConditionType tagType = Suggestion.getTagType(key); String perReplicaValue = key; if (tagType != null) { perReplicaValue = tagType.metricsAttribute; perReplicaValue = perReplicaValue == null ? key : perReplicaValue; } - perReplicaAttrKeyPrefix += perReplicaValue; - keyVsReplica.put(perReplicaAttrKeyPrefix, new Pair<>(key, r)); + perReplicaMetricsKey += perReplicaValue; + metricsKeyVsTagReplica.put(perReplicaMetricsKey, new Pair<>(key, r)); } }); - if (!keyVsReplica.isEmpty()) { - Map tags = fetchReplicaMetrics(node, - keyVsReplica.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getKey))); - tags.forEach((k, o) -> { - Pair p = keyVsReplica.get(k); + if (!metricsKeyVsTagReplica.isEmpty()) { + Map tagValues = fetchReplicaMetrics(node, metricsKeyVsTagReplica); + tagValues.forEach((k, o) -> { + Pair p = metricsKeyVsTagReplica.get(k); Suggestion.ConditionType validator = Suggestion.getTagType(p.first()); if (validator != null) o = validator.convertVal(o); if (p.second() != null) p.second().getVariables().put(p.first(), o); @@ -172,9 +170,11 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter return result; } - protected Map fetchReplicaMetrics(String solrNode, Map metricsKeyVsTag) { + protected Map fetchReplicaMetrics(String node, Map> metricsKeyVsTagReplica) { + Map collect = metricsKeyVsTagReplica.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getKey)); ClientSnitchCtx ctx = new ClientSnitchCtx(null, null, emptyMap(), solrClient); - fetchReplicaMetrics(solrNode, ctx,metricsKeyVsTag); + fetchReplicaMetrics(node, ctx, collect); return ctx.getTags(); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java index 0d250fdfc81..feae38b8403 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java @@ -72,6 +72,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.CORES; import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK; import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.REPLICA; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; @@ -79,7 +80,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.MO public class TestPolicy extends SolrTestCaseJ4 { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private Suggester createSuggester(SolrCloudManager cloudManager, Map jsonObj, Suggester seed) throws IOException, InterruptedException { + static Suggester createSuggester(SolrCloudManager cloudManager, Map jsonObj, Suggester seed) throws IOException, InterruptedException { Policy.Session session = null; if (seed != null) session = seed.session; else { @@ -96,7 +97,7 @@ public class TestPolicy extends SolrTestCaseJ4 { return result; } - private SolrCloudManager createCloudManager(Map jsonObj) { + static SolrCloudManager createCloudManager(Map jsonObj) { return cloudManagerWithData(jsonObj); } @@ -282,9 +283,35 @@ public class TestPolicy extends SolrTestCaseJ4 { () -> Clause.create("{replica:'<3', shard: '#ANV', node:'#ANY'}")); expectThrows(IllegalArgumentException.class, () -> Clause.create("{replica:'<3', shard: '#EACH', node:'#E4CH'}")); + try { + Clause.create("{replica:0, 'ip_1':'<30%'}"); + fail("Expected exception"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("'%' is not allowed for variable : 'ip_1'")); + } clause = Clause.create("{replica: '#ALL', freedisk:'>20%'}"); clause = Clause.create("{replica: '#ALL', sysprop.zone :'west'}"); + expectThrows(IllegalArgumentException.class, + () -> Clause.create("{replica: [3,4] , freedisk:'>20'}")); + clause = Clause.create("{replica: 3 , port:[8983, 7574]}"); + assertEquals(Operand.IN, clause.tag.op); + expectThrows(IllegalArgumentException.class, + () -> Clause.create("{replica: 3 , sysprop.zone :['east', ' ', 'west']}")); + expectThrows(IllegalArgumentException.class, + () -> Clause.create("{replica: 3 , sysprop.zone :[]}")); + expectThrows(IllegalArgumentException.class, + () -> Clause.create("{replica: 3 , sysprop.zone :['!east','west']}")); + expectThrows(IllegalArgumentException.class, + () -> Clause.create("{replica: '#ALL' , shard: '#EACH' , sysprop.zone:[east, west]}")); + expectThrows(IllegalArgumentException.class, + () -> Clause.create("{replica: '#ALL' , shard: '#EACH' , sysprop.zone:'#EACH'}")); + clause = Clause.create("{replica: '#EQUAL' , shard: '#EACH' , sysprop.zone:[east, west]}"); + assertEquals(Clause.ComputedType.EQUAL, clause.replica.computedType); + assertEquals(Operand.IN, clause.tag.op); + expectThrows(IllegalArgumentException.class, + () -> Clause.create("{replica: '#EQUAL' , shard: '#EACH' , sysprop.zone:[east]}")); + } @@ -375,10 +402,10 @@ public class TestPolicy extends SolrTestCaseJ4 { } @Override - protected Map fetchReplicaMetrics(String solrNode, Map metricsKeyVsTag) { + protected Map fetchReplicaMetrics(String node, Map> metricsKeyVsTagReplica) { //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes Map result = new HashMap<>(); - metricsKeyVsTag.forEach((k, v) -> { + metricsKeyVsTagReplica.forEach((k, v) -> { if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100); }); @@ -417,9 +444,10 @@ public class TestPolicy extends SolrTestCaseJ4 { Violation violation = violations.get(0); assertEquals("node1", violation.node); RangeVal val = (RangeVal) violation.getClause().replica.val; - assertEquals(0.0, val.min); - assertEquals(1.0, val.max); - assertEquals(0, Preference.compareWithTolerance(val.actual.doubleValue(), 0.833, 1)); + assertEquals(1.0d, val.min.doubleValue(), 0.01); + assertEquals(2.0, val.max.doubleValue(), 0.01); + assertEquals(1.2d, val.actual.doubleValue(), 0.01d); + assertEquals(1, violation.replicaCountDelta.doubleValue(), 0.01); assertEquals(3, violation.getViolatingReplicas().size()); Set expected = ImmutableSet.of("r1", "r3", "r5"); for (Violation.ReplicaInfoAndErr replicaInfoAndErr : violation.getViolatingReplicas()) { @@ -476,13 +504,10 @@ public class TestPolicy extends SolrTestCaseJ4 { assertFalse(c.tag.isPass("12.6")); assertFalse(c.tag.isPass(12.6d)); - try { - c = Clause.create("{replica:0, 'ip_1':'<30%'}"); - fail("Expected exception"); - } catch (Exception e) { - assertTrue(e.getMessage().contains("'%' is not allowed for variable : 'ip_1'")); - - } + c = Clause.create("{replica: '<3', sysprop.zone : [east, west]}"); + assertTrue(c.tag.isPass("east")); + assertTrue(c.tag.isPass("west")); + assertFalse(c.tag.isPass("south")); } @@ -667,7 +692,7 @@ public class TestPolicy extends SolrTestCaseJ4 { return cloudManagerWithData((Map) Utils.fromJSONString(data)); } - private static SolrCloudManager cloudManagerWithData(Map m) { + static SolrCloudManager cloudManagerWithData(Map m) { Map replicaInfo = (Map) m.get("replicaInfo"); replicaInfo.forEach((node, val) -> { Map m1 = (Map) val; @@ -1737,6 +1762,34 @@ public class TestPolicy extends SolrTestCaseJ4 { public void testReplicaPercentage() { String dataproviderdata = "{" + + " 'liveNodes':[" + + " '10.0.0.6:7574_solr'," + + " '10.0.0.6:8983_solr']," + + " 'replicaInfo':{" + + " '10.0.0.6:7574_solr':{}," + + " '10.0.0.6:8983_solr':{'mycoll1':{" + + " 'shard1':[{'core_node1':{'type':'NRT'}},{'core_node2':{'type':'NRT'}},{'core_node3':{'type':'NRT'}}]}}}," + + " 'nodeValues':{" + + " '10.0.0.6:7574_solr':{" + + " 'node':'10.0.0.6:7574_solr'," + + " 'cores':0}," + + " '10.0.0.6:8983_solr':{" + + " 'node':'10.0.0.6:8983_solr'," + + " 'cores':3}}}"; + String autoScalingjson = " { cluster-policy:[" + + " { replica :'51%', shard:'#EACH', node:'#ANY'}]," + + " cluster-preferences :[{ minimize : cores }]}"; + + + AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map) Utils.fromJSONString(autoScalingjson)); + Policy.Session session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata)); + List violations = session.getViolations(); + assertEquals(1, violations.size()); + assertEquals(1.0d, violations.get(0).replicaCountDelta, 0.01); + assertEquals(1.53d, ((RangeVal) violations.get(0).getClause().getReplica().val).actual); + + + dataproviderdata = "{" + " 'liveNodes':[" + " '10.0.0.6:7574_solr'," + " '10.0.0.6:8983_solr']," + @@ -1752,17 +1805,12 @@ public class TestPolicy extends SolrTestCaseJ4 { " '10.0.0.6:8983_solr':{" + " 'node':'10.0.0.6:8983_solr'," + " 'cores':2}}}"; - String autoScalingjson = " { cluster-policy:[" + - " { replica :'<51%', node:'#ANY'}]," + - " cluster-preferences :[{ minimize : cores }]}"; - - AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map) Utils.fromJSONString(autoScalingjson)); - Policy.Session session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata)); - List violations = session.getViolations(); - assertEquals(1, violations.size()); + session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata)); + violations = session.getViolations(); + assertEquals(0, violations.size()); autoScalingjson = " { cluster-policy:[" + - " { replica :'<51%', shard: '#EACH' , node:'#ANY'}]," + + " { replica :'51%', shard: '#EACH' , node:'#ANY'}]," + " cluster-preferences :[{ minimize : cores }]}"; autoScalingConfig = new AutoScalingConfig((Map) Utils.fromJSONString(autoScalingjson)); session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata)); @@ -1909,6 +1957,7 @@ public class TestPolicy extends SolrTestCaseJ4 { " cluster-preferences :[{ minimize : cores, precision : 2 }]}"; cfg = new AutoScalingConfig((Map) Utils.fromJSONString(autoScalingjson)); violations = cfg.getPolicy().createSession(cloudManagerWithData(dataproviderdata)).getViolations(); + assertEquals(1, violations.size()); assertEquals(-4, violations.get(0).replicaCountDelta, 0.1); assertEquals(1, violations.size()); assertEquals(4, violations.get(0).getViolatingReplicas().size()); @@ -2807,7 +2856,7 @@ public void testUtilizeNodeFailure2() throws Exception { List l = (List) ((Map) Utils.fromJSONString(rowsData)).get("sortedNodes"); List params = new ArrayList<>(); - params.add(Suggestion.ConditionType.CORES); + params.add(CORES); params.add(Suggestion.ConditionType.FREEDISK); params.add(Suggestion.ConditionType.SYSLOADAVG); params.add(Suggestion.ConditionType.NODE); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java new file mode 100644 index 00000000000..9c5528adfe6 --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.cloud.autoscaling; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.cloud.NodeStateProvider; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.impl.ClusterStateProvider; +import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.emptyMap; +import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.CORES; + +public class TestPolicy2 extends SolrTestCaseJ4 { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public void testEqualOnNonNode() { + String state = "{" + + " 'coll1': {" + + " 'router': {" + + " 'name': 'compositeId'" + + " }," + + " 'shards': {" + + " 'shard1': {" + + " 'range': '80000000-ffffffff'," + + " 'replicas': {" + + " 'r1': {" + + " 'core': 'r1'," + + " 'base_url': 'http://10.0.0.4:8983/solr'," + + " 'node_name': 'node1'," + + " 'state': 'active'" + + " }," + + " 'r2': {" + + " 'core': 'r2'," + + " 'base_url': 'http://10.0.0.4:7574/solr'," + + " 'node_name': 'node2'," + + " 'state': 'active'" + + " }" + + " }" + + " }," + + " 'shard2': {" + + " 'range': '0-7fffffff'," + + " 'replicas': {" + + " 'r3': {" + + " 'core': 'r3'," + + " 'base_url': 'http://10.0.0.4:8983/solr'," + + " 'node_name': 'node1'," + + " 'state': 'active'" + + " }," + + " 'r4': {" + + " 'core': 'r4'," + + " 'base_url': 'http://10.0.0.4:8987/solr'," + + " 'node_name': 'node4'," + + " 'state': 'active'" + + " }," + + " 'r6': {" + + " 'core': 'r6'," + + " 'base_url': 'http://10.0.0.4:8989/solr'," + + " 'node_name': 'node3'," + + " 'state': 'active'" + + " }," + + " 'r5': {" + + " 'core': 'r5'," + + " 'base_url': 'http://10.0.0.4:8983/solr'," + + " 'node_name': 'node1'," + + " 'state': 'active'" + + " }" + + " }" + + " }" + + " }" + + " }" + + "}"; + String metaData = + " {'nodeValues':{" + + " 'node1':{'cores' : 3, 'freedisk' : 700, 'totaldisk' :1000, 'sysprop.zone' : 'east'}," + + " 'node2':{'cores' : 1, 'freedisk' : 900, 'totaldisk' :1000, 'sysprop.zone' : 'west'}," + + " 'node3':{'cores' : 1, 'freedisk' : 900, 'totaldisk' :1000, 'sysprop.zone': 'east'}," + + " 'node4':{'cores' : 1, 'freedisk' : 900, 'totaldisk' :1000, 'sysprop.zone': 'west'}" + + " }," + + " 'replicaValues':[" + + " {'INDEX.sizeInGB': 100, core : r1}," + + " {'INDEX.sizeInGB': 100, core : r2}," + + " {'INDEX.sizeInGB': 100, core : r3}," + + " {'INDEX.sizeInGB': 100, core : r4}," + + " {'INDEX.sizeInGB': 100, core : r5}," + + " {'INDEX.sizeInGB': 100, core : r6}]}"; + + String autoScalingjson = "{cluster-policy:[" + + " { replica : '<3' , shard : '#EACH', sysprop.zone: [east,west] } ]," + + " 'cluster-preferences':[{ minimize : cores},{minimize : freedisk, precision : 50}]}"; + Policy policy = new Policy((Map) Utils.fromJSONString(autoScalingjson)); + Policy.Session session = policy.createSession(createCloudManager(state, metaData)); + List violations = session.getViolations(); + assertEquals(1, violations.size()); + assertEquals(4, violations.get(0).getViolatingReplicas().size()); + assertEquals(1.0, violations.get(0).replicaCountDelta, 0.01); + for (Violation.ReplicaInfoAndErr r : violations.get(0).getViolatingReplicas()) { + assertEquals("shard2", r.replicaInfo.getShard()); + } + + autoScalingjson = "{cluster-policy:[" + + " { replica : '<3' , shard : '#EACH', sysprop.zone: '#EACH' } ]," + + " 'cluster-preferences':[{ minimize : cores},{minimize : freedisk, precision : 50}]}"; + policy = new Policy((Map) Utils.fromJSONString(autoScalingjson)); + session = policy.createSession(createCloudManager(state, metaData)); + violations = session.getViolations(); + assertEquals(1, violations.size()); + assertEquals(4, violations.get(0).getViolatingReplicas().size()); + assertEquals(1.0, violations.get(0).replicaCountDelta, 0.01); + for (Violation.ReplicaInfoAndErr r : violations.get(0).getViolatingReplicas()) { + assertEquals("shard2", r.replicaInfo.getShard()); + } + + } + + static SolrCloudManager createCloudManager(String clusterStateStr, String metadata) { + Map m = (Map) Utils.fromJSONString(clusterStateStr); + Map meta = (Map) Utils.fromJSONString(metadata); + Map nodeVals = (Map) meta.get("nodeValues"); + List replicaVals = (List) meta.get("replicaValues"); + ClusterState clusterState = ClusterState.load(0, m, Collections.emptySet(), null); + Map coreCount = new LinkedHashMap<>(); + Set nodes = new HashSet<>(nodeVals.keySet()); + clusterState.getCollectionStates().forEach((s, collectionRef) -> collectionRef.get() + .forEachReplica((s12, replica) -> { + nodes.add(replica.getNodeName()); + coreCount.computeIfAbsent(replica.getNodeName(), s1 -> new AtomicInteger(0)) + .incrementAndGet(); + })); + + DelegatingClusterStateProvider delegatingClusterStateProvider = new DelegatingClusterStateProvider(null) { + @Override + public ClusterState getClusterState() throws IOException { + return clusterState; + } + + @Override + public Set getLiveNodes() { + return nodes; + } + }; + + return new DelegatingCloudManager(null) { + + @Override + public ClusterStateProvider getClusterStateProvider() { + return delegatingClusterStateProvider; + } + + @Override + public NodeStateProvider getNodeStateProvider() { + + return new SolrClientNodeStateProvider(null) { + @Override + protected ClusterStateProvider getClusterStateProvider() { + return delegatingClusterStateProvider; + } + + @Override + protected Map fetchTagValues(String node, Collection tags) { + Map result = new LinkedHashMap<>(); + for (String tag : tags) { + if (tag.equals(CORES.tagName)) + result.put(CORES.tagName, coreCount.getOrDefault(node, new AtomicInteger(0)).get()); + result.put(tag, Utils.getObjectByPath(nodeVals, true, Arrays.asList(node, tag))); + } + return result; + } + + @Override + public Map>> getReplicaInfo(String node, Collection keys) { + Map>> result = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(node, s -> emptyMap()); + if (!keys.isEmpty()) { + Row.forEachReplica(result, replicaInfo -> { + for (String key : keys) { + if (!replicaInfo.getVariables().containsKey(key)) { + replicaVals.stream() + .filter(it -> replicaInfo.getCore().equals(it.get("core"))) + .findFirst() + .ifPresent(map -> replicaInfo.getVariables().put(key, map.get(key))); + } + } + }); + } + return result; + } + }; + } + }; + } + +}