mirror of https://github.com/apache/lucene.git
SOLR-13504: In autoscaling policies, use an explicit 'put : on-each' to specify the the rules is applied on each node (#694)
SOLR-13504: In autoscaling policies, use an explicit 'put : on-each' to specify the the rules is applied on each node
This commit is contained in:
parent
b09d462ee4
commit
0a41163d27
|
@ -102,6 +102,9 @@ New Features
|
|||
* SOLR-13504: In autoscaling policies, use an explicit 'nodeset' attribute for filtering
|
||||
nodes instead of using them directly at the toplevel (noble)
|
||||
|
||||
* SOLR-13504: In autoscaling policies, use an explicit 'put : on-each'
|
||||
to specify the the rules is applied on each node (noble)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -714,6 +714,22 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
|
|||
List clusterPolicy = (List) loaded.get("cluster-policy");
|
||||
assertNotNull(clusterPolicy);
|
||||
assertEquals(3, clusterPolicy.size());
|
||||
|
||||
setClusterPolicyCommand = "{" +
|
||||
" 'set-cluster-policy': [" +
|
||||
" {'cores':'<10', 'node':'#ANY'}," +
|
||||
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
|
||||
" {'replica':0, put : on-each, nodeset:{'nodeRole':'overseer'} }" +
|
||||
" ]" +
|
||||
"}";
|
||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setClusterPolicyCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
|
||||
loaded = ZkNodeProps.load(data);
|
||||
clusterPolicy = (List) loaded.get("cluster-policy");
|
||||
assertNotNull(clusterPolicy);
|
||||
assertEquals(3, clusterPolicy.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -66,12 +66,14 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
boolean nodeSetPresent = false;
|
||||
Condition collection, shard, replica, tag, globalTag;
|
||||
final Replica.Type type;
|
||||
Put put;
|
||||
boolean strict;
|
||||
|
||||
protected Clause(Clause clause, Function<Condition, Object> computedValueEvaluator) {
|
||||
this.original = clause.original;
|
||||
this.hashCode = original.hashCode();
|
||||
this.type = clause.type;
|
||||
this.put = clause.put;
|
||||
this.nodeSetPresent = clause.nodeSetPresent;
|
||||
this.collection = clause.collection;
|
||||
this.shard = clause.shard;
|
||||
|
@ -84,7 +86,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
}
|
||||
|
||||
// internal use only
|
||||
Clause(Map<String, Object> original, Condition tag, Condition globalTag, boolean isStrict, boolean nodeSetPresent) {
|
||||
Clause(Map<String, Object> original, Condition tag, Condition globalTag, boolean isStrict, Put put, boolean nodeSetPresent) {
|
||||
this.hashCode = original.hashCode();
|
||||
this.original = original;
|
||||
this.tag = tag;
|
||||
|
@ -94,6 +96,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
this.hasComputedValue = false;
|
||||
this.strict = isStrict;
|
||||
derivedFrom = null;
|
||||
this.put = put;
|
||||
this.nodeSetPresent = nodeSetPresent;
|
||||
}
|
||||
|
||||
|
@ -103,6 +106,12 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
this.hashCode = original.hashCode();
|
||||
String type = (String) m.get("type");
|
||||
this.type = type == null || ANY.equals(type) ? null : Replica.Type.valueOf(type.toUpperCase(Locale.ROOT));
|
||||
String put = (String) m.getOrDefault("put", m.containsKey(NODESET)? Put.ON_ALL.val: null );
|
||||
if (put != null) {
|
||||
this.put = Put.get(put);
|
||||
if (this.put == null) throwExp(m, "invalid value for put : {0}", put);
|
||||
}
|
||||
|
||||
strict = Boolean.parseBoolean(String.valueOf(m.getOrDefault("strict", "true")));
|
||||
Optional<String> globalTagName = m.keySet().stream().filter(Policy.GLOBAL_ONLY_TAGS::contains).findFirst();
|
||||
if (globalTagName.isPresent()) {
|
||||
|
@ -348,7 +357,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
} catch (IllegalArgumentException iae) {
|
||||
throw iae;
|
||||
} catch (Exception e) {
|
||||
throwExp(m, "Invalid tag : {0} ", s);
|
||||
throwExp(m, " Invalid tag : {0} "+ e.getMessage(), s);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -400,26 +409,25 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
return operand;
|
||||
}
|
||||
|
||||
|
||||
private boolean isRowPass(ComputedValueEvaluator eval, Object t, Row row) {
|
||||
eval.node = row.node;
|
||||
if (t instanceof Condition) {
|
||||
Condition tag = (Condition) t;
|
||||
if (tag.computedType != null) tag = evaluateValue(tag, eval);
|
||||
return tag.isPass(row);
|
||||
} else {
|
||||
return t.equals(row.getVal(tag.name));
|
||||
}
|
||||
}
|
||||
|
||||
List<Violation> testGroupNodes(Policy.Session session, double[] deviations) {
|
||||
//e.g: {replica:'#EQUAL', shard:'#EACH', sysprop.zone:'#EACH'}
|
||||
ComputedValueEvaluator eval = new ComputedValueEvaluator(session);
|
||||
eval.collName = (String) collection.getValue();
|
||||
Violation.Ctx ctx = new Violation.Ctx(this, session.matrix, eval);
|
||||
|
||||
Set tags = new HashSet();
|
||||
for (Row row : session.matrix) {
|
||||
eval.node = row.node;
|
||||
Condition tag = this.tag;
|
||||
if (tag.computedType != null) tag = evaluateValue(tag, eval);
|
||||
Object val = row.getVal(tag.name);
|
||||
if (val != null) {
|
||||
if (tag.op == LESS_THAN || tag.op == GREATER_THAN) {
|
||||
tags.add(this.tag);
|
||||
} else if (tag.isPass(val)) {
|
||||
tags.add(val);
|
||||
}
|
||||
}
|
||||
}
|
||||
Set tags = getUniqueTags(session, eval);
|
||||
if (tags.isEmpty()) return Collections.emptyList();
|
||||
|
||||
Set<String> shards = getShardNames(session, eval);
|
||||
|
@ -428,17 +436,10 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
final ReplicaCount replicaCount = new ReplicaCount();
|
||||
eval.shardName = s;
|
||||
|
||||
for (Object t : tags) {
|
||||
for (Object tag : tags) {
|
||||
replicaCount.reset();
|
||||
for (Row row : session.matrix) {
|
||||
eval.node = row.node;
|
||||
if (t instanceof Condition) {
|
||||
Condition tag = (Condition) t;
|
||||
if (tag.computedType != null) tag = evaluateValue(tag, eval);
|
||||
if (!tag.isPass(row)) continue;
|
||||
} else {
|
||||
if (!t.equals(row.getVal(tag.name))) continue;
|
||||
}
|
||||
if(!isRowPass(eval, tag, row)) continue;
|
||||
addReplicaCountsForNode(eval, replicaCount, row);
|
||||
}
|
||||
|
||||
|
@ -451,27 +452,57 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
null,
|
||||
replicaCountCopy,
|
||||
sealedClause.getReplica().replicaCountDelta(replicaCountCopy),
|
||||
t);
|
||||
ctx.resetAndAddViolation(t, replicaCountCopy, violation);
|
||||
sealedClause.addViolatingReplicas(sealedClause.tag, eval, ctx, tag.name, t, violation, session);
|
||||
tag);
|
||||
ctx.resetAndAddViolation(tag, replicaCountCopy, violation);
|
||||
sealedClause.addViolatingReplicasForGroup(sealedClause.tag, eval, ctx, this.tag.name, tag, violation, session.matrix);
|
||||
if (!this.strict && deviations != null) {
|
||||
tag.varType.computeDeviation(session, deviations, replicaCount, sealedClause);
|
||||
this.tag.varType.computeDeviation(session, deviations, replicaCount, sealedClause);
|
||||
}
|
||||
} else {
|
||||
if (replica.op == RANGE_EQUAL) tag.varType.computeDeviation(session, deviations, replicaCount, sealedClause);
|
||||
if (replica.op == RANGE_EQUAL) this.tag.varType.computeDeviation(session, deviations, replicaCount, sealedClause);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx.allViolations;
|
||||
}
|
||||
|
||||
void addViolatingReplicas(Condition tag,
|
||||
ComputedValueEvaluator eval,
|
||||
Violation.Ctx ctx, String tagName, Object tagVal,
|
||||
Violation violation,
|
||||
Policy.Session session) {
|
||||
private Set getUniqueTags(Policy.Session session, ComputedValueEvaluator eval) {
|
||||
Set tags = new HashSet();
|
||||
if(tag.op == WILDCARD){
|
||||
for (Row row : session.matrix) {
|
||||
eval.node = row.node;
|
||||
Condition tag = this.tag;
|
||||
if (tag.computedType != null) tag = evaluateValue(tag, eval);
|
||||
Object val = row.getVal(tag.name);
|
||||
if (val != null) {
|
||||
if (tag.op == LESS_THAN || tag.op == GREATER_THAN) {
|
||||
tags.add(this.tag);
|
||||
} else if (tag.isPass(val)) {
|
||||
tags.add(val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
if (tag.op == LESS_THAN || tag.op == GREATER_THAN || tag.op == RANGE_EQUAL || tag.op == NOT_EQUAL) {
|
||||
tags.add(tag); // eg: freedisk > 100
|
||||
} else if (tag.val instanceof Collection) {
|
||||
tags.addAll((Collection) tag.val); //e: sysprop.zone:[east,west]
|
||||
} else {
|
||||
tags.add(tag.val);//
|
||||
}
|
||||
}
|
||||
return tags;
|
||||
}
|
||||
|
||||
void addViolatingReplicasForGroup(Condition tag,
|
||||
ComputedValueEvaluator eval,
|
||||
Violation.Ctx ctx, String tagName, Object tagVal,
|
||||
Violation violation,
|
||||
List<Row> nodes) {
|
||||
if (tag.varType.addViolatingReplicas(ctx)) return;
|
||||
for (Row row : session.matrix) {
|
||||
for (Row row : nodes) {
|
||||
if (tagVal.equals(row.getVal(tagName))) {
|
||||
row.forEachReplica(eval.collName, ri -> {
|
||||
if (Policy.ANY.equals(eval.shardName)
|
||||
|
@ -535,7 +566,8 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
sealedClause.getReplica().replicaCountDelta(replicaCountCopy),
|
||||
eval.node);
|
||||
ctx.resetAndAddViolation(row.node, replicaCountCopy, violation);
|
||||
sealedClause.addViolatingReplicas(sealedClause.tag, eval, ctx, NODE, row.node, violation, session);
|
||||
sealedClause.addViolatingReplicasForGroup(sealedClause.tag, eval, ctx, NODE, row.node, violation,
|
||||
Collections.singletonList(row));
|
||||
if (!this.strict && deviations != null) {
|
||||
tag.varType.computeDeviation(session, deviations, replicaCount, sealedClause);
|
||||
}
|
||||
|
@ -568,6 +600,14 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
|
||||
public List<Violation> test(Policy.Session session, double[] deviations) {
|
||||
if (isPerCollectiontag()) {
|
||||
if(nodeSetPresent) {
|
||||
if(put == Put.ON_EACH){
|
||||
return testPerNode(session, deviations) ;
|
||||
} else {
|
||||
return testGroupNodes(session, deviations);
|
||||
}
|
||||
}
|
||||
|
||||
return tag.varType == Type.NODE ||
|
||||
(tag.varType.meta.isNodeSpecificVal() && replica.computedType == null) ?
|
||||
testPerNode(session, deviations) :
|
||||
|
@ -581,7 +621,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
if (!sealedClause.getGlobalTag().isPass(r)) {
|
||||
ctx.resetAndAddViolation(r.node, null, new Violation(sealedClause, null, null, r.node, r.getVal(sealedClause.globalTag.name),
|
||||
sealedClause.globalTag.delta(r.getVal(globalTag.name)), r.node));
|
||||
addViolatingReplicas(sealedClause.globalTag, computedValueEvaluator, ctx, Type.CORES.tagName, r.node, ctx.currentViolation, session);
|
||||
addViolatingReplicasForGroup(sealedClause.globalTag, computedValueEvaluator, ctx, Type.CORES.tagName, r.node, ctx.currentViolation, session.matrix);
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -710,4 +750,21 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
|
||||
public static final String METRICS_PREFIX = "metrics:";
|
||||
|
||||
enum Put {
|
||||
ON_ALL("on-all"), ON_EACH("on-each");
|
||||
|
||||
public final String val;
|
||||
|
||||
Put(String s) {
|
||||
this.val = s;
|
||||
}
|
||||
|
||||
public static Put get(String s) {
|
||||
for (Put put : values()) {
|
||||
if (put.val.equals(s)) return put;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -557,7 +557,7 @@ public class Policy implements MapWriter {
|
|||
if (!withCollMap.isEmpty()) {
|
||||
Clause withCollClause = new Clause((Map<String,Object>)Utils.fromJSONString("{withCollection:'*' , node: '#ANY'}") ,
|
||||
new Condition(NODE.tagName, "#ANY", Operand.EQUAL, null, null),
|
||||
new Condition(WITH_COLLECTION.tagName,"*" , Operand.EQUAL, null, null), true, false
|
||||
new Condition(WITH_COLLECTION.tagName,"*" , Operand.EQUAL, null, null), true, null, false
|
||||
);
|
||||
expandedClauses.add(withCollClause);
|
||||
}
|
||||
|
|
|
@ -214,8 +214,9 @@ public interface Variable {
|
|||
@Meta(name = "STRING",
|
||||
type = String.class,
|
||||
wildCards = Policy.EACH,
|
||||
supportArrayVals = true)
|
||||
STRING,
|
||||
supportArrayVals = true
|
||||
)
|
||||
SYSPROP,
|
||||
|
||||
@Meta(name = "node",
|
||||
type = String.class,
|
||||
|
|
|
@ -93,7 +93,7 @@ public class VariableBase implements Variable {
|
|||
|
||||
public static Type getTagType(String name) {
|
||||
Type info = Type.get(name);
|
||||
if (info == null && name.startsWith(ImplicitSnitch.SYSPROP)) info = Type.STRING;
|
||||
if (info == null && name.startsWith(ImplicitSnitch.SYSPROP)) info = Type.SYSPROP;
|
||||
if (info == null && name.startsWith(Clause.METRICS_PREFIX)) info = Type.LAZY;
|
||||
return info;
|
||||
}
|
||||
|
|
|
@ -701,7 +701,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
expectThrows(IllegalArgumentException.class,
|
||||
() -> Clause.create("{cores: '>14%' , node:'#ANY'}"));
|
||||
clause = Clause.create("{replica:1, nodeset : {sysprop.zone : east}}");
|
||||
assertEquals(Variable.Type.STRING, clause.tag.varType);
|
||||
assertEquals(Variable.Type.SYSPROP, clause.tag.varType);
|
||||
clause =Clause.create("{replica:1, nodeset : [node1, node2, node3]}");
|
||||
assertEquals(Variable.Type.NODE, clause.tag.varType);
|
||||
assertEquals(Operand.IN, clause.tag.op);
|
||||
|
@ -719,7 +719,12 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
() -> Clause.create("{replica:1, nodeset : {sysprop.zone : east , port: 8983 }}"));
|
||||
assertTrue(exp.getMessage().contains("nodeset must only have one and only one key"));
|
||||
clause = Clause.create("{'replica': '#ALL', 'nodeset': {'freedisk': '>700'}, 'strict': false}");
|
||||
assertEquals(clause.put, Clause.Put.ON_ALL);
|
||||
assertEquals(Operand.GREATER_THAN , clause.tag.op);
|
||||
clause = Clause.create("{'replica': '#ALL', put: on-each, 'nodeset': {sysprop.zone : east}}");
|
||||
assertEquals(clause.put, Clause.Put.ON_EACH);
|
||||
exp = expectThrows(IllegalArgumentException.class, ()-> Clause.create("{'replica': '#ALL', put: on-Each, 'nodeset': {sysprop.zone : east}}"));
|
||||
assertTrue(exp.getMessage().contains("invalid value for put : on-Each"));
|
||||
|
||||
}
|
||||
|
||||
|
@ -1240,8 +1245,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
" 'cluster-policy': [" +
|
||||
" { 'replica': 0, nodeset : {'nodeRole': 'overseer'}}" +
|
||||
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
|
||||
" { 'replica': 0, 'shard': '#EACH', nodeset : { sysprop.fs : '!ssd'}, type : TLOG }" +
|
||||
" { 'replica': 0, 'shard': '#EACH', nodeset : {sysprop.fs : '!slowdisk'} , type : PULL }" +
|
||||
" { 'replica': 0, 'shard': '#EACH', nodeset : { sysprop.fs : '!ssd'}, type : TLOG }" +
|
||||
" { 'replica': 0, 'shard': '#EACH', put:'on-each' nodeset : {sysprop.fs : '!slowdisk'} , type : PULL }" +
|
||||
" ]" +
|
||||
"}");
|
||||
|
||||
|
@ -1366,8 +1371,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY', 'collection':'newColl'}," +
|
||||
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY', 'collection':'newColl2', type : PULL}," +
|
||||
" { 'replica': '<3', 'shard': '#EACH', 'node': '#ANY', 'collection':'newColl2'}," +
|
||||
" { 'replica': 0, 'shard': '#EACH', nodeset:{ sysprop.fs : '!ssd'}, type : TLOG }" +
|
||||
" { 'replica': 0, 'shard': '#EACH', nodeset : {sysprop.fs : '!slowdisk'} , type : PULL }" +
|
||||
" { 'replica': 0, 'shard': '#EACH', put: on-each , nodeset:{ sysprop.fs : '!ssd'}, type : TLOG }" +
|
||||
" { 'replica': 0, 'shard': '#EACH', put: on-each ,nodeset : {sysprop.fs : '!slowdisk'} , type : PULL }" +
|
||||
" ]" +
|
||||
"}");
|
||||
|
||||
|
@ -2389,8 +2394,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
" cluster-preferences :[{ minimize : cores, precision : 2 }]}";
|
||||
if(useNodeset){
|
||||
autoScalingjson = " { cluster-policy:[" +
|
||||
" { replica :'0', nodeset:{ freedisk:'<1000'}}," +
|
||||
" { replica :0, nodeset : {nodeRole : overseer}}]," +
|
||||
" { replica :'0', put:on-each , nodeset:{ freedisk:'<1000'}}," +
|
||||
" { replica :0, put : on-each , nodeset : {nodeRole : overseer}}]," +
|
||||
" cluster-preferences :[{ minimize : cores, precision : 2 }]}";
|
||||
}
|
||||
AutoScalingConfig cfg = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
|
||||
|
@ -2422,7 +2427,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
if(useNodeset){
|
||||
autoScalingjson = " { cluster-policy:[" +
|
||||
" { replica :'#ALL', nodeset:{ freedisk:'>1000'}}," +
|
||||
" { replica :0 , nodeset : {nodeRole : overseer}}]," +
|
||||
" { replica :0 , put: on-each , nodeset : {nodeRole : overseer}}]," +
|
||||
" cluster-preferences :[{ minimize : cores, precision : 2 }]}";
|
||||
}
|
||||
cfg = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
|
||||
|
|
Loading…
Reference in New Issue