mirror of https://github.com/apache/lucene.git
SOLR-12495: An #EQUALS function for replica in autoscaling policy to equally distribute replicas
This commit is contained in:
parent
7b2a2d989c
commit
f86c477521
|
@ -101,6 +101,8 @@ New Features
|
|||
* SOLR-12530: Ability to disable configset upload via -Dconfigset.upload.enabled=false startup parameter
|
||||
(Ishan Chattopadhyaya)
|
||||
|
||||
* SOLR-12495: An #EQUALS function for replica in autoscaling policy to equally distribute replicas (noble)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.solr.common.util.Utils;
|
|||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.TestStatus.PASS;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.EQUAL;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.GREATER_THAN;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.LESS_THAN;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.NOT_EQUAL;
|
||||
|
@ -103,9 +102,20 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
throw new RuntimeException("Invalid metrics: param in " + Utils.toJSONString(m) + " must have at 2 or 3 segments after 'metrics:' separated by ':'");
|
||||
}
|
||||
}
|
||||
doPostValidate(collection, shard, replica, tag, globalTag);
|
||||
hasComputedValue = hasComputedValue();
|
||||
}
|
||||
|
||||
private void doPostValidate(Condition... conditions) {
|
||||
for (Condition condition : conditions) {
|
||||
if (condition == null) continue;
|
||||
String err = condition.varType.postValidate(condition);
|
||||
if (err != null) {
|
||||
throw new IllegalArgumentException(StrUtils.formatString("Error in clause : {0}, caused by : {1}", Utils.toJSONString(original), err));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Clause create(Map<String, Object> m) {
|
||||
Clause clause = new Clause(m);
|
||||
return clause.hasComputedValue() ?
|
||||
|
@ -211,7 +221,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
|
||||
//replica value is zero
|
||||
boolean isReplicaZero() {
|
||||
return replica != null && replica.getOperand() == EQUAL &&
|
||||
return replica != null && replica.getOperand() == Operand.EQUAL &&
|
||||
Preference.compareWithTolerance(0d, (Double) replica.val, 1) == 0;
|
||||
}
|
||||
|
||||
|
@ -238,14 +248,14 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
else if (strVal.startsWith(NOT_EQUAL.operand)) operand = NOT_EQUAL;
|
||||
else if (strVal.startsWith(GREATER_THAN.operand)) operand = GREATER_THAN;
|
||||
else if (strVal.startsWith(LESS_THAN.operand)) operand = LESS_THAN;
|
||||
else operand = EQUAL;
|
||||
strVal = strVal.substring(EQUAL == operand || WILDCARD == operand ? 0 : 1);
|
||||
else operand = Operand.EQUAL;
|
||||
strVal = strVal.substring(Operand.EQUAL == operand || WILDCARD == operand ? 0 : 1);
|
||||
for (ComputationType t : ComputationType.values()) {
|
||||
String changedVal = t.match(strVal);
|
||||
if (changedVal != null) {
|
||||
computationType = t;
|
||||
strVal = changedVal;
|
||||
if (varType == null || !varType.supportComputed(computationType)) {
|
||||
if (varType == null || !varType.supportComputed(computationType, this)) {
|
||||
throw new IllegalArgumentException(StrUtils.formatString("''{0}'' is not allowed for variable : ''{1}'' , in condition : ''{2}'' ",
|
||||
t, conditionName, Utils.toJSONString(m)));
|
||||
}
|
||||
|
@ -255,7 +265,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
expectedVal = validate(s, new Condition(s, strVal, operand, computationType, null), true);
|
||||
|
||||
} else if (val instanceof Number) {
|
||||
operand = EQUAL;
|
||||
operand = Operand.EQUAL;
|
||||
operand = varType.getOperand(operand, val, null);
|
||||
expectedVal = validate(s, new Condition(s, val, operand, null, null), true);
|
||||
}
|
||||
|
@ -272,8 +282,8 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
ComputedValueEvaluator computedValueEvaluator = new ComputedValueEvaluator(session);
|
||||
Suggestion.ViolationCtx ctx = new Suggestion.ViolationCtx(this, session.matrix, computedValueEvaluator);
|
||||
if (isPerCollectiontag()) {
|
||||
Map<String, Map<String, Map<String, ReplicaCount>>> replicaCount = computeReplicaCounts(session.matrix, computedValueEvaluator);
|
||||
for (Map.Entry<String, Map<String, Map<String, ReplicaCount>>> e : replicaCount.entrySet()) {
|
||||
Map<String, Map<String, Map<String, ReplicaCount>>> replicaCounts = computeReplicaCounts(session.matrix, computedValueEvaluator);
|
||||
for (Map.Entry<String, Map<String, Map<String, ReplicaCount>>> e : replicaCounts.entrySet()) {
|
||||
computedValueEvaluator.collName = e.getKey();
|
||||
if (!collection.isPass(computedValueEvaluator.collName)) continue;
|
||||
for (Map.Entry<String, Map<String, ReplicaCount>> shardVsCount : e.getValue().entrySet()) {
|
||||
|
@ -281,15 +291,16 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
if (!shard.isPass(computedValueEvaluator.shardName)) continue;
|
||||
for (Map.Entry<String, ReplicaCount> counts : shardVsCount.getValue().entrySet()) {
|
||||
SealedClause sealedClause = getSealedClause(computedValueEvaluator);
|
||||
if (!sealedClause.replica.isPass(counts.getValue())) {
|
||||
ReplicaCount replicas = counts.getValue();
|
||||
if (!sealedClause.replica.isPass(replicas)) {
|
||||
Violation violation = new Violation(sealedClause,
|
||||
computedValueEvaluator.collName,
|
||||
computedValueEvaluator.shardName,
|
||||
tag.name.equals("node") ? counts.getKey() : null,
|
||||
counts.getValue(),
|
||||
sealedClause.getReplica().delta(counts.getValue()),
|
||||
sealedClause.getReplica().delta(replicas),
|
||||
counts.getKey());
|
||||
Suggestion.getTagType(tag.name).addViolatingReplicas(ctx.reset(counts.getKey(), counts.getValue(), violation));
|
||||
tag.varType.addViolatingReplicas(ctx.reset(counts.getKey(), replicas, violation));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -336,6 +347,21 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
}
|
||||
|
||||
enum ComputationType {
|
||||
EQUAL() {
|
||||
@Override
|
||||
public String wrap(String value) {
|
||||
return "#EQUAL";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String match(String val) {
|
||||
if ("#EQUAL".equals(val)) return "1";
|
||||
return null;
|
||||
}
|
||||
|
||||
},
|
||||
|
||||
|
||||
PERCENT {
|
||||
@Override
|
||||
public String wrap(String value) {
|
||||
|
@ -583,6 +609,11 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
return 0d;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return jsonStr();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMap(EntryWriter ew) throws IOException {
|
||||
ew.put("min", min).put("max", max).putIfNotNull("actual", actual);
|
||||
|
|
|
@ -138,6 +138,7 @@ public class Suggestion {
|
|||
|
||||
@Override
|
||||
public Operand getOperand(Operand expected, Object strVal, Clause.ComputationType computationType) {
|
||||
// if (computationType == Clause.ComputationType.EQUAL) return expected;
|
||||
if (strVal instanceof String) {
|
||||
String s = ((String) strVal).trim();
|
||||
int hyphenIdx = s.indexOf('-');
|
||||
|
@ -161,29 +162,37 @@ public class Suggestion {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean supportComputed(Clause.ComputationType computedType) {
|
||||
return computedType == Clause.ComputationType.PERCENT;
|
||||
public boolean supportComputed(Clause.ComputationType computedType, Clause clause) {
|
||||
if (computedType == Clause.ComputationType.PERCENT || computedType == Clause.ComputationType.EQUAL) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String postValidate(Clause.Condition condition) {
|
||||
if (condition.computationType == Clause.ComputationType.EQUAL) {
|
||||
if (condition.getClause().tag != null &&
|
||||
condition.getClause().tag.varType == NODE &&
|
||||
condition.getClause().tag.op == Operand.WILDCARD) {
|
||||
return null;
|
||||
} else {
|
||||
return "'replica': '#EQUAL` must be used with 'node':'#ANY'";
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object computeValue(Policy.Session session, Clause.Condition cv, String collection, String shard) {
|
||||
if (cv.computationType == Clause.ComputationType.PERCENT) {
|
||||
AtomicInteger totalReplicasOfInterest = new AtomicInteger(0);
|
||||
Clause clause = cv.getClause();
|
||||
for (Row row : session.matrix) {
|
||||
row.forEachReplica(replicaInfo -> {
|
||||
if (replicaInfo.getCollection().equals(collection)) {
|
||||
if (clause.getShard().op == Operand.WILDCARD || replicaInfo.getShard().equals(shard)) {
|
||||
if(cv.getClause().type == null || replicaInfo.getType() == cv.getClause().type)
|
||||
totalReplicasOfInterest.incrementAndGet();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return totalReplicasOfInterest.doubleValue() * Clause.parseDouble(cv.name, cv.val).doubleValue() / 100;
|
||||
if (cv.computationType == Clause.ComputationType.EQUAL) {
|
||||
int relevantReplicasCount = getRelevantReplicasCount(session, cv, collection, shard);
|
||||
if (relevantReplicasCount == 0) return 0;
|
||||
return (double) session.matrix.size() / (double) relevantReplicasCount;
|
||||
} else if (cv.computationType == Clause.ComputationType.PERCENT) {
|
||||
int relevantReplicasCount = getRelevantReplicasCount(session, cv, collection, shard);
|
||||
if (relevantReplicasCount == 0) return 0;
|
||||
return (double) relevantReplicasCount * Clause.parseDouble(cv.name, cv.val).doubleValue() / 100;
|
||||
} else {
|
||||
throw new RuntimeException("Unsupported type " + cv.computationType);
|
||||
throw new IllegalArgumentException("Unsupported type " + cv.computationType);
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -346,6 +355,13 @@ public class Suggestion {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addViolatingReplicas(ViolationCtx ctx) {
|
||||
for (Row r : ctx.allRows) {
|
||||
if(r.node.equals(ctx.tagKey)) collectViolatingReplicas(ctx,r);
|
||||
}
|
||||
}
|
||||
},
|
||||
LAZY("LAZY", null, null, null, null) {
|
||||
@Override
|
||||
|
@ -398,14 +414,7 @@ public class Suggestion {
|
|||
|
||||
public void addViolatingReplicas(ViolationCtx ctx) {
|
||||
for (Row row : ctx.allRows) {
|
||||
row.forEachReplica(replica -> {
|
||||
if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return;
|
||||
if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return;
|
||||
if (!ctx.currentViolation.matchShard(replica.getShard())) return;
|
||||
if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard))
|
||||
return;
|
||||
ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name))));
|
||||
});
|
||||
collectViolatingReplicas(ctx, row);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -418,6 +427,10 @@ public class Suggestion {
|
|||
return val;
|
||||
}
|
||||
|
||||
public String postValidate(Clause.Condition condition) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Object validate(String name, Object val, boolean isRuleVal) {
|
||||
if (val instanceof Clause.Condition) {
|
||||
Clause.Condition condition = (Clause.Condition) val;
|
||||
|
@ -472,7 +485,7 @@ public class Suggestion {
|
|||
return Math.abs(v1.replicaCountDelta) < Math.abs(v2.replicaCountDelta) ? -1 : 1;
|
||||
}
|
||||
|
||||
public boolean supportComputed(Clause.ComputationType computedType) {
|
||||
public boolean supportComputed(Clause.ComputationType computedType, Clause clause) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -481,6 +494,33 @@ public class Suggestion {
|
|||
}
|
||||
}
|
||||
|
||||
private static void collectViolatingReplicas(ViolationCtx ctx, Row row) {
|
||||
row.forEachReplica(replica -> {
|
||||
if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return;
|
||||
if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return;
|
||||
if (!ctx.currentViolation.matchShard(replica.getShard())) return;
|
||||
if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard))
|
||||
return;
|
||||
ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name))));
|
||||
});
|
||||
}
|
||||
|
||||
private static int getRelevantReplicasCount(Policy.Session session, Clause.Condition cv, String collection, String shard) {
|
||||
AtomicInteger totalReplicasOfInterest = new AtomicInteger(0);
|
||||
Clause clause = cv.getClause();
|
||||
for (Row row : session.matrix) {
|
||||
row.forEachReplica(replicaInfo -> {
|
||||
if (replicaInfo.getCollection().equals(collection)) {
|
||||
if (clause.getShard() ==null || clause.getShard().op == Operand.WILDCARD || replicaInfo.getShard().equals(shard)) {
|
||||
if (cv.getClause().type == null || replicaInfo.getType() == cv.getClause().type)
|
||||
totalReplicasOfInterest.incrementAndGet();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
return totalReplicasOfInterest.get();
|
||||
}
|
||||
|
||||
static class ViolationCtx {
|
||||
final Function<Clause.Condition, Object> evaluator;
|
||||
String tagKey;
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.solr.common.MapWriter;
|
|||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
||||
import org.apache.solr.common.cloud.rule.SnitchContext;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
|
@ -71,19 +70,31 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
|
|||
|
||||
|
||||
private final CloudSolrClient solrClient;
|
||||
private final ZkStateReader zkStateReader;
|
||||
private final Map<String, Map<String, Map<String, List<ReplicaInfo>>>> nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>();
|
||||
private Map<String, Object> snitchSession = new HashMap<>();
|
||||
private Map<String, Map> nodeVsTags = new HashMap<>();
|
||||
|
||||
public SolrClientNodeStateProvider(CloudSolrClient solrClient) {
|
||||
this.solrClient = solrClient;
|
||||
this.zkStateReader = solrClient.getZkStateReader();
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
try {
|
||||
readReplicaDetails();
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
if(log.isDebugEnabled()) INST = this;
|
||||
}
|
||||
|
||||
protected ClusterStateProvider getClusterStateProvider() {
|
||||
return solrClient.getClusterStateProvider();
|
||||
}
|
||||
|
||||
private void readReplicaDetails() throws IOException {
|
||||
ClusterStateProvider clusterStateProvider = getClusterStateProvider();
|
||||
ClusterState clusterState = clusterStateProvider.getClusterState();
|
||||
if (clusterState == null) { // zkStateReader still initializing
|
||||
return;
|
||||
}
|
||||
Map<String, ClusterState.CollectionRef> all = clusterState.getCollectionStates();
|
||||
Map<String, ClusterState.CollectionRef> all = clusterStateProvider.getClusterState().getCollectionStates();
|
||||
all.forEach((collName, ref) -> {
|
||||
DocCollection coll = ref.get();
|
||||
if (coll == null) return;
|
||||
|
@ -94,7 +105,6 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
|
|||
replicas.add(new ReplicaInfo(collName, shard, replica, new HashMap<>(replica.getProperties())));
|
||||
});
|
||||
});
|
||||
if(log.isDebugEnabled()) INST = this;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -107,10 +117,15 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
|
|||
|
||||
@Override
|
||||
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
|
||||
Map<String, Object> tagVals = fetchTagValues(node, tags);
|
||||
nodeVsTags.put(node, tagVals);
|
||||
return tagVals;
|
||||
}
|
||||
|
||||
protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
|
||||
AutoScalingSnitch snitch = new AutoScalingSnitch();
|
||||
ClientSnitchCtx ctx = new ClientSnitchCtx(null, node, snitchSession, solrClient);
|
||||
snitch.getTags(node, new HashSet<>(tags), ctx);
|
||||
nodeVsTags.put(node, ctx.getTags());
|
||||
return ctx.getTags();
|
||||
}
|
||||
|
||||
|
@ -140,11 +155,10 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
|
|||
});
|
||||
|
||||
if (!keyVsReplica.isEmpty()) {
|
||||
ClientSnitchCtx ctx = new ClientSnitchCtx(null, null, emptyMap(), solrClient);
|
||||
fetchMetrics(node, ctx,
|
||||
Map<String, Object> tags = fetchReplicaMetrics(node,
|
||||
keyVsReplica.entrySet().stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getKey)));
|
||||
ctx.getTags().forEach((k, o) -> {
|
||||
tags.forEach((k, o) -> {
|
||||
Pair<String, ReplicaInfo> p = keyVsReplica.get(k);
|
||||
Suggestion.ConditionType validator = Suggestion.getTagType(p.first());
|
||||
if (validator != null) o = validator.convertVal(o);
|
||||
|
@ -156,7 +170,14 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
|
|||
return result;
|
||||
}
|
||||
|
||||
static void fetchMetrics(String solrNode, ClientSnitchCtx ctx, Map<String, Object> metricsKeyVsTag) {
|
||||
protected Map<String,Object> fetchReplicaMetrics(String solrNode, Map<String, Object> metricsKeyVsTag) {
|
||||
ClientSnitchCtx ctx = new ClientSnitchCtx(null, null, emptyMap(), solrClient);
|
||||
fetchReplicaMetrics(solrNode, ctx,metricsKeyVsTag);
|
||||
return ctx.getTags();
|
||||
|
||||
}
|
||||
|
||||
static void fetchReplicaMetrics(String solrNode, ClientSnitchCtx ctx, Map<String, Object> metricsKeyVsTag) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.add("key", metricsKeyVsTag.keySet().toArray(new String[0]));
|
||||
try {
|
||||
|
@ -209,7 +230,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
|
|||
});
|
||||
}
|
||||
if (!metricsKeyVsTag.isEmpty()) {
|
||||
fetchMetrics(solrNode, snitchContext, metricsKeyVsTag);
|
||||
fetchReplicaMetrics(solrNode, snitchContext, metricsKeyVsTag);
|
||||
}
|
||||
|
||||
Set<String> groups = new HashSet<>();
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
|
@ -45,12 +46,15 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
|||
import org.apache.solr.client.solrj.cloud.autoscaling.Clause.RangeVal;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
|
||||
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
|
||||
import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ReplicaPosition;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
|
@ -67,6 +71,7 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.REPLICA;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
|
@ -262,6 +267,154 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
expectThrows(IllegalArgumentException.class,
|
||||
() -> Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '20%-33%', node:'#ANY'}")));
|
||||
|
||||
clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '#EQUAL', shard:'#EACH', node:'#ANY'}"));
|
||||
assertEquals(Operand.RANGE_EQUAL, clause.replica.op);
|
||||
clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '#EQUAL', node:'#ANY'}"));
|
||||
assertEquals(Operand.RANGE_EQUAL, clause.replica.op);
|
||||
expectThrows(IllegalArgumentException.class,
|
||||
() -> Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '#EQUAL', node:'node_1'}")));
|
||||
}
|
||||
|
||||
|
||||
public void testEqualFunction() {
|
||||
|
||||
String clusterStateStr = "{" +
|
||||
" 'coll1': {" +
|
||||
" 'router': {" +
|
||||
" 'name': 'compositeId'" +
|
||||
" }," +
|
||||
" 'shards': {" +
|
||||
" 'shard1': {" +
|
||||
" 'range': '80000000-ffffffff'," +
|
||||
" 'replicas': {" +
|
||||
" 'r1': {" +
|
||||
" 'core': 'r1'," +
|
||||
" 'base_url': 'http://10.0.0.4:8983/solr'," +
|
||||
" 'node_name': 'node1'," +
|
||||
" 'state': 'active'," +
|
||||
" 'leader': 'true'" +
|
||||
" }," +
|
||||
" 'r2': {" +
|
||||
" 'core': 'r2'," +
|
||||
" 'base_url': 'http://10.0.0.4:7574/solr'," +
|
||||
" 'node_name': 'node2'," +
|
||||
" 'state': 'active'" +
|
||||
" }" +
|
||||
" }" +
|
||||
" }," +
|
||||
" 'shard2': {" +
|
||||
" 'range': '0-7fffffff'," +
|
||||
" 'replicas': {" +
|
||||
" 'r3': {" +
|
||||
" 'core': 'r3'," +
|
||||
" 'base_url': 'http://10.0.0.4:8983/solr'," +
|
||||
" 'node_name': 'node1'," +
|
||||
" 'state': 'active'," +
|
||||
" 'leader': 'true'" +
|
||||
" }," +
|
||||
" 'r4': {" +
|
||||
" 'core': 'r4'," +
|
||||
" 'base_url': 'http://10.0.0.4:8987/solr'," +
|
||||
" 'node_name': 'node4'," +
|
||||
" 'state': 'active'" +
|
||||
" }," +
|
||||
" 'r6': {" +
|
||||
" 'core': 'r6'," +
|
||||
" 'base_url': 'http://10.0.0.4:8989/solr'," +
|
||||
" 'node_name': 'node3'," +
|
||||
" 'state': 'active'" +
|
||||
" }," +
|
||||
" 'r5': {" +
|
||||
" 'core': 'r5'," +
|
||||
" 'base_url': 'http://10.0.0.4:8983/solr'," +
|
||||
" 'node_name': 'node1'," +
|
||||
" 'state': 'active'" +
|
||||
" }" +
|
||||
" }" +
|
||||
" }" +
|
||||
" }" +
|
||||
" }" +
|
||||
"}";
|
||||
|
||||
|
||||
ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
|
||||
ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
|
||||
DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
|
||||
@Override
|
||||
public ClusterState getClusterState() throws IOException {
|
||||
return clusterState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getLiveNodes() {
|
||||
return clusterState.getLiveNodes();
|
||||
}
|
||||
};
|
||||
|
||||
SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
|
||||
@Override
|
||||
protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
AtomicInteger cores = new AtomicInteger();
|
||||
forEachReplica(node, replicaInfo -> cores.incrementAndGet());
|
||||
if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
|
||||
if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Object> metricsKeyVsTag) {
|
||||
//e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
metricsKeyVsTag.forEach((k, v) -> {
|
||||
if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterStateProvider getClusterStateProvider() {
|
||||
return clusterStateProvider;
|
||||
}
|
||||
};
|
||||
|
||||
Map policies = (Map) Utils.fromJSONString("{" +
|
||||
" 'cluster-preferences': [" +
|
||||
" { 'minimize': 'cores', 'precision': 50}" +
|
||||
" ]," +
|
||||
" 'cluster-policy': [" +
|
||||
" { 'replica': '#EQUAL', 'node': '#ANY'}," +
|
||||
" ]" +
|
||||
"}");
|
||||
AutoScalingConfig config = new AutoScalingConfig(policies);
|
||||
Policy policy = config.getPolicy();
|
||||
Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
|
||||
@Override
|
||||
public ClusterStateProvider getClusterStateProvider() {
|
||||
return clusterStateProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeStateProvider getNodeStateProvider() {
|
||||
return solrClientNodeStateProvider;
|
||||
}
|
||||
});
|
||||
List<Violation> violations = session.getViolations();
|
||||
assertEquals(1, violations.size());
|
||||
Violation violation = violations.get(0);
|
||||
assertEquals("node1", violation.node);
|
||||
RangeVal val = (RangeVal) violation.getClause().replica.val;
|
||||
assertEquals(0.0, val.min);
|
||||
assertEquals(1.0, val.max);
|
||||
assertEquals(0, Preference.compareWithTolerance(val.actual.doubleValue(), 0.833, 1));
|
||||
assertEquals(3, violation.getViolatingReplicas().size());
|
||||
Set<String> expected = ImmutableSet.of("r1", "r3", "r5");
|
||||
for (Violation.ReplicaInfoAndErr replicaInfoAndErr : violation.getViolatingReplicas()) {
|
||||
assertTrue(expected.contains(replicaInfoAndErr.replicaInfo.getCore()));
|
||||
}
|
||||
System.out.println();
|
||||
|
||||
}
|
||||
|
||||
private static void expectError(String name, Object val, String msg) {
|
||||
|
|
Loading…
Reference in New Issue