mirror of https://github.com/apache/lucene.git
SOLR-11985: Support percentage values in replica attribute in autoscaling policy
SOLR-12511: Support non integer values for replica in autoscaling policy SOLR-12517: Support range values for replica in autoscaling policy
This commit is contained in:
parent
095f9eb90d
commit
1eb2676f27
|
@ -90,6 +90,12 @@ New Features
|
|||
|
||||
* SOLR-12398: The JSON Facet API now supports type=heatmap facets, just as classic faceting does. (David Smiley)
|
||||
|
||||
* SOLR-11985: Support percentage values in replica attribute in autoscaling policy (noble)
|
||||
|
||||
* SOLR-12511: Support non integer values for replica in autoscaling policy (noble)
|
||||
|
||||
* SOLR-12517: Support range values for replica in autoscaling policy (noble)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -36,13 +36,13 @@ import java.util.stream.Stream;
|
|||
|
||||
import org.apache.solr.api.Api;
|
||||
import org.apache.solr.api.ApiBag;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Clause;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Preference;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||
import org.apache.solr.common.MapWriter;
|
||||
|
@ -52,6 +52,7 @@ import org.apache.solr.common.params.CollectionAdminParams;
|
|||
import org.apache.solr.common.util.CommandOperation;
|
||||
import org.apache.solr.common.util.IOUtils;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.common.util.TimeSource;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.handler.RequestHandlerBase;
|
||||
|
@ -61,7 +62,6 @@ import org.apache.solr.request.SolrRequestHandler;
|
|||
import org.apache.solr.response.SolrQueryResponse;
|
||||
import org.apache.solr.security.AuthorizationContext;
|
||||
import org.apache.solr.security.PermissionNameProvider;
|
||||
import org.apache.solr.common.util.TimeSource;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -252,7 +252,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
|
|||
}
|
||||
List<Clause> cp = null;
|
||||
try {
|
||||
cp = clusterPolicy.stream().map(Clause::new).collect(Collectors.toList());
|
||||
cp = clusterPolicy.stream().map(Clause::create).collect(Collectors.toList());
|
||||
} catch (Exception e) {
|
||||
op.addError(e.getMessage());
|
||||
return currentConfig;
|
||||
|
|
|
@ -831,7 +831,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
|
|||
for (Map<String, Object> violation : violations) {
|
||||
assertEquals("readApiTestViolations", violation.get("collection"));
|
||||
assertEquals("shard1", violation.get("shard"));
|
||||
assertEquals(2l, getObjectByPath(violation, true, "violation/delta"));
|
||||
assertEquals(2d, getObjectByPath(violation, true, "violation/delta"));
|
||||
assertEquals(3l, getObjectByPath(violation, true, "violation/replica/NRT"));
|
||||
assertNotNull(violation.get("clause"));
|
||||
}
|
||||
|
@ -842,7 +842,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
|
|||
response = solrClient.request(req);
|
||||
List l = (List) response.get("suggestions");
|
||||
assertNotNull(l);
|
||||
assertEquals(1, l.size());
|
||||
assertEquals(2, l.size());
|
||||
for (int i = 0; i < l.size(); i++) {
|
||||
Object suggestion = l.get(i);
|
||||
assertEquals("violation", Utils.getObjectByPath(suggestion, true, "type"));
|
||||
|
|
|
@ -44,14 +44,15 @@ class AddReplicaSuggester extends Suggester {
|
|||
}
|
||||
for (Pair<String, String> shard : shards) {
|
||||
Replica.Type type = Replica.Type.get((String) hints.get(Hint.REPLICATYPE));
|
||||
//iterate through elemenodesnts and identify the least loaded
|
||||
//iterate through nodes and identify the least loaded
|
||||
List<Violation> leastSeriousViolation = null;
|
||||
Row bestNode = null;
|
||||
for (int i = getMatrix().size() - 1; i >= 0; i--) {
|
||||
Row row = getMatrix().get(i);
|
||||
if (!isNodeSuitableForReplicaAddition(row)) continue;
|
||||
Row tmpRow = row.addReplica(shard.first(), shard.second(), type);
|
||||
List<Violation> errs = testChangedMatrix(strict, tmpRow.session.matrix);
|
||||
List<Violation> errs = testChangedMatrix(strict, tmpRow.session);
|
||||
|
||||
if (!containsNewErrors(errs)) {
|
||||
if (isLessSerious(errs, leastSeriousViolation)) {
|
||||
leastSeriousViolation = errs;
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.solr.common.MapWriter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
|
@ -52,13 +53,25 @@ import static org.apache.solr.common.params.CoreAdminParams.SHARD;
|
|||
public class Clause implements MapWriter, Comparable<Clause> {
|
||||
private static final Set<String> IGNORE_TAGS = new HashSet<>(Arrays.asList(REPLICA, COLLECTION, SHARD, "strict", "type"));
|
||||
|
||||
final boolean hasComputedValue;
|
||||
final Map<String, Object> original;
|
||||
Condition collection, shard, replica, tag, globalTag;
|
||||
final Replica.Type type;
|
||||
|
||||
boolean strict;
|
||||
|
||||
public Clause(Map<String, Object> m) {
|
||||
protected Clause(Clause clause, Function<Condition, Object> computedValueEvaluator) {
|
||||
this.original = clause.original;
|
||||
this.type = clause.type;
|
||||
this.collection = clause.collection;
|
||||
this.shard = clause.shard;
|
||||
this.tag = evaluateValue(clause.tag, computedValueEvaluator);
|
||||
this.replica = evaluateValue(clause.replica, computedValueEvaluator);
|
||||
this.globalTag = evaluateValue(clause.globalTag, computedValueEvaluator);
|
||||
this.hasComputedValue = clause.hasComputedValue;
|
||||
this.strict = clause.strict;
|
||||
}
|
||||
|
||||
private Clause(Map<String, Object> m) {
|
||||
this.original = Utils.getDeepCopy(m, 10);
|
||||
String type = (String) m.get("type");
|
||||
this.type = type == null || ANY.equals(type) ? null : Replica.Type.valueOf(type.toUpperCase(Locale.ROOT));
|
||||
|
@ -90,7 +103,47 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
throw new RuntimeException("Invalid metrics: param in " + Utils.toJSONString(m) + " must have at 2 or 3 segments after 'metrics:' separated by ':'");
|
||||
}
|
||||
}
|
||||
hasComputedValue = hasComputedValue();
|
||||
}
|
||||
|
||||
public static Clause create(Map<String, Object> m) {
|
||||
Clause clause = new Clause(m);
|
||||
return clause.hasComputedValue() ?
|
||||
clause :
|
||||
clause.getSealedClause(null);
|
||||
}
|
||||
|
||||
public static String parseString(Object val) {
|
||||
if (val instanceof Condition) val = ((Condition) val).val;
|
||||
return val == null ? null : String.valueOf(val);
|
||||
}
|
||||
|
||||
public Condition getCollection() {
|
||||
return collection;
|
||||
}
|
||||
|
||||
public Condition getShard() {
|
||||
return shard;
|
||||
}
|
||||
|
||||
public Condition getReplica() {
|
||||
return replica;
|
||||
}
|
||||
|
||||
public Condition getTag() {
|
||||
return tag;
|
||||
}
|
||||
|
||||
public Condition getGlobalTag() {
|
||||
return globalTag;
|
||||
}
|
||||
|
||||
private Condition evaluateValue(Condition condition, Function<Condition, Object> computedValueEvaluator) {
|
||||
if (condition == null) return null;
|
||||
if (condition.computationType == null) return condition;
|
||||
Object val = computedValueEvaluator.apply(condition);
|
||||
val = condition.op.readRuleValue(new Condition(condition.name, val, condition.op, null, null));
|
||||
return new Condition(condition.name, val, condition.op, null, this);
|
||||
}
|
||||
|
||||
public boolean doesOverride(Clause that) {
|
||||
|
@ -118,6 +171,14 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
return 0;
|
||||
}
|
||||
|
||||
private boolean hasComputedValue() {
|
||||
if (replica != null && replica.computationType != null) return true;
|
||||
if (tag != null && tag.computationType != null) return true;
|
||||
if (globalTag != null && globalTag.computationType != null) return true;
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Clause that) {
|
||||
int v = Integer.compare(this.tag.op.priority, that.tag.op.priority);
|
||||
|
@ -125,7 +186,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
if (this.isPerCollectiontag() && that.isPerCollectiontag()) {
|
||||
v = Integer.compare(this.replica.op.priority, that.replica.op.priority);
|
||||
if (v == 0) {// higher the number of replicas , harder to satisfy
|
||||
v = Long.compare((Long) this.replica.val, (Long) that.replica.val);
|
||||
v = Preference.compareWithTolerance((Double) this.replica.val, (Double) that.replica.val, 1);
|
||||
v = this.replica.op == LESS_THAN ? v : v * -1;
|
||||
}
|
||||
if (v == 0) v = compareTypes(this.type, that.type);
|
||||
|
@ -135,39 +196,217 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Clause that = (Clause)o;
|
||||
return Objects.equals(this.original, that.original);
|
||||
}
|
||||
|
||||
void addTags(Collection<String> params) {
|
||||
if (globalTag != null && !params.contains(globalTag.name)) params.add(globalTag.name);
|
||||
if (tag != null && !params.contains(tag.name)) params.add(tag.name);
|
||||
}
|
||||
|
||||
boolean isReplicaZero() {
|
||||
return replica != null && replica.getOperand() == EQUAL && 0L == (Long) replica.val;
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (!(o instanceof Clause)) return false;
|
||||
Clause that = (Clause)o;
|
||||
return Objects.equals(this.original, that.original);
|
||||
}
|
||||
|
||||
class Condition {
|
||||
//replica value is zero
|
||||
boolean isReplicaZero() {
|
||||
return replica != null && replica.getOperand() == EQUAL &&
|
||||
Preference.compareWithTolerance(0d, (Double) replica.val, 1) == 0;
|
||||
}
|
||||
|
||||
public SealedClause getSealedClause(Function<Condition, Object> computedValueEvaluator) {
|
||||
return this instanceof SealedClause ?
|
||||
(SealedClause) this :
|
||||
new SealedClause(this, computedValueEvaluator);
|
||||
}
|
||||
|
||||
Condition parse(String s, Map m) {
|
||||
Object expectedVal = null;
|
||||
ComputationType computationType = null;
|
||||
Object val = m.get(s);
|
||||
Suggestion.ConditionType varType = Suggestion.getTagType(s);
|
||||
try {
|
||||
String conditionName = s.trim();
|
||||
Operand operand = null;
|
||||
if (val == null) {
|
||||
operand = WILDCARD;
|
||||
expectedVal = Policy.ANY;
|
||||
} else if (val instanceof String) {
|
||||
String strVal = ((String) val).trim();
|
||||
if (Policy.ANY.equals(strVal) || Policy.EACH.equals(strVal)) operand = WILDCARD;
|
||||
else if (strVal.startsWith(NOT_EQUAL.operand)) operand = NOT_EQUAL;
|
||||
else if (strVal.startsWith(GREATER_THAN.operand)) operand = GREATER_THAN;
|
||||
else if (strVal.startsWith(LESS_THAN.operand)) operand = LESS_THAN;
|
||||
else operand = EQUAL;
|
||||
strVal = strVal.substring(EQUAL == operand || WILDCARD == operand ? 0 : 1);
|
||||
for (ComputationType t : ComputationType.values()) {
|
||||
String changedVal = t.match(strVal);
|
||||
if (changedVal != null) {
|
||||
computationType = t;
|
||||
strVal = changedVal;
|
||||
if (varType == null || !varType.supportComputed(computationType)) {
|
||||
throw new IllegalArgumentException(StrUtils.formatString("''{0}'' is not allowed for variable : ''{1}'' , in condition : ''{2}'' ",
|
||||
t, conditionName, Utils.toJSONString(m)));
|
||||
}
|
||||
}
|
||||
}
|
||||
operand = varType == null ? operand : varType.getOperand(operand, strVal, computationType);
|
||||
expectedVal = validate(s, new Condition(s, strVal, operand, computationType, null), true);
|
||||
|
||||
} else if (val instanceof Number) {
|
||||
operand = EQUAL;
|
||||
operand = varType.getOperand(operand, val, null);
|
||||
expectedVal = validate(s, new Condition(s, val, operand, null, null), true);
|
||||
}
|
||||
return new Condition(conditionName, expectedVal, operand, computationType, this);
|
||||
|
||||
} catch (IllegalArgumentException iae) {
|
||||
throw iae;
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("Invalid tag : " + s + ":" + val, e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Violation> test(Policy.Session session) {
|
||||
ComputedValueEvaluator computedValueEvaluator = new ComputedValueEvaluator(session);
|
||||
Suggestion.ViolationCtx ctx = new Suggestion.ViolationCtx(this, session.matrix, computedValueEvaluator);
|
||||
if (isPerCollectiontag()) {
|
||||
Map<String, Map<String, Map<String, ReplicaCount>>> replicaCount = computeReplicaCounts(session.matrix, computedValueEvaluator);
|
||||
for (Map.Entry<String, Map<String, Map<String, ReplicaCount>>> e : replicaCount.entrySet()) {
|
||||
computedValueEvaluator.collName = e.getKey();
|
||||
if (!collection.isPass(computedValueEvaluator.collName)) continue;
|
||||
for (Map.Entry<String, Map<String, ReplicaCount>> shardVsCount : e.getValue().entrySet()) {
|
||||
computedValueEvaluator.shardName = shardVsCount.getKey();
|
||||
if (!shard.isPass(computedValueEvaluator.shardName)) continue;
|
||||
for (Map.Entry<String, ReplicaCount> counts : shardVsCount.getValue().entrySet()) {
|
||||
SealedClause sealedClause = getSealedClause(computedValueEvaluator);
|
||||
if (!sealedClause.replica.isPass(counts.getValue())) {
|
||||
Violation violation = new Violation(sealedClause,
|
||||
computedValueEvaluator.collName,
|
||||
computedValueEvaluator.shardName,
|
||||
tag.name.equals("node") ? counts.getKey() : null,
|
||||
counts.getValue(),
|
||||
sealedClause.getReplica().delta(counts.getValue()),
|
||||
counts.getKey());
|
||||
Suggestion.getTagType(tag.name).addViolatingReplicas(ctx.reset(counts.getKey(), counts.getValue(), violation));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (Row r : session.matrix) {
|
||||
SealedClause sealedClause = getSealedClause(computedValueEvaluator);
|
||||
if (!sealedClause.getGlobalTag().isPass(r)) {
|
||||
Suggestion.ConditionType.CORES.addViolatingReplicas(ctx.reset(null, null,
|
||||
new Violation(sealedClause, null, null, r.node, r.getVal(sealedClause.globalTag.name), sealedClause.globalTag.delta(r.getVal(globalTag.name)), null)));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx.allViolations;
|
||||
|
||||
}
|
||||
|
||||
private Map<String, Map<String, Map<String, ReplicaCount>>> computeReplicaCounts(List<Row> allRows,
|
||||
ComputedValueEvaluator computedValueEvaluator) {
|
||||
Map<String, Map<String, Map<String, ReplicaCount>>> collVsShardVsTagVsCount = new HashMap<>();
|
||||
for (Row row : allRows) {
|
||||
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> colls : row.collectionVsShardVsReplicas.entrySet()) {
|
||||
String collectionName = colls.getKey();
|
||||
if (!collection.isPass(collectionName)) continue;
|
||||
Map<String, Map<String, ReplicaCount>> collMap = collVsShardVsTagVsCount.computeIfAbsent(collectionName, s -> new HashMap<>());
|
||||
for (Map.Entry<String, List<ReplicaInfo>> shards : colls.getValue().entrySet()) {
|
||||
String shardName = shards.getKey();
|
||||
if (ANY.equals(shard.val)) shardName = ANY;
|
||||
if (!shard.isPass(shardName)) break;
|
||||
Map<String, ReplicaCount> tagVsCount = collMap.computeIfAbsent(shardName, s -> new HashMap<>());
|
||||
Object tagVal = row.getVal(tag.name);
|
||||
computedValueEvaluator.collName = collectionName;
|
||||
computedValueEvaluator.shardName = shardName;
|
||||
SealedClause sealedClause = getSealedClause(computedValueEvaluator);
|
||||
boolean pass = sealedClause.getTag().isPass(tagVal);
|
||||
tagVsCount.computeIfAbsent(pass ? String.valueOf(tagVal) : "", s -> new ReplicaCount());
|
||||
if (pass) {
|
||||
tagVsCount.get(String.valueOf(tagVal)).increment(shards.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return collVsShardVsTagVsCount;
|
||||
}
|
||||
|
||||
enum ComputationType {
|
||||
PERCENT {
|
||||
@Override
|
||||
public String wrap(String value) {
|
||||
return value + "%";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String match(String val) {
|
||||
if (val != null && !val.isEmpty() && val.charAt(val.length() - 1) == '%') {
|
||||
return val.substring(0, val.length() - 1);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "%";
|
||||
}
|
||||
};
|
||||
|
||||
// return null if there is no match. return a modified string
|
||||
// if there is a match
|
||||
public String match(String val) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public String wrap(String value) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Condition implements MapWriter {
|
||||
final String name;
|
||||
final Object val;
|
||||
final Suggestion.ConditionType varType;
|
||||
final ComputationType computationType;
|
||||
final Operand op;
|
||||
private Clause clause;
|
||||
|
||||
Condition(String name, Object val, Operand op) {
|
||||
Condition(String name, Object val, Operand op, ComputationType computationType, Clause parent) {
|
||||
this.name = name;
|
||||
this.val = val;
|
||||
this.op = op;
|
||||
varType = Suggestion.getTagType(name);
|
||||
this.computationType = computationType;
|
||||
this.clause = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMap(EntryWriter ew) throws IOException {
|
||||
String value = op.wrap(val);
|
||||
if (computationType != null) value = computationType.wrap(value);
|
||||
ew.put(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return jsonStr();
|
||||
}
|
||||
|
||||
public Clause getClause() {
|
||||
return clause;
|
||||
}
|
||||
|
||||
boolean isPass(Object inputVal) {
|
||||
if (inputVal instanceof ReplicaCount) inputVal = ((ReplicaCount) inputVal).getVal(type);
|
||||
if (computationType != null) {
|
||||
throw new IllegalStateException("This is supposed to be called only from a Condition with no computed value or a SealedCondition");
|
||||
|
||||
}
|
||||
if (inputVal instanceof ReplicaCount) inputVal = ((ReplicaCount) inputVal).getVal(getClause().type);
|
||||
if (varType == Suggestion.ConditionType.LAZY) { // we don't know the type
|
||||
return op.match(parseString(val), parseString(inputVal)) == PASS;
|
||||
} else {
|
||||
|
@ -177,7 +416,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
|
||||
|
||||
boolean isPass(Row row) {
|
||||
return op.match(val, row.getVal(name)) == PASS;
|
||||
return isPass(row.getVal(name));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -189,18 +428,18 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
return false;
|
||||
}
|
||||
|
||||
public Long delta(Object val) {
|
||||
if (val instanceof ReplicaCount) val = ((ReplicaCount) val).getVal(type);
|
||||
public Double delta(Object val) {
|
||||
if (val instanceof ReplicaCount) val = ((ReplicaCount) val).getVal(getClause().type);
|
||||
if (this.val instanceof String) {
|
||||
if (op == LESS_THAN || op == GREATER_THAN) {
|
||||
return op
|
||||
.opposite(isReplicaZero() && this == tag)
|
||||
.opposite(getClause().isReplicaZero() && this == getClause().tag)
|
||||
.delta(Clause.parseDouble(name, this.val), Clause.parseDouble(name, val));
|
||||
} else {
|
||||
return 0L;
|
||||
return 0d;
|
||||
}
|
||||
} else return op
|
||||
.opposite(isReplicaZero() && this == tag)
|
||||
.opposite(getClause().isReplicaZero() && this == getClause().getTag())
|
||||
.delta(this.val, val);
|
||||
}
|
||||
|
||||
|
@ -217,94 +456,6 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
}
|
||||
}
|
||||
|
||||
Condition parse(String s, Map m) {
|
||||
Object expectedVal = null;
|
||||
Object val = m.get(s);
|
||||
try {
|
||||
String conditionName = s.trim();
|
||||
Operand operand = null;
|
||||
if (val == null) {
|
||||
operand = WILDCARD;
|
||||
expectedVal = Policy.ANY;
|
||||
} else if (val instanceof String) {
|
||||
String strVal = ((String) val).trim();
|
||||
if (Policy.ANY.equals(strVal) || Policy.EACH.equals(strVal)) operand = WILDCARD;
|
||||
else if (strVal.startsWith(NOT_EQUAL.operand)) operand = NOT_EQUAL;
|
||||
else if (strVal.startsWith(GREATER_THAN.operand)) operand = GREATER_THAN;
|
||||
else if (strVal.startsWith(LESS_THAN.operand)) operand = LESS_THAN;
|
||||
else operand = EQUAL;
|
||||
expectedVal = validate(s, strVal.substring(EQUAL == operand || WILDCARD == operand ? 0 : 1), true);
|
||||
} else if (val instanceof Number) {
|
||||
operand = EQUAL;
|
||||
expectedVal = validate(s, val, true);
|
||||
}
|
||||
return new Condition(conditionName, expectedVal, operand);
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("Invalid tag : " + s + ":" + val, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public List<Violation> test(List<Row> allRows) {
|
||||
Suggestion.ViolationCtx ctx = new Suggestion.ViolationCtx(this, allRows);
|
||||
if (isPerCollectiontag()) {
|
||||
Map<String, Map<String, Map<String, ReplicaCount>>> replicaCount = computeReplicaCounts(allRows);
|
||||
for (Map.Entry<String, Map<String, Map<String, ReplicaCount>>> e : replicaCount.entrySet()) {
|
||||
if (!collection.isPass(e.getKey())) continue;
|
||||
for (Map.Entry<String, Map<String, ReplicaCount>> shardVsCount : e.getValue().entrySet()) {
|
||||
if (!shard.isPass(shardVsCount.getKey())) continue;
|
||||
for (Map.Entry<String, ReplicaCount> counts : shardVsCount.getValue().entrySet()) {
|
||||
if (!replica.isPass(counts.getValue())) {
|
||||
Violation violation = new Violation(this,
|
||||
e.getKey(),
|
||||
shardVsCount.getKey(),
|
||||
tag.name.equals("node") ? counts.getKey() : null,
|
||||
counts.getValue(),
|
||||
replica.delta(counts.getValue()),
|
||||
counts.getKey());
|
||||
Suggestion.getTagType(tag.name).addViolatingReplicas(ctx.reset(counts.getKey(), counts.getValue(), violation));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (Row r : allRows) {
|
||||
if (!globalTag.isPass(r)) {
|
||||
Suggestion.ConditionType.CORES.addViolatingReplicas(ctx.reset(null, null,
|
||||
new Violation(this, null, null, r.node, r.getVal(globalTag.name), globalTag.delta(r.getVal(globalTag.name)), null)));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx.allViolations;
|
||||
|
||||
}
|
||||
|
||||
|
||||
private Map<String, Map<String, Map<String, ReplicaCount>>> computeReplicaCounts(List<Row> allRows) {
|
||||
Map<String, Map<String, Map<String, ReplicaCount>>> collVsShardVsTagVsCount = new HashMap<>();
|
||||
for (Row row : allRows) {
|
||||
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> colls : row.collectionVsShardVsReplicas.entrySet()) {
|
||||
String collectionName = colls.getKey();
|
||||
if (!collection.isPass(collectionName)) continue;
|
||||
Map<String, Map<String, ReplicaCount>> collMap = collVsShardVsTagVsCount.computeIfAbsent(collectionName, s -> new HashMap<>());
|
||||
for (Map.Entry<String, List<ReplicaInfo>> shards : colls.getValue().entrySet()) {
|
||||
String shardName = shards.getKey();
|
||||
if (ANY.equals(shard.val)) shardName = ANY;
|
||||
if (!shard.isPass(shardName)) break;
|
||||
Map<String, ReplicaCount> tagVsCount = collMap.computeIfAbsent(shardName, s -> new HashMap<>());
|
||||
Object tagVal = row.getVal(tag.name);
|
||||
boolean pass = tag.isPass(tagVal);
|
||||
tagVsCount.computeIfAbsent(pass ? String.valueOf(tagVal) : "", s -> new ReplicaCount());
|
||||
if (pass) {
|
||||
tagVsCount.get(String.valueOf(tagVal)).increment(shards.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return collVsShardVsTagVsCount;
|
||||
}
|
||||
|
||||
public boolean isStrict() {
|
||||
return strict;
|
||||
}
|
||||
|
@ -323,8 +474,20 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
NOT_APPLICABLE, FAIL, PASS
|
||||
}
|
||||
|
||||
public static String parseString(Object val) {
|
||||
return val == null ? null : String.valueOf(val);
|
||||
public static class ComputedValueEvaluator implements Function<Condition, Object> {
|
||||
final Policy.Session session;
|
||||
String collName = null;
|
||||
String shardName = null;
|
||||
|
||||
public ComputedValueEvaluator(Policy.Session session) {
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object apply(Condition computedValue) {
|
||||
return computedValue.varType.computeValue(session, computedValue, collName, shardName);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -389,4 +552,31 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
|
||||
public static final String METRICS_PREFIX = "metrics:";
|
||||
|
||||
static class RangeVal implements MapWriter {
|
||||
final Number min, max, actual;
|
||||
|
||||
RangeVal(Number min, Number max, Number actual) {
|
||||
this.min = min;
|
||||
this.max = max;
|
||||
this.actual = actual;
|
||||
}
|
||||
|
||||
public boolean match(Number testVal) {
|
||||
return Double.compare(testVal.doubleValue(), min.doubleValue()) >= 0 &&
|
||||
Double.compare(testVal.doubleValue(), max.doubleValue()) <= 0;
|
||||
}
|
||||
|
||||
public Double delta(double v) {
|
||||
if (actual != null) return v - actual.doubleValue();
|
||||
if (v >= max.doubleValue()) return v - max.doubleValue();
|
||||
if (v <= min.doubleValue()) return v - min.doubleValue();
|
||||
return 0d;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMap(EntryWriter ew) throws IOException {
|
||||
ew.put("min", min).put("max", max).putIfNotNull("actual", actual);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ public class MoveReplicaSuggester extends Suggester {
|
|||
Pair<Row, ReplicaInfo> pair = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node
|
||||
if (pair == null) continue;//should not happen
|
||||
Row srcRowModified = pair.first();//this is the final state of the source row and session
|
||||
List<Violation> errs = testChangedMatrix(strict, srcRowModified.session.matrix);
|
||||
List<Violation> errs = testChangedMatrix(strict, srcRowModified.session);
|
||||
srcRowModified.session.applyRules();// now resort the nodes with the new values
|
||||
Policy.Session tmpSession = srcRowModified.session;
|
||||
if (!containsNewErrors(errs) &&
|
||||
|
|
|
@ -35,12 +35,59 @@ public enum Operand {
|
|||
}
|
||||
|
||||
},
|
||||
|
||||
RANGE_EQUAL("", 0) {
|
||||
@Override
|
||||
public TestStatus match(Object ruleVal, Object testVal) {
|
||||
return ((Clause.RangeVal) ruleVal).match((Number) testVal) ? PASS : FAIL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double delta(Object expected, Object actual) {
|
||||
return ((Clause.RangeVal) expected).delta(((Number) actual).doubleValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object readRuleValue(Clause.Condition condition) {
|
||||
if (condition.val instanceof String) {
|
||||
String strVal = ((String) condition.val).trim();
|
||||
int hyphenIdx = strVal.indexOf('-');
|
||||
if (hyphenIdx > 0) {
|
||||
String minS = strVal.substring(0, hyphenIdx).trim();
|
||||
String maxS = strVal.substring(hyphenIdx + 1, strVal.length()).trim();
|
||||
return new Clause.RangeVal(
|
||||
(Number) condition.varType.validate(condition.name, minS, true),
|
||||
(Number) condition.varType.validate(condition.name, maxS, true),
|
||||
null
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
Number num = (Number) condition.varType.validate(condition.name, condition.val, true);
|
||||
return new Clause.RangeVal(Math.floor(num.doubleValue()), Math.ceil(num.doubleValue()), num);
|
||||
}
|
||||
},
|
||||
EQUAL("", 0) {
|
||||
@Override
|
||||
public long _delta(long expected, long actual) {
|
||||
public double _delta(double expected, double actual) {
|
||||
return expected - actual;
|
||||
}
|
||||
},
|
||||
RANGE_NOT_EQUAL("", 2) {
|
||||
@Override
|
||||
public TestStatus match(Object ruleVal, Object testVal) {
|
||||
return ((Clause.RangeVal) ruleVal).match((Number) testVal) ? FAIL : PASS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object readRuleValue(Clause.Condition condition) {
|
||||
return RANGE_EQUAL.readRuleValue(condition);
|
||||
}
|
||||
|
||||
},
|
||||
NOT_EQUAL("!", 2) {
|
||||
@Override
|
||||
public TestStatus match(Object ruleVal, Object testVal) {
|
||||
|
@ -48,7 +95,7 @@ public enum Operand {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long _delta(long expected, long actual) {
|
||||
public double _delta(double expected, double actual) {
|
||||
return expected - actual;
|
||||
}
|
||||
|
||||
|
@ -59,19 +106,24 @@ public enum Operand {
|
|||
if (testVal == null) return NOT_APPLICABLE;
|
||||
if (ruleVal instanceof String) ruleVal = Clause.parseDouble("", ruleVal);
|
||||
if (ruleVal instanceof Double) {
|
||||
return Double.compare(Clause.parseDouble("", testVal), (Double) ruleVal) == 1 ? PASS : FAIL;
|
||||
return Double.compare(Clause.parseDouble("", testVal), (Double) ruleVal) == -1 ? FAIL : PASS;
|
||||
}
|
||||
return getLong(testVal) > getLong(ruleVal) ? PASS: FAIL ;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String wrap(Object val) {
|
||||
return ">" + (((Number) val).doubleValue() - 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Operand opposite(boolean flag) {
|
||||
return flag ? LESS_THAN : GREATER_THAN;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long _delta(long expected, long actual) {
|
||||
return actual > expected ? 0 : (expected + 1) - actual;
|
||||
protected double _delta(double expected, double actual) {
|
||||
return actual > expected ? 0 : expected - actual;
|
||||
}
|
||||
},
|
||||
LESS_THAN("<", 2) {
|
||||
|
@ -80,14 +132,19 @@ public enum Operand {
|
|||
if (testVal == null) return NOT_APPLICABLE;
|
||||
if (ruleVal instanceof String) ruleVal = Clause.parseDouble("", ruleVal);
|
||||
if (ruleVal instanceof Double) {
|
||||
return Double.compare(Clause.parseDouble("", testVal), (Double) ruleVal) == -1 ? PASS : FAIL;
|
||||
return Double.compare(Clause.parseDouble("", testVal), (Double) ruleVal) == 1 ? FAIL : PASS;
|
||||
}
|
||||
return getLong(testVal) < getLong(ruleVal) ? PASS: FAIL ;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long _delta(long expected, long actual) {
|
||||
return actual < expected ? 0 : (actual + 1) - expected;
|
||||
public String wrap(Object val) {
|
||||
return "<" + (((Number) val).doubleValue() + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double _delta(double expected, double actual) {
|
||||
return actual < expected ? 0 : actual - expected;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -118,18 +175,26 @@ public enum Operand {
|
|||
|
||||
}
|
||||
|
||||
public Long delta(Object expected, Object actual) {
|
||||
public Double delta(Object expected, Object actual) {
|
||||
if (expected instanceof Number && actual instanceof Number) {
|
||||
Long expectedL = ((Number) expected).longValue();
|
||||
Long actualL = ((Number) actual).longValue();
|
||||
Double expectedL = ((Number) expected).doubleValue();
|
||||
Double actualL = ((Number) actual).doubleValue();
|
||||
return _delta(expectedL, actualL);
|
||||
} else {
|
||||
return 0L;
|
||||
return 0d;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected long _delta(long expected, long actual) {
|
||||
protected double _delta(double expected, double actual) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public String wrap(Object val) {
|
||||
return operand + val.toString();
|
||||
}
|
||||
|
||||
public Object readRuleValue(Clause.Condition condition) {
|
||||
return condition.val;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -112,7 +112,7 @@ public class Policy implements MapWriter {
|
|||
clusterPreferences.forEach(preference -> paramsOfInterest.add(preference.name.toString()));
|
||||
List<String> newParams = new ArrayList<>(paramsOfInterest);
|
||||
clusterPolicy = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_POLICY, emptyList())).stream()
|
||||
.map(Clause::new)
|
||||
.map(Clause::create)
|
||||
.filter(clause -> {
|
||||
clause.addTags(newParams);
|
||||
return true;
|
||||
|
@ -219,144 +219,20 @@ public class Policy implements MapWriter {
|
|||
return getClusterPreferences().equals(policy.getClusterPreferences());
|
||||
}
|
||||
|
||||
/*This stores the logical state of the system, given a policy and
|
||||
* a cluster state.
|
||||
*
|
||||
*/
|
||||
public class Session implements MapWriter {
|
||||
final List<String> nodes;
|
||||
final SolrCloudManager cloudManager;
|
||||
final List<Row> matrix;
|
||||
Set<String> collections = new HashSet<>();
|
||||
List<Clause> expandedClauses;
|
||||
List<Violation> violations = new ArrayList<>();
|
||||
final NodeStateProvider nodeStateProvider;
|
||||
final int znodeVersion;
|
||||
|
||||
private Session(List<String> nodes, SolrCloudManager cloudManager,
|
||||
List<Row> matrix, List<Clause> expandedClauses, int znodeVersion, NodeStateProvider nodeStateProvider) {
|
||||
this.nodes = nodes;
|
||||
this.cloudManager = cloudManager;
|
||||
this.matrix = matrix;
|
||||
this.expandedClauses = expandedClauses;
|
||||
this.znodeVersion = znodeVersion;
|
||||
this.nodeStateProvider = nodeStateProvider;
|
||||
for (Row row : matrix) row.session = this;
|
||||
}
|
||||
|
||||
|
||||
Session(SolrCloudManager cloudManager) {
|
||||
ClusterState state = null;
|
||||
this.nodeStateProvider = cloudManager.getNodeStateProvider();
|
||||
try {
|
||||
state = cloudManager.getClusterStateProvider().getClusterState();
|
||||
LOG.trace("-- session created with cluster state: {}", state);
|
||||
} catch (Exception e) {
|
||||
LOG.trace("-- session created, can't obtain cluster state", e);
|
||||
}
|
||||
this.znodeVersion = state != null ? state.getZNodeVersion() : -1;
|
||||
this.nodes = new ArrayList<>(cloudManager.getClusterStateProvider().getLiveNodes());
|
||||
this.cloudManager = cloudManager;
|
||||
for (String node : nodes) {
|
||||
collections.addAll(nodeStateProvider.getReplicaInfo(node, Collections.emptyList()).keySet());
|
||||
}
|
||||
|
||||
expandedClauses = clusterPolicy.stream()
|
||||
.filter(clause -> !clause.isPerCollectiontag())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider();
|
||||
for (String c : collections) {
|
||||
addClausesForCollection(stateProvider, c);
|
||||
}
|
||||
|
||||
Collections.sort(expandedClauses);
|
||||
|
||||
matrix = new ArrayList<>(nodes.size());
|
||||
for (String node : nodes) matrix.add(new Row(node, params, perReplicaAttributes, this));
|
||||
applyRules();
|
||||
}
|
||||
|
||||
void addClausesForCollection(ClusterStateProvider stateProvider, String c) {
|
||||
String p = stateProvider.getPolicyNameByCollection(c);
|
||||
if (p != null) {
|
||||
List<Clause> perCollPolicy = policies.get(p);
|
||||
if (perCollPolicy == null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
expandedClauses.addAll(mergePolicies(c, policies.getOrDefault(p, emptyList()), clusterPolicy));
|
||||
}
|
||||
|
||||
Session copy() {
|
||||
return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion, nodeStateProvider);
|
||||
}
|
||||
|
||||
public Row getNode(String node) {
|
||||
for (Row row : matrix) if (row.node.equals(node)) return row;
|
||||
return null;
|
||||
}
|
||||
|
||||
List<Row> getMatrixCopy() {
|
||||
return matrix.stream()
|
||||
.map(row -> row.copy(this))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
Policy getPolicy() {
|
||||
return Policy.this;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the preferences and conditions
|
||||
*/
|
||||
void applyRules() {
|
||||
setApproxValuesAndSortNodes(clusterPreferences, matrix);
|
||||
|
||||
for (Clause clause : expandedClauses) {
|
||||
List<Violation> errs = clause.test(matrix);
|
||||
violations.addAll(errs);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public List<Violation> getViolations() {
|
||||
return violations;
|
||||
}
|
||||
|
||||
public Suggester getSuggester(CollectionAction action) {
|
||||
Suggester op = ops.get(action).get();
|
||||
if (op == null) throw new UnsupportedOperationException(action.toString() + "is not supported");
|
||||
op._init(this);
|
||||
return op;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMap(EntryWriter ew) throws IOException {
|
||||
ew.put("znodeVersion", znodeVersion);
|
||||
for (Row row : matrix) {
|
||||
ew.put(row.node, row);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(toMap(new LinkedHashMap<>()));
|
||||
}
|
||||
|
||||
public List<Row> getSorted() {
|
||||
return Collections.unmodifiableList(matrix);
|
||||
}
|
||||
|
||||
public NodeStateProvider getNodeStateProvider() {
|
||||
return nodeStateProvider;
|
||||
}
|
||||
|
||||
public int indexOf(String node) {
|
||||
for (int i = 0; i < matrix.size(); i++) if (matrix.get(i).node.equals(node)) return i;
|
||||
throw new RuntimeException("NO such node found " + node);
|
||||
}
|
||||
public static Map<String, List<Clause>> policiesFromMap(Map<String, List<Map<String, Object>>> map, List<String> newParams) {
|
||||
Map<String, List<Clause>> newPolicies = new HashMap<>();
|
||||
map.forEach((s, l1) ->
|
||||
newPolicies.put(s, l1.stream()
|
||||
.map(Clause::create)
|
||||
.filter(clause -> {
|
||||
if (!clause.isPerCollectiontag())
|
||||
throw new RuntimeException(clause.getGlobalTag().name + " is only allowed in 'cluster-policy'");
|
||||
clause.addTags(newParams);
|
||||
return true;
|
||||
})
|
||||
.sorted()
|
||||
.collect(collectingAndThen(toList(), Collections::unmodifiableList))));
|
||||
return newPolicies;
|
||||
}
|
||||
|
||||
static void setApproxValuesAndSortNodes(List<Preference> clusterPreferences, List<Row> matrix) {
|
||||
|
@ -408,8 +284,24 @@ public class Policy implements MapWriter {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert the collection name into the clauses where collection is not specified
|
||||
*/
|
||||
static List<Clause> insertColl(String coll, Collection<Clause> conditions) {
|
||||
return conditions.stream()
|
||||
.filter(Clause::isPerCollectiontag)
|
||||
.map(clause -> {
|
||||
Map<String, Object> copy = new LinkedHashMap<>(clause.original);
|
||||
if (!copy.containsKey("collection")) copy.put("collection", coll);
|
||||
return Clause.create(copy);
|
||||
})
|
||||
.filter(it -> (it.getCollection().isPass(coll)))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
}
|
||||
|
||||
public Session createSession(SolrCloudManager cloudManager) {
|
||||
return new Session(cloudManager);
|
||||
return createSession(cloudManager, null);
|
||||
}
|
||||
|
||||
public enum SortParam {
|
||||
|
@ -446,21 +338,8 @@ public class Policy implements MapWriter {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public static Map<String, List<Clause>> policiesFromMap(Map<String, List<Map<String, Object>>> map, List<String> newParams) {
|
||||
Map<String, List<Clause>> newPolicies = new HashMap<>();
|
||||
map.forEach((s, l1) ->
|
||||
newPolicies.put(s, l1.stream()
|
||||
.map(Clause::new)
|
||||
.filter(clause -> {
|
||||
if (!clause.isPerCollectiontag())
|
||||
throw new RuntimeException(clause.globalTag.name + " is only allowed in 'cluster-policy'");
|
||||
clause.addTags(newParams);
|
||||
return true;
|
||||
})
|
||||
.sorted()
|
||||
.collect(collectingAndThen(toList(), Collections::unmodifiableList))));
|
||||
return newPolicies;
|
||||
private Session createSession(SolrCloudManager cloudManager, Transaction tx) {
|
||||
return new Session(cloudManager, tx);
|
||||
}
|
||||
|
||||
public static List<Clause> mergePolicies(String coll,
|
||||
|
@ -475,20 +354,51 @@ public class Policy implements MapWriter {
|
|||
return merged;
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert the collection name into the clauses where collection is not specified
|
||||
*/
|
||||
static List<Clause> insertColl(String coll, Collection<Clause> conditions) {
|
||||
return conditions.stream()
|
||||
.filter(Clause::isPerCollectiontag)
|
||||
.map(clause -> {
|
||||
Map<String, Object> copy = new LinkedHashMap<>(clause.original);
|
||||
if (!copy.containsKey("collection")) copy.put("collection", coll);
|
||||
return new Clause(copy);
|
||||
})
|
||||
.filter(it -> (it.collection.isPass(coll)))
|
||||
.collect(Collectors.toList());
|
||||
static class Transaction {
|
||||
private final Policy policy;
|
||||
private boolean open = false;
|
||||
private Session firstSession;
|
||||
private Session currentSession;
|
||||
|
||||
|
||||
public Transaction(Policy config) {
|
||||
this.policy = config;
|
||||
|
||||
}
|
||||
|
||||
public Session open(SolrCloudManager cloudManager) {
|
||||
firstSession = currentSession = policy.createSession(cloudManager, Transaction.this);
|
||||
open = true;
|
||||
return firstSession;
|
||||
}
|
||||
|
||||
|
||||
public boolean isOpen() {
|
||||
return open;
|
||||
}
|
||||
|
||||
List<Violation> close() {
|
||||
if (!open) throw new RuntimeException("Already closed");
|
||||
open = false;
|
||||
return currentSession.getViolations();
|
||||
}
|
||||
|
||||
public boolean undo() {
|
||||
if (currentSession.parent != null) {
|
||||
currentSession = currentSession.parent;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
public Session getCurrentSession() {
|
||||
return currentSession;
|
||||
}
|
||||
|
||||
void updateSession(Session session) {
|
||||
currentSession = session;
|
||||
}
|
||||
}
|
||||
|
||||
private static final Map<CollectionAction, Supplier<Suggester>> ops = new HashMap<>();
|
||||
|
@ -528,4 +438,150 @@ public class Policy implements MapWriter {
|
|||
public String toString() {
|
||||
return Utils.toJSONString(this);
|
||||
}
|
||||
|
||||
/*This stores the logical state of the system, given a policy and
|
||||
* a cluster state.
|
||||
*
|
||||
*/
|
||||
public class Session implements MapWriter {
|
||||
final List<String> nodes;
|
||||
final SolrCloudManager cloudManager;
|
||||
final List<Row> matrix;
|
||||
final NodeStateProvider nodeStateProvider;
|
||||
final int znodeVersion;
|
||||
Set<String> collections = new HashSet<>();
|
||||
List<Clause> expandedClauses;
|
||||
List<Violation> violations = new ArrayList<>();
|
||||
Transaction transaction;
|
||||
private Session parent = null;
|
||||
|
||||
private Session(List<String> nodes, SolrCloudManager cloudManager,
|
||||
List<Row> matrix, List<Clause> expandedClauses, int znodeVersion,
|
||||
NodeStateProvider nodeStateProvider, Transaction transaction, Session parent) {
|
||||
this.parent = parent;
|
||||
this.transaction = transaction;
|
||||
this.nodes = nodes;
|
||||
this.cloudManager = cloudManager;
|
||||
this.matrix = matrix;
|
||||
this.expandedClauses = expandedClauses;
|
||||
this.znodeVersion = znodeVersion;
|
||||
this.nodeStateProvider = nodeStateProvider;
|
||||
for (Row row : matrix) row.session = this;
|
||||
}
|
||||
|
||||
|
||||
Session(SolrCloudManager cloudManager, Transaction transaction) {
|
||||
this.transaction = transaction;
|
||||
ClusterState state = null;
|
||||
this.nodeStateProvider = cloudManager.getNodeStateProvider();
|
||||
try {
|
||||
state = cloudManager.getClusterStateProvider().getClusterState();
|
||||
LOG.trace("-- session created with cluster state: {}", state);
|
||||
} catch (Exception e) {
|
||||
LOG.trace("-- session created, can't obtain cluster state", e);
|
||||
}
|
||||
this.znodeVersion = state != null ? state.getZNodeVersion() : -1;
|
||||
this.nodes = new ArrayList<>(cloudManager.getClusterStateProvider().getLiveNodes());
|
||||
this.cloudManager = cloudManager;
|
||||
for (String node : nodes) {
|
||||
collections.addAll(nodeStateProvider.getReplicaInfo(node, Collections.emptyList()).keySet());
|
||||
}
|
||||
|
||||
expandedClauses = clusterPolicy.stream()
|
||||
.filter(clause -> !clause.isPerCollectiontag())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider();
|
||||
for (String c : collections) {
|
||||
addClausesForCollection(stateProvider, c);
|
||||
}
|
||||
|
||||
Collections.sort(expandedClauses);
|
||||
|
||||
matrix = new ArrayList<>(nodes.size());
|
||||
for (String node : nodes) matrix.add(new Row(node, params, perReplicaAttributes, this));
|
||||
applyRules();
|
||||
}
|
||||
|
||||
void addClausesForCollection(ClusterStateProvider stateProvider, String c) {
|
||||
String p = stateProvider.getPolicyNameByCollection(c);
|
||||
if (p != null) {
|
||||
List<Clause> perCollPolicy = policies.get(p);
|
||||
if (perCollPolicy == null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
expandedClauses.addAll(mergePolicies(c, policies.getOrDefault(p, emptyList()), clusterPolicy));
|
||||
}
|
||||
|
||||
Session copy() {
|
||||
return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion, nodeStateProvider, transaction, this);
|
||||
}
|
||||
|
||||
public Row getNode(String node) {
|
||||
for (Row row : matrix) if (row.node.equals(node)) return row;
|
||||
return null;
|
||||
}
|
||||
|
||||
List<Row> getMatrixCopy() {
|
||||
return matrix.stream()
|
||||
.map(row -> row.copy(this))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
Policy getPolicy() {
|
||||
return Policy.this;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the preferences and conditions
|
||||
*/
|
||||
void applyRules() {
|
||||
setApproxValuesAndSortNodes(clusterPreferences, matrix);
|
||||
|
||||
for (Clause clause : expandedClauses) {
|
||||
List<Violation> errs = clause.test(this);
|
||||
violations.addAll(errs);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public List<Violation> getViolations() {
|
||||
return violations;
|
||||
}
|
||||
|
||||
public Suggester getSuggester(CollectionAction action) {
|
||||
Suggester op = ops.get(action).get();
|
||||
if (op == null) throw new UnsupportedOperationException(action.toString() + "is not supported");
|
||||
op._init(this);
|
||||
return op;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMap(EntryWriter ew) throws IOException {
|
||||
ew.put("znodeVersion", znodeVersion);
|
||||
for (Row row : matrix) {
|
||||
ew.put(row.node, row);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(toMap(new LinkedHashMap<>()));
|
||||
}
|
||||
|
||||
public List<Row> getSorted() {
|
||||
return Collections.unmodifiableList(matrix);
|
||||
}
|
||||
|
||||
public NodeStateProvider getNodeStateProvider() {
|
||||
return nodeStateProvider;
|
||||
}
|
||||
|
||||
public int indexOf(String node) {
|
||||
for (int i = 0; i < matrix.size(); i++) if (matrix.get(i).node.equals(node)) return i;
|
||||
throw new RuntimeException("NO such node found " + node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class Preference implements MapWriter {
|
|||
next.compare(r1, r2, useApprox)) : sort.sortval * result;
|
||||
}
|
||||
|
||||
private int compareWithTolerance(Double o1, Double o2, int percentage) {
|
||||
static int compareWithTolerance(Double o1, Double o2, int percentage) {
|
||||
if (percentage == 0) return o1.compareTo(o2);
|
||||
if (o1.equals(o2)) return 0;
|
||||
double delta = Math.abs(o1 - o2);
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.cloud.autoscaling;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* This clause is an instance with no conditions with computed value
|
||||
*/
|
||||
public class SealedClause extends Clause {
|
||||
SealedClause(Clause clause, Function<Condition, Object> computedValueEvaluator) {
|
||||
super(clause, computedValueEvaluator);
|
||||
}
|
||||
}
|
|
@ -129,6 +129,9 @@ public abstract class Suggester implements MapWriter {
|
|||
this.operation = init();
|
||||
isInitialized = true;
|
||||
}
|
||||
if (operation != null && session.transaction != null && session.transaction.isOpen()) {
|
||||
session.transaction.updateSession(session);
|
||||
}
|
||||
return operation;
|
||||
}
|
||||
|
||||
|
@ -178,9 +181,15 @@ public abstract class Suggester implements MapWriter {
|
|||
for (int i = 0; i < fresh.size(); i++) {
|
||||
Violation freshViolation = fresh.get(i);
|
||||
Violation oldViolation = null;
|
||||
for (Violation v : old) {
|
||||
for (Violation v : old) {//look for exactly same clause being violated
|
||||
if (v.equals(freshViolation)) oldViolation = v;
|
||||
}
|
||||
if (oldViolation == null) {//if no match, look for similar violation
|
||||
for (Violation v : old) {
|
||||
if (v.isSimilarViolation(freshViolation)) oldViolation = v;
|
||||
}
|
||||
}
|
||||
|
||||
if (oldViolation != null && freshViolation.isLessSerious(oldViolation)) return true;
|
||||
}
|
||||
}
|
||||
|
@ -188,7 +197,10 @@ public abstract class Suggester implements MapWriter {
|
|||
}
|
||||
|
||||
boolean containsNewErrors(List<Violation> violations) {
|
||||
boolean isTxOpen = session.transaction != null && session.transaction.isOpen();
|
||||
for (Violation v : violations) {
|
||||
//the computed value can change over time. So it's better to evaluate it in the end
|
||||
if (isTxOpen && v.getClause().hasComputedValue) continue;
|
||||
int idx = originalViolations.indexOf(v);
|
||||
if (idx < 0 /*|| originalViolations.get(idx).isLessSerious(v)*/) return true;
|
||||
}
|
||||
|
@ -221,12 +233,12 @@ public abstract class Suggester implements MapWriter {
|
|||
}
|
||||
}
|
||||
|
||||
List<Violation> testChangedMatrix(boolean strict, List<Row> rows) {
|
||||
Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences, rows);
|
||||
List<Violation> testChangedMatrix(boolean strict, Policy.Session session) {
|
||||
Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences, session.matrix);
|
||||
List<Violation> errors = new ArrayList<>();
|
||||
for (Clause clause : session.expandedClauses) {
|
||||
if (strict || clause.strict) {
|
||||
List<Violation> errs = clause.test(rows);
|
||||
List<Violation> errs = clause.test(session);
|
||||
if (!errs.isEmpty()) {
|
||||
errors.addAll(errs);
|
||||
}
|
||||
|
|
|
@ -26,6 +26,8 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -52,29 +54,25 @@ public class Suggestion {
|
|||
return info;
|
||||
}
|
||||
|
||||
static class ViolationCtx {
|
||||
String tagKey;
|
||||
Clause clause;
|
||||
ReplicaCount count;
|
||||
|
||||
Violation currentViolation;
|
||||
List<Row> allRows;
|
||||
|
||||
List<Violation> allViolations = new ArrayList<>();
|
||||
|
||||
public ViolationCtx(Clause clause, List<Row> allRows) {
|
||||
this.allRows = allRows;
|
||||
this.clause = clause;
|
||||
private static Object getOperandAdjustedValue(Object val, Object original) {
|
||||
if (original instanceof Clause.Condition) {
|
||||
Clause.Condition condition = (Clause.Condition) original;
|
||||
if (condition.computationType == null && isIntegerEquivalent(val)) {
|
||||
if (condition.op == Operand.LESS_THAN) {
|
||||
//replica : '<3'
|
||||
val = val instanceof Long ?
|
||||
(Long) val - 1 :
|
||||
(Double) val - 1;
|
||||
} else if (condition.op == Operand.GREATER_THAN) {
|
||||
//replica : '>4'
|
||||
val = val instanceof Long ?
|
||||
(Long) val + 1 :
|
||||
(Double) val + 1;
|
||||
}
|
||||
|
||||
public ViolationCtx reset(String tagKey, ReplicaCount count, Violation currentViolation) {
|
||||
this.tagKey = tagKey;
|
||||
this.count = count;
|
||||
this.currentViolation = currentViolation;
|
||||
allViolations.add(currentViolation);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
|
||||
static class SuggestionCtx {
|
||||
|
@ -103,6 +101,23 @@ public class Suggestion {
|
|||
}
|
||||
}
|
||||
|
||||
static boolean isIntegerEquivalent(Object val) {
|
||||
if (val instanceof Number) {
|
||||
Number number = (Number) val;
|
||||
return Math.ceil(number.doubleValue()) == Math.floor(number.doubleValue());
|
||||
} else if (val instanceof String) {
|
||||
try {
|
||||
double dval = Double.parseDouble((String) val);
|
||||
return Math.ceil(dval) == Math.floor(dval);
|
||||
} catch (NumberFormatException e) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static final Map<String, String> tagVsPerReplicaVal = Stream.of(ConditionType.values())
|
||||
.filter(tag -> tag.perReplicaValue != null)
|
||||
|
@ -115,7 +130,63 @@ public class Suggestion {
|
|||
|
||||
COLL("collection", String.class, null, null, null),
|
||||
SHARD("shard", String.class, null, null, null),
|
||||
REPLICA("replica", Long.class, null, 0L, null),
|
||||
REPLICA("replica", Double.class, null, 0L, null) {
|
||||
@Override
|
||||
public Object validate(String name, Object val, boolean isRuleVal) {
|
||||
return getOperandAdjustedValue(super.validate(name, val, isRuleVal), val);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Operand getOperand(Operand expected, Object strVal, Clause.ComputationType computationType) {
|
||||
if (strVal instanceof String) {
|
||||
String s = ((String) strVal).trim();
|
||||
int hyphenIdx = s.indexOf('-');
|
||||
if (hyphenIdx > 0) {
|
||||
if (hyphenIdx == s.length() - 1) {
|
||||
throw new IllegalArgumentException("bad range input :" + expected);
|
||||
}
|
||||
if (expected == Operand.EQUAL) return Operand.RANGE_EQUAL;
|
||||
if (expected == Operand.NOT_EQUAL) return Operand.RANGE_NOT_EQUAL;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (expected == Operand.EQUAL && (computationType != null || !isIntegerEquivalent(strVal))) {
|
||||
return Operand.RANGE_EQUAL;
|
||||
}
|
||||
if (expected == Operand.NOT_EQUAL && (computationType != null || !isIntegerEquivalent(strVal)))
|
||||
return Operand.RANGE_NOT_EQUAL;
|
||||
|
||||
return expected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportComputed(Clause.ComputationType computedType) {
|
||||
return computedType == Clause.ComputationType.PERCENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object computeValue(Policy.Session session, Clause.Condition cv, String collection, String shard) {
|
||||
if (cv.computationType == Clause.ComputationType.PERCENT) {
|
||||
AtomicInteger totalReplicasOfInterest = new AtomicInteger(0);
|
||||
Clause clause = cv.getClause();
|
||||
for (Row row : session.matrix) {
|
||||
row.forEachReplica(replicaInfo -> {
|
||||
if (replicaInfo.getCollection().equals(collection)) {
|
||||
if (clause.getShard().op == Operand.WILDCARD || replicaInfo.getShard().equals(shard)) {
|
||||
totalReplicasOfInterest.incrementAndGet();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return totalReplicasOfInterest.doubleValue() * Clause.parseDouble(cv.name, cv.val).doubleValue() / 100;
|
||||
} else {
|
||||
throw new RuntimeException("Unsupported type " + cv.computationType);
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
PORT(ImplicitSnitch.PORT, Long.class, null, 1L, 65535L),
|
||||
IP_1("ip_1", Long.class, null, 0L, 255L),
|
||||
IP_2("ip_2", Long.class, null, 0L, 255L),
|
||||
|
@ -133,9 +204,10 @@ public class Suggestion {
|
|||
|
||||
@Override
|
||||
public int compareViolation(Violation v1, Violation v2) {
|
||||
return Long.compare(
|
||||
v1.getViolatingReplicas().stream().mapToLong(v -> v.delta == null? 0 :v.delta).max().orElse(0L),
|
||||
v2.getViolatingReplicas().stream().mapToLong(v3 -> v3.delta == null? 0 : v3.delta).max().orElse(0L));
|
||||
//TODO use tolerance compare
|
||||
return Double.compare(
|
||||
v1.getViolatingReplicas().stream().mapToDouble(v -> v.delta == null ? 0 : v.delta).max().orElse(0d),
|
||||
v2.getViolatingReplicas().stream().mapToDouble(v3 -> v3.delta == null ? 0 : v3.delta).max().orElse(0d));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -162,9 +234,9 @@ public class Suggestion {
|
|||
if (s1 != null && s2 != null) return s1.compareTo(s2);
|
||||
return 0;
|
||||
});
|
||||
long currentDelta = ctx.violation.getClause().tag.delta(node.getVal(ImplicitSnitch.DISK));
|
||||
double currentDelta = ctx.violation.getClause().tag.delta(node.getVal(ImplicitSnitch.DISK));
|
||||
for (ReplicaInfo replica : replicas) {
|
||||
if (currentDelta <= 0) break;
|
||||
if (currentDelta < 1) break;
|
||||
if (replica.getVariables().get(ConditionType.CORE_IDX.tagName) == null) continue;
|
||||
Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
|
||||
.hint(Suggester.Hint.COLL_SHARD, new Pair<>(replica.getCollection(), replica.getShard()))
|
||||
|
@ -214,12 +286,18 @@ public class Suggestion {
|
|||
},
|
||||
NODE_ROLE(ImplicitSnitch.NODEROLE, String.class, Collections.singleton("overseer"), null, null),
|
||||
CORES(ImplicitSnitch.CORES, Long.class, null, 0L, Long.MAX_VALUE) {
|
||||
@Override
|
||||
public Object validate(String name, Object val, boolean isRuleVal) {
|
||||
return getOperandAdjustedValue(super.validate(name, val, isRuleVal), val);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addViolatingReplicas(ViolationCtx ctx) {
|
||||
for (Row r : ctx.allRows) {
|
||||
if (!ctx.clause.tag.isPass(r)) {
|
||||
r.forEachReplica(replicaInfo -> ctx.currentViolation
|
||||
.addReplica(new ReplicaInfoAndErr(replicaInfo).withDelta(ctx.clause.tag.delta(r.getVal(ImplicitSnitch.CORES)))));
|
||||
.addReplica(new ReplicaInfoAndErr(replicaInfo)
|
||||
.withDelta(ctx.clause.tag.delta(r.getVal(ImplicitSnitch.CORES)))));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -330,21 +408,30 @@ public class Suggestion {
|
|||
}
|
||||
}
|
||||
|
||||
public Operand getOperand(Operand expected, Object val, Clause.ComputationType computationType) {
|
||||
return expected;
|
||||
}
|
||||
|
||||
|
||||
public Object convertVal(Object val) {
|
||||
return val;
|
||||
}
|
||||
|
||||
public Object validate(String name, Object val, boolean isRuleVal) {
|
||||
if (val instanceof Clause.Condition) {
|
||||
Clause.Condition condition = (Clause.Condition) val;
|
||||
val = condition.op.readRuleValue(condition);
|
||||
if (val != condition.val) return val;
|
||||
}
|
||||
if (name == null) name = this.tagName;
|
||||
if (type == Double.class) {
|
||||
Double num = Clause.parseDouble(name, val);
|
||||
if (isRuleVal) {
|
||||
if (min != null)
|
||||
if (Double.compare(num, (Double) min) == -1)
|
||||
if (Double.compare(num, min.doubleValue()) == -1)
|
||||
throw new RuntimeException(name + ": " + val + " must be greater than " + min);
|
||||
if (max != null)
|
||||
if (Double.compare(num, (Double) max) == 1)
|
||||
if (Double.compare(num, max.doubleValue()) == 1)
|
||||
throw new RuntimeException(name + ": " + val + " must be less than " + max);
|
||||
}
|
||||
return num;
|
||||
|
@ -383,6 +470,39 @@ public class Suggestion {
|
|||
if (Math.abs(v1.replicaCountDelta) == Math.abs(v2.replicaCountDelta)) return 0;
|
||||
return Math.abs(v1.replicaCountDelta) < Math.abs(v2.replicaCountDelta) ? -1 : 1;
|
||||
}
|
||||
|
||||
public boolean supportComputed(Clause.ComputationType computedType) {
|
||||
return false;
|
||||
}
|
||||
|
||||
public Object computeValue(Policy.Session session, Clause.Condition condition, String collection, String shard) {
|
||||
return condition.val;
|
||||
}
|
||||
}
|
||||
|
||||
static class ViolationCtx {
|
||||
final Function<Clause.Condition, Object> evaluator;
|
||||
String tagKey;
|
||||
Clause clause;
|
||||
ReplicaCount count;
|
||||
Violation currentViolation;
|
||||
List<Row> allRows;
|
||||
List<Violation> allViolations = new ArrayList<>();
|
||||
|
||||
public ViolationCtx(Clause clause, List<Row> allRows, Function<Clause.Condition, Object> evaluator) {
|
||||
this.allRows = allRows;
|
||||
this.clause = clause;
|
||||
this.evaluator = evaluator;
|
||||
}
|
||||
|
||||
public ViolationCtx reset(String tagKey, ReplicaCount count, Violation currentViolation) {
|
||||
this.tagKey = tagKey;
|
||||
this.count = count;
|
||||
this.currentViolation = currentViolation;
|
||||
allViolations.add(currentViolation);
|
||||
this.clause = currentViolation.getClause();
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
private static void perNodeSuggestions(SuggestionCtx ctx) {
|
||||
|
|
|
@ -30,13 +30,13 @@ import org.apache.solr.common.util.Utils;
|
|||
public class Violation implements MapWriter {
|
||||
final String shard, coll, node;
|
||||
final Object actualVal;
|
||||
final Long replicaCountDelta;//how far is the actual value from the expected value
|
||||
final Double replicaCountDelta;//how far is the actual value from the expected value
|
||||
final Object tagKey;
|
||||
private final int hash;
|
||||
private final Clause clause;
|
||||
private List<ReplicaInfoAndErr> replicaInfoAndErrs = new ArrayList<>();
|
||||
|
||||
Violation(Clause clause, String coll, String shard, String node, Object actualVal, Long replicaCountDelta, Object tagKey) {
|
||||
Violation(SealedClause clause, String coll, String shard, String node, Object actualVal, Double replicaCountDelta, Object tagKey) {
|
||||
this.clause = clause;
|
||||
this.shard = shard;
|
||||
this.coll = coll;
|
||||
|
@ -61,19 +61,60 @@ public class Violation implements MapWriter {
|
|||
}
|
||||
|
||||
public boolean matchShard(String shard) {
|
||||
if (getClause().shard.op == Operand.WILDCARD) return true;
|
||||
if (getClause().getShard().op == Operand.WILDCARD) return true;
|
||||
return this.shard == null || this.shard.equals(shard);
|
||||
}
|
||||
|
||||
//if the delta is lower , this violation is less serious
|
||||
public boolean isLessSerious(Violation that) {
|
||||
return this.getClause().getTag().varType.compareViolation(this, that) < 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return hash;
|
||||
}
|
||||
|
||||
public boolean isSimilarViolation(Violation that) {
|
||||
if (Objects.equals(this.shard, that.shard) &&
|
||||
Objects.equals(this.coll, that.coll) &&
|
||||
Objects.equals(this.node, that.node)) {
|
||||
if (this.clause.isPerCollectiontag() && that.clause.isPerCollectiontag()) {
|
||||
return Objects.equals(this.clause.tag.getName(), that.clause.tag.getName());
|
||||
} else if (!this.clause.isPerCollectiontag() && !that.clause.isPerCollectiontag()) {
|
||||
return Objects.equals(this.clause.globalTag.getName(), that.clause.globalTag.getName())
|
||||
&& Objects.equals(this.node, that.node);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object that) {
|
||||
if (that instanceof Violation) {
|
||||
Violation v = (Violation) that;
|
||||
return Objects.equals(this.shard, v.shard) &&
|
||||
Objects.equals(this.coll, v.coll) &&
|
||||
// Objects.equals(this.node, v.node) &&
|
||||
Objects.equals(this.clause, v.clause)
|
||||
;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static class ReplicaInfoAndErr implements MapWriter{
|
||||
final ReplicaInfo replicaInfo;
|
||||
Double delta;
|
||||
|
||||
ReplicaInfoAndErr(ReplicaInfo replicaInfo) {
|
||||
this.replicaInfo = replicaInfo;
|
||||
}
|
||||
Long delta;
|
||||
|
||||
public ReplicaInfoAndErr withDelta(Long delta) {
|
||||
public ReplicaInfoAndErr withDelta(Double delta) {
|
||||
this.delta = delta;
|
||||
return this;
|
||||
}
|
||||
|
@ -85,28 +126,6 @@ public class Violation implements MapWriter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return hash;
|
||||
}
|
||||
//if the delta is lower , this violation is less serious
|
||||
public boolean isLessSerious(Violation that) {
|
||||
return this.getClause().tag.varType.compareViolation(this,that) <0 ;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object that) {
|
||||
if (that instanceof Violation) {
|
||||
Violation v = (Violation) that;
|
||||
return Objects.equals(this.shard, v.shard) &&
|
||||
Objects.equals(this.coll, v.coll) &&
|
||||
Objects.equals(this.node, v.node) &&
|
||||
Objects.equals(this.clause, v.clause)
|
||||
;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(Utils.getDeepCopy(toMap(new LinkedHashMap<>()), 5));
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
@ -41,6 +42,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
|
|||
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
|
||||
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Clause.RangeVal;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
|
||||
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
|
@ -66,6 +68,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.REPLICA;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
|
||||
|
||||
|
@ -160,7 +163,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
public void testValidate() {
|
||||
expectError("replica", -1, "must be greater than");
|
||||
expectError("replica", "hello", "not a valid number");
|
||||
assertEquals(1L, Clause.validate("replica", "1", true));
|
||||
assertEquals(1d, Clause.validate("replica", "1", true));
|
||||
assertEquals("c", Clause.validate("collection", "c", true));
|
||||
assertEquals("s", Clause.validate("shard", "s", true));
|
||||
assertEquals("overseer", Clause.validate("nodeRole", "overseer", true));
|
||||
|
@ -195,6 +198,63 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
|
||||
expectError("cores", "-1", "must be greater than");
|
||||
|
||||
assertEquals(Operand.EQUAL, REPLICA.getOperand(Operand.EQUAL, "2.0", null));
|
||||
assertEquals(Operand.NOT_EQUAL, REPLICA.getOperand(Operand.NOT_EQUAL, "2.0", null));
|
||||
assertEquals(Operand.EQUAL, REPLICA.getOperand(Operand.EQUAL, "2", null));
|
||||
assertEquals(Operand.NOT_EQUAL, REPLICA.getOperand(Operand.NOT_EQUAL, "2", null));
|
||||
assertEquals(Operand.RANGE_EQUAL, REPLICA.getOperand(Operand.EQUAL, "2.1", null));
|
||||
assertEquals(Operand.RANGE_NOT_EQUAL, REPLICA.getOperand(Operand.NOT_EQUAL, "2.1", null));
|
||||
assertEquals(Operand.RANGE_EQUAL, REPLICA.getOperand(Operand.EQUAL, "2.01", null));
|
||||
assertEquals(Operand.RANGE_NOT_EQUAL, REPLICA.getOperand(Operand.NOT_EQUAL, "2.01", null));
|
||||
|
||||
Clause clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '1.23', node:'#ANY'}"));
|
||||
assertTrue(clause.getReplica().isPass(2));
|
||||
assertTrue(clause.getReplica().isPass(1));
|
||||
assertFalse(clause.getReplica().isPass(0));
|
||||
assertFalse(clause.getReplica().isPass(3));
|
||||
|
||||
clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '<1.23', node:'#ANY'}"));
|
||||
assertTrue(clause.getReplica().isPass(1));
|
||||
assertTrue(clause.getReplica().isPass(0));
|
||||
assertFalse(clause.getReplica().isPass(2));
|
||||
|
||||
clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '!1.23', node:'#ANY'}"));
|
||||
assertFalse(clause.getReplica().isPass(2));
|
||||
assertFalse(clause.getReplica().isPass(1));
|
||||
assertTrue(clause.getReplica().isPass(0));
|
||||
assertTrue(clause.getReplica().isPass(3));
|
||||
|
||||
clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: 1.23, node:'#ANY'}"));
|
||||
assertTrue(clause.getReplica().isPass(2));
|
||||
assertTrue(clause.getReplica().isPass(1));
|
||||
assertFalse(clause.getReplica().isPass(0));
|
||||
assertFalse(clause.getReplica().isPass(3));
|
||||
|
||||
clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '33%', node:'#ANY'}"));
|
||||
assertEquals(Operand.RANGE_EQUAL, clause.getReplica().op);
|
||||
clause = clause.getSealedClause(condition -> {
|
||||
if (condition.name.equals("replica")) {
|
||||
return 2.0d;
|
||||
}
|
||||
throw new RuntimeException("");
|
||||
});
|
||||
assertTrue( clause.getReplica().isPass(2));
|
||||
|
||||
clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '3 - 5', node:'#ANY'}"));
|
||||
assertEquals(Operand.RANGE_EQUAL, clause.getReplica().getOperand());
|
||||
RangeVal range = (RangeVal) clause.getReplica().getValue();
|
||||
assertEquals(3.0 , range.min);
|
||||
assertEquals(5.0 , range.max);
|
||||
assertTrue(clause.replica.isPass(3));
|
||||
assertTrue(clause.replica.isPass(4));
|
||||
assertTrue(clause.replica.isPass(5));
|
||||
assertFalse(clause.replica.isPass(6));
|
||||
assertFalse(clause.replica.isPass(2));
|
||||
|
||||
assertEquals(new Double(1.0), clause.replica.delta(6));
|
||||
assertEquals(new Double(-1.0), clause.replica.delta(2));
|
||||
assertEquals(new Double(0.0), clause.replica.delta(4));
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
@ -209,39 +269,50 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
}
|
||||
|
||||
public void testOperands() {
|
||||
Clause c = new Clause((Map<String, Object>) Utils.fromJSONString("{replica:'<2', node:'#ANY'}"));
|
||||
Clause c = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica:'<2', node:'#ANY'}"));
|
||||
assertFalse(c.replica.isPass(3));
|
||||
assertFalse(c.replica.isPass(2));
|
||||
assertTrue(c.replica.isPass(1));
|
||||
assertEquals("{\"replica\":\"<2.0\"}", c.replica.toString());
|
||||
|
||||
c = new Clause((Map<String, Object>) Utils.fromJSONString("{replica:'>2', node:'#ANY'}"));
|
||||
c = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica:'>2', node:'#ANY'}"));
|
||||
assertTrue(c.replica.isPass(3));
|
||||
assertFalse(c.replica.isPass(2));
|
||||
assertFalse(c.replica.isPass(1));
|
||||
assertEquals("{\"replica\":\">2.0\"}", c.replica.toString());
|
||||
|
||||
c = new Clause((Map<String, Object>) Utils.fromJSONString("{replica:0, nodeRole:'!overseer'}"));
|
||||
|
||||
c = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica:0, nodeRole:'!overseer'}"));
|
||||
assertTrue(c.tag.isPass("OVERSEER"));
|
||||
assertFalse(c.tag.isPass("overseer"));
|
||||
|
||||
c = new Clause((Map<String, Object>) Utils.fromJSONString("{replica:0, sysLoadAvg:'<12.7'}"));
|
||||
c = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica:0, sysLoadAvg:'<12.7'}"));
|
||||
assertTrue(c.tag.isPass("12.6"));
|
||||
assertTrue(c.tag.isPass(12.6d));
|
||||
assertFalse(c.tag.isPass("12.9"));
|
||||
assertFalse(c.tag.isPass(12.9d));
|
||||
|
||||
c = new Clause((Map<String, Object>) Utils.fromJSONString("{replica:0, sysLoadAvg:'>12.7'}"));
|
||||
c = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica:0, sysLoadAvg:'>12.7'}"));
|
||||
assertTrue(c.tag.isPass("12.8"));
|
||||
assertTrue(c.tag.isPass(12.8d));
|
||||
assertFalse(c.tag.isPass("12.6"));
|
||||
assertFalse(c.tag.isPass(12.6d));
|
||||
|
||||
c = new Clause((Map<String, Object>) Utils.fromJSONString("{replica:0, 'metrics:x:y:z':'>12.7'}"));
|
||||
c = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica:0, 'metrics:x:y:z':'>12.7'}"));
|
||||
assertTrue(c.tag.val instanceof String);
|
||||
assertTrue(c.tag.isPass("12.8"));
|
||||
assertTrue(c.tag.isPass(12.8d));
|
||||
assertFalse(c.tag.isPass("12.6"));
|
||||
assertFalse(c.tag.isPass(12.6d));
|
||||
|
||||
try {
|
||||
c = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica:0, 'ip_1':'<30%'}"));
|
||||
fail("Expected exception");
|
||||
} catch (Exception e) {
|
||||
assertTrue(e.getMessage().contains("'%' is not allowed for variable : 'ip_1'"));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testNodeLost() {
|
||||
|
@ -261,32 +332,6 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
" 'node':'127.0.0.1:65434_solr'," +
|
||||
" 'cores':0," +
|
||||
" 'freedisk':884.7097854614258}}}";
|
||||
/* String stateJson = "{'testNodeLost':{" +
|
||||
" 'pullReplicas':'0'," +
|
||||
" 'replicationFactor':'2'," +
|
||||
" 'router':{'name':'compositeId'}," +
|
||||
" 'maxShardsPerNode':'1'," +
|
||||
" 'autoAddReplicas':'false'," +
|
||||
" 'nrtReplicas':'2'," +
|
||||
" 'tlogReplicas':'0'," +
|
||||
" 'shards':{'shard1':{" +
|
||||
" 'range':'80000000-7fffffff'," +
|
||||
" 'state':'active'," +
|
||||
" 'replicas':{" +
|
||||
" 'core_node1':{" +
|
||||
" 'core':'testNodeLost_shard1_replica_n1'," +
|
||||
" 'base_url':'http://127.0.0.1:65417/solr'," +
|
||||
" 'node_name':'127.0.0.1:65417_solr'," +
|
||||
" 'state':'active'," +
|
||||
" 'type':'NRT'," +
|
||||
" 'leader':'true'}," +
|
||||
" 'core_node2':{" +
|
||||
" 'core':'testNodeLost_shard1_replica_n2'," +
|
||||
" 'base_url':'http://127.0.0.1:65427/solr'," +
|
||||
" 'node_name':'127.0.0.1:65427_solr'," +
|
||||
" 'state':'down'," +
|
||||
" 'type':'NRT'}}}}}}";*/
|
||||
|
||||
String autoScalingjson = "{" +
|
||||
" 'cluster-policy':[" +
|
||||
" {" +
|
||||
|
@ -746,7 +791,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2"));
|
||||
if (collection.equals("newColl")) countNewCollOp++;
|
||||
else countNewColl2Op++;
|
||||
assertEquals("PULL type node must be in 'slowdisk' node", "node1", op.getParams().get("node"));
|
||||
assertEquals("PULL type node must be in 'slowdisk' node, countOp : " + countOp, "node1", op.getParams().get("node"));
|
||||
suggester = suggester.getSession().getSuggester(ADDREPLICA)
|
||||
.hint(Hint.REPLICATYPE, Replica.Type.PULL)
|
||||
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
|
||||
|
@ -1510,7 +1555,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
assertFalse(l.isEmpty());
|
||||
|
||||
Map m = l.get(0).toMap(new LinkedHashMap<>());
|
||||
assertEquals(1L, Utils.getObjectByPath(m, true, "violation/violation/delta"));
|
||||
assertEquals(1.0d, Utils.getObjectByPath(m, true, "violation/violation/delta"));
|
||||
assertEquals("POST", Utils.getObjectByPath(m, true, "operation/method"));
|
||||
assertEquals("/c/mycoll1", Utils.getObjectByPath(m, true, "operation/path"));
|
||||
assertNotNull(Utils.getObjectByPath(m, false, "operation/command/move-replica"));
|
||||
|
@ -1518,6 +1563,105 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
assertEquals("core_node2", Utils.getObjectByPath(m, true, "operation/command/move-replica/replica"));
|
||||
}
|
||||
|
||||
|
||||
public void testReplicaPercentage() {
|
||||
String dataproviderdata = "{" +
|
||||
" 'liveNodes':[" +
|
||||
" '10.0.0.6:7574_solr'," +
|
||||
" '10.0.0.6:8983_solr']," +
|
||||
" 'replicaInfo':{" +
|
||||
" '10.0.0.6:7574_solr':{}," +
|
||||
" '10.0.0.6:8983_solr':{'mycoll1':{" +
|
||||
" 'shard2':[{'core_node2':{'type':'NRT'}}]," +
|
||||
" 'shard1':[{'core_node1':{'type':'NRT'}}]}}}," +
|
||||
" 'nodeValues':{" +
|
||||
" '10.0.0.6:7574_solr':{" +
|
||||
" 'node':'10.0.0.6:7574_solr'," +
|
||||
" 'cores':0}," +
|
||||
" '10.0.0.6:8983_solr':{" +
|
||||
" 'node':'10.0.0.6:8983_solr'," +
|
||||
" 'cores':2}}}";
|
||||
String autoScalingjson = " { cluster-policy:[" +
|
||||
" { replica :'<51%', node:'#ANY'}]," +
|
||||
" cluster-preferences :[{ minimize : cores }]}";
|
||||
|
||||
|
||||
AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
|
||||
Policy.Session session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata));
|
||||
List<Violation> violations = session.getViolations();
|
||||
assertEquals(1, violations.size());
|
||||
autoScalingjson = " { cluster-policy:[" +
|
||||
" { replica :'<51%', shard: '#EACH' , node:'#ANY'}]," +
|
||||
" cluster-preferences :[{ minimize : cores }]}";
|
||||
autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
|
||||
session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(dataproviderdata));
|
||||
violations = session.getViolations();
|
||||
assertEquals(0, violations.size());
|
||||
}
|
||||
|
||||
public void testReplicaZonesPercentage() {
|
||||
String dataproviderdata = "{" +
|
||||
" 'liveNodes':[" +
|
||||
" '10.0.0.6:7574_solr'," +
|
||||
" '10.0.0.6:8983_solr']," +
|
||||
" 'replicaInfo':{" +
|
||||
" '10.0.0.6:7574_solr':{}," +
|
||||
" '10.0.0.6:8983_solr':{}}," +
|
||||
" 'nodeValues':{" +
|
||||
" '10.0.0.6:7574_solr':{" +
|
||||
" 'node':'10.0.0.6:7574_solr'," +
|
||||
" 'cores':0," +
|
||||
" 'sysprop.az': 'west'" +
|
||||
" }," +
|
||||
" '10.0.0.6:8983_solr':{" +
|
||||
" 'node':'10.0.0.6:8983_solr'," +
|
||||
" 'cores':0," +
|
||||
" 'sysprop.az': 'east' " +
|
||||
" }}}";
|
||||
|
||||
String autoScalingjson = " { cluster-policy:[" +
|
||||
" { replica :'<34%', shard: '#EACH', sysprop.az : east}," +
|
||||
" { replica :'<67%', shard: '#EACH', sysprop.az : west}" +
|
||||
" ]," +
|
||||
" cluster-preferences :[{ minimize : cores }]}";
|
||||
|
||||
String COLL_NAME = "percentColl";
|
||||
AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
|
||||
|
||||
Policy.Transaction txn = new Policy.Transaction(autoScalingConfig.getPolicy());
|
||||
txn.open(cloudManagerWithData(dataproviderdata));
|
||||
|
||||
List<String> nodes = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < 12; i++) {
|
||||
SolrRequest suggestion = txn.getCurrentSession()
|
||||
.getSuggester(ADDREPLICA)
|
||||
.hint(Hint.COLL_SHARD, new Pair<>(COLL_NAME, "shard1"))
|
||||
.getSuggestion();
|
||||
assertNotNull(suggestion);
|
||||
String node = suggestion.getParams().get("node");
|
||||
nodes.add(node);
|
||||
if (i % 3 == 1) assertEquals("10.0.0.6:8983_solr", node);
|
||||
else assertEquals("10.0.0.6:7574_solr", node);
|
||||
}
|
||||
|
||||
List<Violation> violations = txn.close();
|
||||
assertTrue(violations.isEmpty());
|
||||
Policy.Session latestSession = txn.getCurrentSession();
|
||||
assertEquals("10.0.0.6:7574_solr", latestSession.matrix.get(0).node);
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
latestSession.matrix.get(0).forEachReplica(replicaInfo -> count.incrementAndGet());
|
||||
assertEquals(8, count.get());
|
||||
|
||||
assertEquals("10.0.0.6:8983_solr", latestSession.matrix.get(1).node);
|
||||
count.set(0);
|
||||
latestSession.matrix.get(1).forEachReplica(replicaInfo -> count.incrementAndGet());
|
||||
assertEquals(4, count.get());
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void testFreeDiskSuggestions() {
|
||||
String dataproviderdata = "{" +
|
||||
" liveNodes:[node1,node2]," +
|
||||
|
|
Loading…
Reference in New Issue