SOLR-12536: autoscaling policy support to equally distribute replicas on the basis of arbitrary properties

This commit is contained in:
Noble Paul 2018-07-26 21:32:45 +10:00
parent 6ab3ff8392
commit d87ea6b1cc
9 changed files with 478 additions and 94 deletions

View File

@ -121,6 +121,8 @@ New Features
* SOLR-12581: the JSON Facet 'relatedness()' aggregate function now supports a 'min_popularity' option
using the extended type:func syntax (hossman)
* SOLR-12536: autoscaling policy support to equally distribute replicas on the basis of arbitrary properties (noble)
Bug Fixes
----------------------

View File

@ -29,6 +29,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType;
import org.apache.solr.common.MapWriter;
@ -38,6 +39,7 @@ import org.apache.solr.common.util.Utils;
import static java.util.Collections.singletonMap;
import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.TestStatus.PASS;
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.EQUAL;
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.GREATER_THAN;
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.LESS_THAN;
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.NOT_EQUAL;
@ -46,6 +48,8 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.ANY;
import static org.apache.solr.common.params.CoreAdminParams.COLLECTION;
import static org.apache.solr.common.params.CoreAdminParams.REPLICA;
import static org.apache.solr.common.params.CoreAdminParams.SHARD;
import static org.apache.solr.common.util.StrUtils.formatString;
import static org.apache.solr.common.util.Utils.toJSONString;
/**
* Represents a set of conditions in the policy
@ -80,7 +84,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
if (globalTagName.isPresent()) {
globalTag = parse(globalTagName.get(), m);
if (m.size() > 2) {
throw new RuntimeException("Only one extra tag supported for the tag " + globalTagName.get() + " in " + Utils.toJSONString(m));
throw new RuntimeException("Only one extra tag supported for the tag " + globalTagName.get() + " in " + toJSONString(m));
}
tag = parse(m.keySet().stream()
.filter(s -> (!globalTagName.get().equals(s) && !IGNORE_TAGS.contains(s)))
@ -89,18 +93,18 @@ public class Clause implements MapWriter, Comparable<Clause> {
collection = parse(COLLECTION, m);
shard = parse(SHARD, m);
if (m.get(REPLICA) == null) {
throw new RuntimeException(StrUtils.formatString("'replica' is required in {0}", Utils.toJSONString(m)));
throw new IllegalArgumentException(formatString("'replica' is required in {0}", toJSONString(m)));
}
this.replica = parse(REPLICA, m);
if (replica.op == WILDCARD) throw new RuntimeException("replica val cannot be null" + Utils.toJSONString(m));
if (replica.op == WILDCARD) throw new IllegalArgumentException("replica val cannot be null" + toJSONString(m));
m.forEach(this::parseCondition);
}
if (tag == null)
throw new RuntimeException("Invalid op, must have one and only one tag other than collection, shard,replica " + Utils.toJSONString(m));
throw new RuntimeException("Invalid op, must have one and only one tag other than collection, shard,replica " + toJSONString(m));
if (tag.name.startsWith(Clause.METRICS_PREFIX)) {
List<String> ss = StrUtils.splitSmart(tag.name, ':');
if (ss.size() < 3 || ss.size() > 4) {
throw new RuntimeException("Invalid metrics: param in " + Utils.toJSONString(m) + " must have at 2 or 3 segments after 'metrics:' separated by ':'");
throw new RuntimeException("Invalid metrics: param in " + toJSONString(m) + " must have at 2 or 3 segments after 'metrics:' separated by ':'");
}
}
doPostValidate(collection, shard, replica, tag, globalTag);
@ -112,7 +116,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
if (condition == null) continue;
String err = condition.varType.postValidate(condition);
if (err != null) {
throw new IllegalArgumentException(StrUtils.formatString("Error in clause : {0}, caused by : {1}", Utils.toJSONString(original), err));
throw new IllegalArgumentException(formatString("Error in clause : {0}, caused by : {1}", toJSONString(original), err));
}
}
}
@ -241,8 +245,8 @@ public class Clause implements MapWriter, Comparable<Clause> {
ComputedType computedType = null;
Object val = m.get(s);
ConditionType varType = Suggestion.getTagType(s);
if (varType.isHidden) {
throw new IllegalArgumentException(StrUtils.formatString("''{0}'' is not allowed in a policy rule : ''{1}'' ", varType.tagName, Utils.toJSONString(m)));
if (varType.meta.isHidden()) {
throw new IllegalArgumentException(formatString("''{0}'' is not allowed in a policy rule : ''{1}'' ", varType.tagName, toJSONString(m)));
}
try {
String conditionName = s.trim();
@ -250,14 +254,17 @@ public class Clause implements MapWriter, Comparable<Clause> {
if (val == null) {
operand = WILDCARD;
expectedVal = Policy.ANY;
} else if (val instanceof List) {
if (!varType.meta.supportArrayVals()) {
throw new IllegalArgumentException(formatString("array values are not supported for {0} in clause {1}",
conditionName, toJSONString(m)));
}
expectedVal = readListVal(m, (List) val, varType, conditionName);
operand = Operand.IN;
} else if (val instanceof String) {
String strVal = ((String) val).trim();
val = strVal;
if (Policy.ANY.equals(strVal) || Policy.EACH.equals(strVal)) operand = WILDCARD;
else if (strVal.startsWith(NOT_EQUAL.operand)) operand = NOT_EQUAL;
else if (strVal.startsWith(GREATER_THAN.operand)) operand = GREATER_THAN;
else if (strVal.startsWith(LESS_THAN.operand)) operand = LESS_THAN;
else operand = Operand.EQUAL;
operand = getOperand(strVal);
strVal = strVal.substring(Operand.EQUAL == operand || WILDCARD == operand ? 0 : 1);
for (ComputedType t : ComputedType.values()) {
String changedVal = t.match(strVal);
@ -265,15 +272,14 @@ public class Clause implements MapWriter, Comparable<Clause> {
computedType = t;
strVal = changedVal;
if (varType == null || !varType.supportedComputedTypes.contains(computedType)) {
throw new IllegalArgumentException(StrUtils.formatString("''{0}'' is not allowed for variable : ''{1}'' , in clause : ''{2}'' ",
t, conditionName, Utils.toJSONString(m)));
throw new IllegalArgumentException(formatString("''{0}'' is not allowed for variable : ''{1}'' , in clause : ''{2}'' ",
t, conditionName, toJSONString(m)));
}
}
}
if (computedType == null && ((String) val).charAt(0) == '#' && !varType.wildCards.contains(val)) {
throw new IllegalArgumentException(StrUtils.formatString("''{0}'' is not an allowed value for ''{1}'' , in clause : ''{2}'' . Supported value is : {3}",
val, conditionName, Utils.toJSONString(m), varType.wildCards));
throw new IllegalArgumentException(formatString("''{0}'' is not an allowed value for ''{1}'' , in clause : ''{2}'' . Supported value is : {3}",
val, conditionName, toJSONString(m), varType.wildCards));
}
operand = varType == null ? operand : varType.getOperand(operand, strVal, computedType);
@ -293,6 +299,49 @@ public class Clause implements MapWriter, Comparable<Clause> {
}
}
private List readListVal(Map m, List val, ConditionType varType, String conditionName) {
List list = val;
list = (List) list.stream()
.map(it -> varType.validate(conditionName, it, true))
.map(it -> {
if (it instanceof String) {
String trim = ((String) it).trim();
if (trim.isEmpty())
throw new IllegalArgumentException(formatString("{0} cannot have an empty string value in clause : {1}",
conditionName, toJSONString(m)));
return trim;
} else return it;
}).filter(it -> it == null ? false : true)
.collect(Collectors.toList());
if (list.isEmpty())
throw new IllegalArgumentException(formatString("{0} cannot have an empty list value in clause : {1}",
conditionName, toJSONString(m)));
for (Object o : list) {
if (o instanceof String) {
if (getOperand((String) o) != EQUAL) {
throw new IllegalArgumentException(formatString("No operators are supported in collection values in condition : {0} in clause : {1}",
conditionName, toJSONString(m)));
}
}
}
if (list.size() < 2) {
throw new IllegalArgumentException(formatString("Array should have more than one value in condition : {0} in clause : {1}",
conditionName, toJSONString(m)));
}
return list;
}
private Operand getOperand(String strVal) {
Operand operand;
if (Policy.ANY.equals(strVal) || Policy.EACH.equals(strVal)) operand = WILDCARD;
else if (strVal.startsWith(NOT_EQUAL.operand)) operand = NOT_EQUAL;
else if (strVal.startsWith(GREATER_THAN.operand)) operand = GREATER_THAN;
else if (strVal.startsWith(LESS_THAN.operand)) operand = LESS_THAN;
else operand = Operand.EQUAL;
return operand;
}
public List<Violation> test(Policy.Session session) {
ComputedValueEvaluator computedValueEvaluator = new ComputedValueEvaluator(session);
Suggestion.ViolationCtx ctx = new Suggestion.ViolationCtx(this, session.matrix, computedValueEvaluator);
@ -305,17 +354,17 @@ public class Clause implements MapWriter, Comparable<Clause> {
computedValueEvaluator.shardName = shardVsCount.getKey();
if (!shard.isPass(computedValueEvaluator.shardName)) continue;
for (Map.Entry<String, ReplicaCount> counts : shardVsCount.getValue().entrySet()) {
if(tag.varType.isPerNodeValue) computedValueEvaluator.node = counts.getKey();
if (tag.varType.meta.isNodeSpecificVal()) computedValueEvaluator.node = counts.getKey();
SealedClause sealedClause = getSealedClause(computedValueEvaluator);
ReplicaCount replicas = counts.getValue();
if (!sealedClause.replica.isPass(replicas)) {
Violation violation = new Violation(sealedClause,
computedValueEvaluator.collName,
computedValueEvaluator.shardName,
tag.varType.isPerNodeValue ? computedValueEvaluator.node : null,
tag.varType.meta.isNodeSpecificVal() ? computedValueEvaluator.node : null,
counts.getValue(),
sealedClause.getReplica().delta(replicas),
tag.varType.isPerNodeValue? null: counts.getKey());
tag.varType.meta.isNodeSpecificVal() ? null : counts.getKey());
tag.varType.addViolatingReplicas(ctx.reset(counts.getKey(), replicas, violation));
}
}
@ -353,7 +402,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
computedValueEvaluator.shardName = shardName;
SealedClause sealedClause = getSealedClause(computedValueEvaluator);
Condition t = sealedClause.getTag();
if(t.varType.isPerNodeValue){
if (t.varType.meta.isNodeSpecificVal()) {
boolean pass = t.getOperand().match(t.val, tagVal) == TestStatus.PASS;
tagVsCount.computeIfAbsent(row.node, s -> new ReplicaCount());
if(pass) {
@ -372,6 +421,23 @@ public class Clause implements MapWriter, Comparable<Clause> {
return collVsShardVsTagVsCount;
}
public boolean isMatch(ReplicaInfo r, String collection, String shard) {
if (type != null && r.getType() != type) return false;
if (r.getCollection().equals(collection)) {
if (this.shard == null || this.shard.val.equals(Policy.ANY)) return true;
else if (this.shard.val.equals(Policy.EACH) && r.getShard().equals(shard)) return true;
else return this.shard.val.equals(r.getShard()) && r.getShard().equals(shard);
}
return false;
}
boolean matchShard(String replicaShard, String shardInContext) {
if (shard == null || shard.val.equals(ANY)) return true;
if (shard.val.equals(Policy.EACH) && replicaShard.equals(shardInContext)) return true;
if (shard.val.equals(replicaShard)) return true;
return false;
}
enum ComputedType {
NULL(),
EQUAL() {
@ -555,7 +621,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
@Override
public String toString() {
return Utils.toJSONString(original);
return toJSONString(original);
}
@Override
@ -625,6 +691,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
public static Double parseDouble(String name, Object val) {
if (val == null) return null;
if (val instanceof RangeVal) val = ((RangeVal) val).actual;
if (val instanceof Double) return (Double) val;
Number num = null;
if (val instanceof String) {
@ -661,7 +728,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
}
public Double delta(double v) {
if (actual != null) return v - actual.doubleValue();
// if (actual != null) return v - actual.doubleValue();
if (v >= max.doubleValue()) return v - max.doubleValue();
if (v <= min.doubleValue()) return v - min.doubleValue();
return 0d;

View File

@ -17,6 +17,7 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.util.List;
import java.util.Objects;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause.TestStatus;
@ -76,6 +77,13 @@ public enum Operand {
return actual - expected;
}
},
IN("", 0) {
@Override
public TestStatus match(Object ruleVal, Object testVal) {
List l = (List) ruleVal;
return (l.contains(testVal)) ? PASS: FAIL;
}
},
RANGE_NOT_EQUAL("", 2) {
@Override
public TestStatus match(Object ruleVal, Object testVal) {

View File

@ -539,7 +539,7 @@ public class Policy implements MapWriter {
.collect(Collectors.toList());
}
Policy getPolicy() {
public Policy getPolicy() {
return Policy.this;
}

View File

@ -106,7 +106,7 @@ public class Row implements MapWriter {
@Override
public String toString() {
return node;
return jsonStr();
}
/**

View File

@ -23,6 +23,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -77,6 +78,8 @@ public class Suggestion {
double max() default -1d;
boolean supportArrayVals() default false;
String metricsKey() default NULL;
ComputedType[] computedValues() default ComputedType.NULL;
@ -207,13 +210,20 @@ public class Suggestion {
public String postValidate(Clause.Condition condition) {
if (condition.computedType == ComputedType.EQUAL) {
if (condition.getClause().tag != null &&
condition.getClause().tag.varType == NODE &&
condition.getClause().tag.op == Operand.WILDCARD) {
// condition.getClause().tag.varType == NODE &&
(condition.getClause().tag.op == Operand.WILDCARD || condition.getClause().tag.op == Operand.IN)) {
return null;
} else {
return "'replica': '#EQUAL` must be used with 'node':'#ANY'";
}
}
if (condition.computedType == ComputedType.ALL) {
if (condition.getClause().tag != null && (condition.getClause().getTag().op == Operand.IN ||
condition.getClause().getTag().op == Operand.WILDCARD)) {
return StrUtils.formatString("array value or wild card cannot be used for tag {0} with replica : '#ALL'",
condition.getClause().tag.getName());
}
}
return null;
}
@ -223,8 +233,9 @@ public class Suggestion {
return Double.valueOf(getRelevantReplicasCount(session, cv, collection, shard));
if (cv.computedType == ComputedType.EQUAL) {
int relevantReplicasCount = getRelevantReplicasCount(session, cv, collection, shard);
if (relevantReplicasCount == 0) return 0;
return (double) session.matrix.size() / (double) relevantReplicasCount;
double bucketsCount = getNumBuckets(session, cv.getClause());
if (relevantReplicasCount == 0 || bucketsCount == 0) return 0;
return (double) relevantReplicasCount / bucketsCount;
} else if (cv.computedType == ComputedType.PERCENT) {
return ComputedType.PERCENT.compute(getRelevantReplicasCount(session, cv, collection, shard), cv);
} else {
@ -232,33 +243,60 @@ public class Suggestion {
}
}
private int getNumBuckets(Policy.Session session, Clause clause) {
if (clause.getTag().getOperand() == Operand.IN) {
return ((Collection) clause.getTag().val).size();
} else if (clause.getTag().getOperand() == Operand.WILDCARD) {
if (clause.getTag().varType == NODE) return session.matrix.size();
Set uniqueVals = new HashSet();
for (Row matrix : session.matrix) {
Object val = matrix.getVal(clause.getTag().name);
if (val != null) uniqueVals.add(val);
}
return uniqueVals.size();
} else {
throw new IllegalArgumentException("Invalid operand for the tag in " + clause);
}
}
},
@Meta(name = ImplicitSnitch.PORT,
type = Long.class,
min = 1,
max = 65535)
max = 65535,
supportArrayVals = true,
wildCards = Policy.EACH
)
PORT(),
@Meta(name = "ip_1",
type = Long.class,
min = 0,
max = 255)
max = 255,
supportArrayVals = true,
wildCards = Policy.EACH)
IP_1(),
@Meta(name = "ip_2",
type = Long.class,
min = 0,
max = 255)
max = 255,
supportArrayVals = true,
wildCards = Policy.EACH)
IP_2(),
@Meta(name = "ip_3",
type = Long.class,
min = 0,
max = 255)
max = 255,
supportArrayVals = true,
wildCards = Policy.EACH)
IP_3(),
@Meta(name = "ip_4",
type = Long.class,
min = 0,
max = 255)
max = 255,
supportArrayVals = true,
wildCards = Policy.EACH)
IP_4(),
@Meta(name = ImplicitSnitch.DISK,
type = Double.class,
min = 0,
@ -452,15 +490,18 @@ public class Suggestion {
type = Long.class,
min = 0)
NUMBER(),
@Meta(name = "STRING",
type = String.class,
wildCards = Policy.EACH)
wildCards = Policy.EACH,
supportArrayVals = true)
STRING(),
@Meta(name = "node",
type = String.class,
isNodeSpecificVal = true,
wildCards = {Policy.ANY, Policy.EACH})
wildCards = {Policy.ANY, Policy.EACH},
supportArrayVals = true)
NODE() {
@Override
public void getSuggestions(SuggestionCtx ctx) {
@ -495,7 +536,8 @@ public class Suggestion {
@Meta(name = ImplicitSnitch.DISKTYPE,
type = String.class,
enumVals = {"ssd", "rotational"})
enumVals = {"ssd", "rotational"},
supportArrayVals = true)
DISKTYPE() {
@Override
public void getSuggestions(SuggestionCtx ctx) {
@ -511,12 +553,10 @@ public class Suggestion {
public final Number min;
public final Number max;
public final Boolean additive;
public final boolean isHidden;
public final Set<String> wildCards;
public final String perReplicaValue;
public final Set<String> associatedPerNodeValues;
public final String metricsAttribute;
public final boolean isPerNodeValue;
public final Set<ComputedType> supportedComputedTypes;
@ -539,14 +579,16 @@ public class Suggestion {
this.associatedPerNodeValues = readSet(meta.associatedPerNodeValue());
this.additive = meta.isAdditive();
this.metricsAttribute = readStr(meta.metricsKey());
this.isPerNodeValue = meta.isNodeSpecificVal();
this.supportedComputedTypes = meta.computedValues()[0] == ComputedType.NULL ?
emptySet() :
unmodifiableSet(new HashSet(Arrays.asList(meta.computedValues())));
this.isHidden = meta.isHidden();
this.wildCards = readSet(meta.wildCards());
}
public String getTagName() {
return meta.name();
}
private String readStr(String s) {
return NULL.equals(s) ? null : s;
}
@ -567,7 +609,7 @@ public class Suggestion {
public void addViolatingReplicas(ViolationCtx ctx) {
for (Row row : ctx.allRows) {
if (ctx.clause.tag.varType.isPerNodeValue && !row.node.equals(ctx.tagKey)) continue;
if (ctx.clause.tag.varType.meta.isNodeSpecificVal() && !row.node.equals(ctx.tagKey)) continue;
collectViolatingReplicas(ctx, row);
}
}
@ -645,7 +687,7 @@ public class Suggestion {
}
private static void collectViolatingReplicas(ViolationCtx ctx, Row row) {
if (ctx.clause.tag.varType.isPerNodeValue) {
if (ctx.clause.tag.varType.meta.isNodeSpecificVal()) {
row.forEachReplica(replica -> {
if (ctx.clause.collection.isPass(replica.getCollection()) && ctx.clause.getShard().isPass(replica.getShard())) {
ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica)
@ -656,7 +698,7 @@ public class Suggestion {
row.forEachReplica(replica -> {
if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return;
if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return;
if (!ctx.currentViolation.matchShard(replica.getShard())) return;
if(!ctx.currentViolation.getClause().matchShard(replica.getShard(), ctx.currentViolation.shard)) return;
if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard))
return;
ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name))));
@ -672,12 +714,8 @@ public class Suggestion {
Clause clause = cv.getClause();
for (Row row : session.matrix) {
row.forEachReplica(replicaInfo -> {
if (replicaInfo.getCollection().equals(collection)) {
if (clause.getShard() ==null || clause.getShard().op == Operand.WILDCARD || replicaInfo.getShard().equals(shard)) {
if (cv.getClause().type == null || replicaInfo.getType() == cv.getClause().type)
totalReplicasOfInterest.incrementAndGet();
}
}
if (clause.isMatch(replicaInfo, collection, shard))
totalReplicasOfInterest.incrementAndGet();
});
}
return totalReplicasOfInterest.get();
@ -719,9 +757,6 @@ public class Suggestion {
}
}
/*public static final Map<String, String> tagVsPerReplicaVal = Stream.of(ConditionType.values())
.filter(tag -> tag.perReplicaValue != null)
.collect(Collectors.toMap(tag -> tag.tagName, tag -> tag.perReplicaValue));*/
static {
for (Suggestion.ConditionType t : Suggestion.ConditionType.values()) Suggestion.validatetypes.put(t.tagName, t);
}

View File

@ -72,7 +72,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
private final CloudSolrClient solrClient;
private final Map<String, Map<String, Map<String, List<ReplicaInfo>>>> nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>();
protected final Map<String, Map<String, Map<String, List<ReplicaInfo>>>> nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>();
private Map<String, Object> snitchSession = new HashMap<>();
private Map<String, Map> nodeVsTags = new HashMap<>();
@ -140,28 +140,26 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
Map<String, Map<String, List<ReplicaInfo>>> result = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(node, s -> emptyMap());
if (!keys.isEmpty()) {
Map<String, Pair<String, ReplicaInfo>> keyVsReplica = new HashMap<>();
Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica = new HashMap<>();
Row.forEachReplica(result, r -> {
for (String key : keys) {
if (r.getVariables().containsKey(key)) continue;
String perReplicaAttrKeyPrefix = "solr.core." + r.getCollection() + "." + r.getShard() + "." + Utils.parseMetricsReplicaName(r.getCollection(), r.getCore()) + ":";
if (r.getVariables().containsKey(key)) continue;// it's already collected
String perReplicaMetricsKey = "solr.core." + r.getCollection() + "." + r.getShard() + "." + Utils.parseMetricsReplicaName(r.getCollection(), r.getCore()) + ":";
Suggestion.ConditionType tagType = Suggestion.getTagType(key);
String perReplicaValue = key;
if (tagType != null) {
perReplicaValue = tagType.metricsAttribute;
perReplicaValue = perReplicaValue == null ? key : perReplicaValue;
}
perReplicaAttrKeyPrefix += perReplicaValue;
keyVsReplica.put(perReplicaAttrKeyPrefix, new Pair<>(key, r));
perReplicaMetricsKey += perReplicaValue;
metricsKeyVsTagReplica.put(perReplicaMetricsKey, new Pair<>(key, r));
}
});
if (!keyVsReplica.isEmpty()) {
Map<String, Object> tags = fetchReplicaMetrics(node,
keyVsReplica.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getKey)));
tags.forEach((k, o) -> {
Pair<String, ReplicaInfo> p = keyVsReplica.get(k);
if (!metricsKeyVsTagReplica.isEmpty()) {
Map<String, Object> tagValues = fetchReplicaMetrics(node, metricsKeyVsTagReplica);
tagValues.forEach((k, o) -> {
Pair<String, ReplicaInfo> p = metricsKeyVsTagReplica.get(k);
Suggestion.ConditionType validator = Suggestion.getTagType(p.first());
if (validator != null) o = validator.convertVal(o);
if (p.second() != null) p.second().getVariables().put(p.first(), o);
@ -172,9 +170,11 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
return result;
}
protected Map<String,Object> fetchReplicaMetrics(String solrNode, Map<String, Object> metricsKeyVsTag) {
protected Map<String, Object> fetchReplicaMetrics(String node, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
Map<String, Object> collect = metricsKeyVsTagReplica.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getKey));
ClientSnitchCtx ctx = new ClientSnitchCtx(null, null, emptyMap(), solrClient);
fetchReplicaMetrics(solrNode, ctx,metricsKeyVsTag);
fetchReplicaMetrics(node, ctx, collect);
return ctx.getTags();
}

View File

@ -72,6 +72,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.CORES;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.REPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
@ -79,7 +80,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.MO
public class TestPolicy extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private Suggester createSuggester(SolrCloudManager cloudManager, Map jsonObj, Suggester seed) throws IOException, InterruptedException {
static Suggester createSuggester(SolrCloudManager cloudManager, Map jsonObj, Suggester seed) throws IOException, InterruptedException {
Policy.Session session = null;
if (seed != null) session = seed.session;
else {
@ -96,7 +97,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
return result;
}
private SolrCloudManager createCloudManager(Map jsonObj) {
static SolrCloudManager createCloudManager(Map jsonObj) {
return cloudManagerWithData(jsonObj);
}
@ -282,9 +283,35 @@ public class TestPolicy extends SolrTestCaseJ4 {
() -> Clause.create("{replica:'<3', shard: '#ANV', node:'#ANY'}"));
expectThrows(IllegalArgumentException.class,
() -> Clause.create("{replica:'<3', shard: '#EACH', node:'#E4CH'}"));
try {
Clause.create("{replica:0, 'ip_1':'<30%'}");
fail("Expected exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains("'%' is not allowed for variable : 'ip_1'"));
}
clause = Clause.create("{replica: '#ALL', freedisk:'>20%'}");
clause = Clause.create("{replica: '#ALL', sysprop.zone :'west'}");
expectThrows(IllegalArgumentException.class,
() -> Clause.create("{replica: [3,4] , freedisk:'>20'}"));
clause = Clause.create("{replica: 3 , port:[8983, 7574]}");
assertEquals(Operand.IN, clause.tag.op);
expectThrows(IllegalArgumentException.class,
() -> Clause.create("{replica: 3 , sysprop.zone :['east', ' ', 'west']}"));
expectThrows(IllegalArgumentException.class,
() -> Clause.create("{replica: 3 , sysprop.zone :[]}"));
expectThrows(IllegalArgumentException.class,
() -> Clause.create("{replica: 3 , sysprop.zone :['!east','west']}"));
expectThrows(IllegalArgumentException.class,
() -> Clause.create("{replica: '#ALL' , shard: '#EACH' , sysprop.zone:[east, west]}"));
expectThrows(IllegalArgumentException.class,
() -> Clause.create("{replica: '#ALL' , shard: '#EACH' , sysprop.zone:'#EACH'}"));
clause = Clause.create("{replica: '#EQUAL' , shard: '#EACH' , sysprop.zone:[east, west]}");
assertEquals(Clause.ComputedType.EQUAL, clause.replica.computedType);
assertEquals(Operand.IN, clause.tag.op);
expectThrows(IllegalArgumentException.class,
() -> Clause.create("{replica: '#EQUAL' , shard: '#EACH' , sysprop.zone:[east]}"));
}
@ -375,10 +402,10 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Override
protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Object> metricsKeyVsTag) {
protected Map<String, Object> fetchReplicaMetrics(String node, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
//e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
Map<String, Object> result = new HashMap<>();
metricsKeyVsTag.forEach((k, v) -> {
metricsKeyVsTagReplica.forEach((k, v) -> {
if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
});
@ -417,9 +444,10 @@ public class TestPolicy extends SolrTestCaseJ4 {
Violation violation = violations.get(0);
assertEquals("node1", violation.node);
RangeVal val = (RangeVal) violation.getClause().replica.val;
assertEquals(0.0, val.min);
assertEquals(1.0, val.max);
assertEquals(0, Preference.compareWithTolerance(val.actual.doubleValue(), 0.833, 1));
assertEquals(1.0d, val.min.doubleValue(), 0.01);
assertEquals(2.0, val.max.doubleValue(), 0.01);
assertEquals(1.2d, val.actual.doubleValue(), 0.01d);
assertEquals(1, violation.replicaCountDelta.doubleValue(), 0.01);
assertEquals(3, violation.getViolatingReplicas().size());
Set<String> expected = ImmutableSet.of("r1", "r3", "r5");
for (Violation.ReplicaInfoAndErr replicaInfoAndErr : violation.getViolatingReplicas()) {
@ -476,13 +504,10 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertFalse(c.tag.isPass("12.6"));
assertFalse(c.tag.isPass(12.6d));
try {
c = Clause.create("{replica:0, 'ip_1':'<30%'}");
fail("Expected exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains("'%' is not allowed for variable : 'ip_1'"));
}
c = Clause.create("{replica: '<3', sysprop.zone : [east, west]}");
assertTrue(c.tag.isPass("east"));
assertTrue(c.tag.isPass("west"));
assertFalse(c.tag.isPass("south"));
}
@ -667,7 +692,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
return cloudManagerWithData((Map) Utils.fromJSONString(data));
}
private static SolrCloudManager cloudManagerWithData(Map m) {
static SolrCloudManager cloudManagerWithData(Map m) {
Map replicaInfo = (Map) m.get("replicaInfo");
replicaInfo.forEach((node, val) -> {
Map m1 = (Map) val;
@ -1737,6 +1762,34 @@ public class TestPolicy extends SolrTestCaseJ4 {
public void testReplicaPercentage() {
String dataproviderdata = "{" +
" 'liveNodes':[" +
" '10.0.0.6:7574_solr'," +
" '10.0.0.6:8983_solr']," +
" 'replicaInfo':{" +
" '10.0.0.6:7574_solr':{}," +
" '10.0.0.6:8983_solr':{'mycoll1':{" +
" 'shard1':[{'core_node1':{'type':'NRT'}},{'core_node2':{'type':'NRT'}},{'core_node3':{'type':'NRT'}}]}}}," +
" 'nodeValues':{" +
" '10.0.0.6:7574_solr':{" +
" 'node':'10.0.0.6:7574_solr'," +
" 'cores':0}," +
" '10.0.0.6:8983_solr':{" +
" 'node':'10.0.0.6:8983_solr'," +
" 'cores':3}}}";
String autoScalingjson = " { cluster-policy:[" +
" { replica :'51%', shard:'#EACH', node:'#ANY'}]," +
" cluster-preferences :[{ minimize : cores }]}";
AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
Policy.Session session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata));
List<Violation> violations = session.getViolations();
assertEquals(1, violations.size());
assertEquals(1.0d, violations.get(0).replicaCountDelta, 0.01);
assertEquals(1.53d, ((RangeVal) violations.get(0).getClause().getReplica().val).actual);
dataproviderdata = "{" +
" 'liveNodes':[" +
" '10.0.0.6:7574_solr'," +
" '10.0.0.6:8983_solr']," +
@ -1752,17 +1805,12 @@ public class TestPolicy extends SolrTestCaseJ4 {
" '10.0.0.6:8983_solr':{" +
" 'node':'10.0.0.6:8983_solr'," +
" 'cores':2}}}";
String autoScalingjson = " { cluster-policy:[" +
" { replica :'<51%', node:'#ANY'}]," +
" cluster-preferences :[{ minimize : cores }]}";
AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
Policy.Session session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata));
List<Violation> violations = session.getViolations();
assertEquals(1, violations.size());
session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata));
violations = session.getViolations();
assertEquals(0, violations.size());
autoScalingjson = " { cluster-policy:[" +
" { replica :'<51%', shard: '#EACH' , node:'#ANY'}]," +
" { replica :'51%', shard: '#EACH' , node:'#ANY'}]," +
" cluster-preferences :[{ minimize : cores }]}";
autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata));
@ -1909,6 +1957,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
" cluster-preferences :[{ minimize : cores, precision : 2 }]}";
cfg = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
violations = cfg.getPolicy().createSession(cloudManagerWithData(dataproviderdata)).getViolations();
assertEquals(1, violations.size());
assertEquals(-4, violations.get(0).replicaCountDelta, 0.1);
assertEquals(1, violations.size());
assertEquals(4, violations.get(0).getViolatingReplicas().size());
@ -2807,7 +2856,7 @@ public void testUtilizeNodeFailure2() throws Exception {
List l = (List) ((Map) Utils.fromJSONString(rowsData)).get("sortedNodes");
List<Suggestion.ConditionType> params = new ArrayList<>();
params.add(Suggestion.ConditionType.CORES);
params.add(CORES);
params.add(Suggestion.ConditionType.FREEDISK);
params.add(Suggestion.ConditionType.SYSLOADAVG);
params.add(Suggestion.ConditionType.NODE);

View File

@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.emptyMap;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.CORES;
public class TestPolicy2 extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public void testEqualOnNonNode() {
String state = "{" +
" 'coll1': {" +
" 'router': {" +
" 'name': 'compositeId'" +
" }," +
" 'shards': {" +
" 'shard1': {" +
" 'range': '80000000-ffffffff'," +
" 'replicas': {" +
" 'r1': {" +
" 'core': 'r1'," +
" 'base_url': 'http://10.0.0.4:8983/solr'," +
" 'node_name': 'node1'," +
" 'state': 'active'" +
" }," +
" 'r2': {" +
" 'core': 'r2'," +
" 'base_url': 'http://10.0.0.4:7574/solr'," +
" 'node_name': 'node2'," +
" 'state': 'active'" +
" }" +
" }" +
" }," +
" 'shard2': {" +
" 'range': '0-7fffffff'," +
" 'replicas': {" +
" 'r3': {" +
" 'core': 'r3'," +
" 'base_url': 'http://10.0.0.4:8983/solr'," +
" 'node_name': 'node1'," +
" 'state': 'active'" +
" }," +
" 'r4': {" +
" 'core': 'r4'," +
" 'base_url': 'http://10.0.0.4:8987/solr'," +
" 'node_name': 'node4'," +
" 'state': 'active'" +
" }," +
" 'r6': {" +
" 'core': 'r6'," +
" 'base_url': 'http://10.0.0.4:8989/solr'," +
" 'node_name': 'node3'," +
" 'state': 'active'" +
" }," +
" 'r5': {" +
" 'core': 'r5'," +
" 'base_url': 'http://10.0.0.4:8983/solr'," +
" 'node_name': 'node1'," +
" 'state': 'active'" +
" }" +
" }" +
" }" +
" }" +
" }" +
"}";
String metaData =
" {'nodeValues':{" +
" 'node1':{'cores' : 3, 'freedisk' : 700, 'totaldisk' :1000, 'sysprop.zone' : 'east'}," +
" 'node2':{'cores' : 1, 'freedisk' : 900, 'totaldisk' :1000, 'sysprop.zone' : 'west'}," +
" 'node3':{'cores' : 1, 'freedisk' : 900, 'totaldisk' :1000, 'sysprop.zone': 'east'}," +
" 'node4':{'cores' : 1, 'freedisk' : 900, 'totaldisk' :1000, 'sysprop.zone': 'west'}" +
" }," +
" 'replicaValues':[" +
" {'INDEX.sizeInGB': 100, core : r1}," +
" {'INDEX.sizeInGB': 100, core : r2}," +
" {'INDEX.sizeInGB': 100, core : r3}," +
" {'INDEX.sizeInGB': 100, core : r4}," +
" {'INDEX.sizeInGB': 100, core : r5}," +
" {'INDEX.sizeInGB': 100, core : r6}]}";
String autoScalingjson = "{cluster-policy:[" +
" { replica : '<3' , shard : '#EACH', sysprop.zone: [east,west] } ]," +
" 'cluster-preferences':[{ minimize : cores},{minimize : freedisk, precision : 50}]}";
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
Policy.Session session = policy.createSession(createCloudManager(state, metaData));
List<Violation> violations = session.getViolations();
assertEquals(1, violations.size());
assertEquals(4, violations.get(0).getViolatingReplicas().size());
assertEquals(1.0, violations.get(0).replicaCountDelta, 0.01);
for (Violation.ReplicaInfoAndErr r : violations.get(0).getViolatingReplicas()) {
assertEquals("shard2", r.replicaInfo.getShard());
}
autoScalingjson = "{cluster-policy:[" +
" { replica : '<3' , shard : '#EACH', sysprop.zone: '#EACH' } ]," +
" 'cluster-preferences':[{ minimize : cores},{minimize : freedisk, precision : 50}]}";
policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
session = policy.createSession(createCloudManager(state, metaData));
violations = session.getViolations();
assertEquals(1, violations.size());
assertEquals(4, violations.get(0).getViolatingReplicas().size());
assertEquals(1.0, violations.get(0).replicaCountDelta, 0.01);
for (Violation.ReplicaInfoAndErr r : violations.get(0).getViolatingReplicas()) {
assertEquals("shard2", r.replicaInfo.getShard());
}
}
static SolrCloudManager createCloudManager(String clusterStateStr, String metadata) {
Map m = (Map) Utils.fromJSONString(clusterStateStr);
Map meta = (Map) Utils.fromJSONString(metadata);
Map nodeVals = (Map) meta.get("nodeValues");
List<Map> replicaVals = (List<Map>) meta.get("replicaValues");
ClusterState clusterState = ClusterState.load(0, m, Collections.emptySet(), null);
Map<String, AtomicInteger> coreCount = new LinkedHashMap<>();
Set<String> nodes = new HashSet<>(nodeVals.keySet());
clusterState.getCollectionStates().forEach((s, collectionRef) -> collectionRef.get()
.forEachReplica((s12, replica) -> {
nodes.add(replica.getNodeName());
coreCount.computeIfAbsent(replica.getNodeName(), s1 -> new AtomicInteger(0))
.incrementAndGet();
}));
DelegatingClusterStateProvider delegatingClusterStateProvider = new DelegatingClusterStateProvider(null) {
@Override
public ClusterState getClusterState() throws IOException {
return clusterState;
}
@Override
public Set<String> getLiveNodes() {
return nodes;
}
};
return new DelegatingCloudManager(null) {
@Override
public ClusterStateProvider getClusterStateProvider() {
return delegatingClusterStateProvider;
}
@Override
public NodeStateProvider getNodeStateProvider() {
return new SolrClientNodeStateProvider(null) {
@Override
protected ClusterStateProvider getClusterStateProvider() {
return delegatingClusterStateProvider;
}
@Override
protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
Map<String, Object> result = new LinkedHashMap<>();
for (String tag : tags) {
if (tag.equals(CORES.tagName))
result.put(CORES.tagName, coreCount.getOrDefault(node, new AtomicInteger(0)).get());
result.put(tag, Utils.getObjectByPath(nodeVals, true, Arrays.asList(node, tag)));
}
return result;
}
@Override
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
Map<String, Map<String, List<ReplicaInfo>>> result = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(node, s -> emptyMap());
if (!keys.isEmpty()) {
Row.forEachReplica(result, replicaInfo -> {
for (String key : keys) {
if (!replicaInfo.getVariables().containsKey(key)) {
replicaVals.stream()
.filter(it -> replicaInfo.getCore().equals(it.get("core")))
.findFirst()
.ifPresent(map -> replicaInfo.getVariables().put(key, map.get(key)));
}
}
});
}
return result;
}
};
}
};
}
}