SOLR-12806: use autoscaling policies with strict=false to prioritize node allocation

This commit is contained in:
Noble Paul 2018-10-16 16:56:51 +11:00
parent 306065fca8
commit 9c7b8564d8
12 changed files with 202 additions and 35 deletions

View File

@ -203,6 +203,8 @@ Improvements
* SOLR-12739: Make autoscaling policy based replica placement the default strategy for placing replicas. (shalin)
* SOLR-12806: use autoscaling policies with strict=false to prioritize node allocation (noble)
================== 7.5.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -47,19 +47,16 @@ class AddReplicaSuggester extends Suggester {
//iterate through nodes and identify the least loaded
List<Violation> leastSeriousViolation = null;
Row bestNode = null;
double[] bestDeviation = null;
for (int i = getMatrix().size() - 1; i >= 0; i--) {
Row row = getMatrix().get(i);
if (!isNodeSuitableForReplicaAddition(row)) continue;
Row tmpRow = row.addReplica(shard.first(), shard.second(), type, strict);
double[] deviation = new double[1];
List<Violation> errs = testChangedMatrix(strict, tmpRow.session, deviation);
List<Violation> errs = testChangedMatrix(strict, tmpRow.session);
if (!containsNewErrors(errs)) {
if ((errs.isEmpty() && isLessDeviant(bestDeviation, deviation)) ||//there are no violations but this is deviating less
if ((errs.isEmpty() && isLessDeviant()) ||//there are no violations but this is deviating less
isLessSerious(errs, leastSeriousViolation)) {//there are errors , but this has less serious violation
leastSeriousViolation = errs;
bestNode = tmpRow;
bestDeviation = deviation;
}
}
}

View File

@ -60,6 +60,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
final boolean hasComputedValue;
final Map<String, Object> original;
final Clause derivedFrom;
Condition collection, shard, replica, tag, globalTag;
final Replica.Type type;
boolean strict;
@ -74,6 +75,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
this.globalTag = evaluateValue(clause.globalTag, computedValueEvaluator);
this.hasComputedValue = clause.hasComputedValue;
this.strict = clause.strict;
derivedFrom = clause.derivedFrom;
}
// internal use only
@ -85,9 +87,11 @@ public class Clause implements MapWriter, Comparable<Clause> {
this.type = null;
this.hasComputedValue = false;
this.strict = isStrict;
derivedFrom = null;
}
private Clause(Map<String, Object> m) {
derivedFrom = (Clause) m.remove(Clause.class.getName());
this.original = Utils.getDeepCopy(m, 10);
String type = (String) m.get("type");
this.type = type == null || ANY.equals(type) ? null : Replica.Type.valueOf(type.toUpperCase(Locale.ROOT));
@ -371,10 +375,10 @@ public class Clause implements MapWriter, Comparable<Clause> {
Condition tag = this.tag;
if (tag.computedType != null) tag = evaluateValue(tag, eval);
Object val = row.getVal(tag.name);
if (val != null && tag.isPass(val)) {
if (val != null) {
if (tag.op == LESS_THAN || tag.op == GREATER_THAN) {
tags.add(this.tag);
} else {
} else if (tag.isPass(val)) {
tags.add(val);
}
}
@ -413,23 +417,17 @@ public class Clause implements MapWriter, Comparable<Clause> {
t);
ctx.resetAndAddViolation(t, replicaCountCopy, violation);
sealedClause.addViolatingReplicas(sealedClause.tag, eval, ctx, tag.name, t, violation, session);
if (!this.strict && deviations != null) {
tag.varType.computeDeviation(session, deviations, replicaCount, sealedClause);
}
} else {
computeDeviation(deviations, replicaCount, sealedClause);
if (replica.op == RANGE_EQUAL) tag.varType.computeDeviation(session, deviations, replicaCount, sealedClause);
}
}
}
return ctx.allViolations;
}
private void computeDeviation(double[] deviations, ReplicaCount replicaCount, SealedClause sealedClause) {
if (deviations != null && sealedClause.replica.op == RANGE_EQUAL) {
Number actualCount = replicaCount.getVal(type);
Double realDelta = ((RangeVal) sealedClause.replica.val).realDelta(actualCount.doubleValue());
realDelta = this.isReplicaZero() ? -1 * realDelta : realDelta;
deviations[0] += Math.abs(realDelta);
}
}
void addViolatingReplicas(Condition tag,
ComputedValueEvaluator eval,
Violation.Ctx ctx, String tagName, Object tagVal,
@ -485,8 +483,11 @@ public class Clause implements MapWriter, Comparable<Clause> {
eval.node);
ctx.resetAndAddViolation(row.node, replicaCountCopy, violation);
sealedClause.addViolatingReplicas(sealedClause.tag, eval, ctx, NODE, row.node, violation, session);
if (!this.strict && deviations != null) {
tag.varType.computeDeviation(session, deviations, replicaCount, sealedClause);
}
} else {
computeDeviation(deviations, replicaCount, sealedClause);
if (replica.op == RANGE_EQUAL) tag.varType.computeDeviation(session, deviations, replicaCount, sealedClause);
}
}
}
@ -627,6 +628,10 @@ public class Clause implements MapWriter, Comparable<Clause> {
throw new RuntimeException(name + ": " + val + "not a valid number");
}
@Override
public int hashCode() {
return original.hashCode();
}
public static Double parseDouble(String name, Object val) {
if (val == null) return null;

View File

@ -26,12 +26,13 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.util.Pair;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.suggestNegativeViolations;
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.CORE_IDX;
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.FREEDISK;
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.TOTALDISK;
import static org.apache.solr.common.cloud.rule.ImplicitSnitch.DISK;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
public class FreeDiskVariable extends VariableBase {
@ -42,7 +43,7 @@ public class FreeDiskVariable extends VariableBase {
@Override
public Object convertVal(Object val) {
Number value = (Number) super.validate(ImplicitSnitch.DISK, val, false);
Number value = (Number) super.validate(FREEDISK.tagName, val, false);
if (value != null) {
value = value.doubleValue() / 1024.0d / 1024.0d / 1024.0d;
}
@ -69,6 +70,19 @@ public class FreeDiskVariable extends VariableBase {
v2.getViolatingReplicas().stream().mapToDouble(v3 -> v3.delta == null ? 0 : v3.delta).max().orElse(0d));
}
@Override
public void computeDeviation(Policy.Session session, double[] deviation, ReplicaCount replicaCount,
SealedClause sealedClause) {
if (deviation == null) return;
for (Row node : session.matrix) {
Object val = node.getVal(sealedClause.tag.name);
Double delta = sealedClause.tag.delta(val);
if (delta != null) {
deviation[0] += Math.abs(delta);
}
}
}
@Override
public void getSuggestions(Suggestion.Ctx ctx) {
if (ctx.violation == null) return;
@ -77,7 +91,7 @@ public class FreeDiskVariable extends VariableBase {
row -> ctx.violation.getViolatingReplicas()
.stream()
.anyMatch(p -> row.node.equals(p.replicaInfo.getNode())))
.sorted(Comparator.comparing(r -> ((Double) r.getVal(ImplicitSnitch.DISK, 0d))))
.sorted(Comparator.comparing(r -> ((Double) r.getVal(DISK, 0d))))
.collect(Collectors.toList());
@ -91,7 +105,7 @@ public class FreeDiskVariable extends VariableBase {
if (s1 != null && s2 != null) return s1.compareTo(s2);
return 0;
});
double currentDelta = ctx.violation.getClause().tag.delta(node.getVal(ImplicitSnitch.DISK));
double currentDelta = ctx.violation.getClause().tag.delta(node.getVal(DISK));
for (ReplicaInfo replica : replicas) {
if (currentDelta < 1) break;
if (replica.getVariables().get(CORE_IDX.tagName) == null) continue;

View File

@ -49,6 +49,7 @@ public class MoveReplicaSuggester extends Suggester {
List<Pair<ReplicaInfo, Row>> validReplicas = getValidReplicas(true, true, -1);
validReplicas.sort(leaderLast);
for (int i1 = 0; i1 < validReplicas.size(); i1++) {
lastBestDeviation = null;
Pair<ReplicaInfo, Row> fromReplica = validReplicas.get(i1);
Row fromRow = fromReplica.second();
ReplicaInfo ri = fromReplica.first();
@ -62,8 +63,7 @@ public class MoveReplicaSuggester extends Suggester {
if (!isNodeSuitableForReplicaAddition(targetRow)) continue;
targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType(), strict); // add replica to target first
Row srcRowModified = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node
double[] deviation = new double[1];
List<Violation> errs = testChangedMatrix(strict, srcRowModified.session, deviation);
List<Violation> errs = testChangedMatrix(strict, srcRowModified.session);
srcRowModified.session.applyRules(); // now resort the nodes with the new values
Policy.Session tmpSession = srcRowModified.session;

View File

@ -338,7 +338,10 @@ public class Policy implements MapWriter {
.filter(Clause::isPerCollectiontag)
.map(clause -> {
Map<String, Object> copy = new LinkedHashMap<>(clause.original);
if (!copy.containsKey("collection")) copy.put("collection", coll);
if (!copy.containsKey("collection")) {
copy.put("collection", coll);
copy.put(Clause.class.getName(), clause);
}
return Clause.create(copy);
})
.filter(it -> (it.getCollection().isPass(coll)))

View File

@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -66,14 +67,27 @@ public abstract class Suggester implements MapWriter {
boolean force;
protected List<Violation> originalViolations = new ArrayList<>();
private boolean isInitialized = false;
LinkedHashMap<Clause, double[]> deviations, lastBestDeviation;
void _init(Policy.Session session) {
this.session = session.copy();
}
boolean isLessDeviant(double[] previousBest, double[] newDeviation) {
if (previousBest == null) return true;
return newDeviation[0] < previousBest[0];
boolean isLessDeviant() {
if (lastBestDeviation == null && deviations == null) return false;
if (deviations == null) return true;
if (lastBestDeviation == null) return false;
if (lastBestDeviation.size() < deviations.size()) return true;
for (Map.Entry<Clause, double[]> currentDeviation : deviations.entrySet()) {
double[] lastDeviation = lastBestDeviation.get(currentDeviation.getKey());
if (lastDeviation == null) return false;
int result = Preference.compareWithTolerance(currentDeviation.getValue()[0],
lastDeviation[0], 1);
if (result < 0) return true;
if (result > 0) return false;
}
return false;
}
public Suggester hint(Hint hint, Object value) {
hint.validator.accept(value);
@ -282,17 +296,22 @@ public abstract class Suggester implements MapWriter {
}
}
List<Violation> testChangedMatrix(boolean strict, Policy.Session session, double[] deviation) {
List<Violation> testChangedMatrix(boolean executeInStrictMode, Policy.Session session) {
if (this.deviations != null) this.lastBestDeviation = this.deviations;
this.deviations = null;
Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences, session.matrix);
List<Violation> errors = new ArrayList<>();
for (Clause clause : session.expandedClauses) {
if (strict || clause.strict) {
List<Violation> errs = clause.test(session, deviation);
if (!errs.isEmpty()) {
errors.addAll(errs);
}
}
Clause originalClause = clause.derivedFrom == null ? clause : clause.derivedFrom;
// if (!executeInStrictMode && !clause.strict) {
if (this.deviations == null) this.deviations = new LinkedHashMap<>();
this.deviations.put(originalClause, new double[1]);
// }
List<Violation> errs = clause.test(session, this.deviations == null ? null : this.deviations.get(originalClause));
if (!errs.isEmpty() &&
(executeInStrictMode || clause.strict)) errors.addAll(errs);
}
if (!errors.isEmpty()) deviations = null;
return errors;
}

View File

@ -59,10 +59,24 @@ public interface Variable {
void getSuggestions(Suggestion.Ctx ctx);
/**When a non constant value is used in a variable, the actual value needs to be computed at the runtime
*
*/
default Object computeValue(Policy.Session session, Condition condition, String collection, String shard, String node) {
return condition.val;
}
default void computeDeviation(Policy.Session session, double[] deviations, ReplicaCount replicaCount, SealedClause sealedClause) {
if (deviations != null) {
Number actualCount = replicaCount.getVal(sealedClause.type);
if(sealedClause.replica.val instanceof RangeVal) {
Double realDelta = ((RangeVal) sealedClause.replica.val).realDelta(actualCount.doubleValue());
realDelta = sealedClause.isReplicaZero() ? -1 * realDelta : realDelta;
deviations[0] += Math.abs(realDelta);
}
}
}
int compareViolation(Violation v1, Violation v2);
default void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector) {
@ -327,6 +341,12 @@ public interface Variable {
return impl.computeValue(session, condition, collection, shard, node);
}
@Override
public void computeDeviation(Policy.Session session, double[] deviations, ReplicaCount replicaCount, SealedClause sealedClause) {
impl.computeDeviation(session, deviations, replicaCount, sealedClause);
}
@Override
public boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
return impl.match(inputVal, op, val, name, row);

View File

@ -0,0 +1,20 @@
{
"diagnostics":{
"sortedNodes":[{
"node":"127.0.0.1:49469_solr",
"isLive":true,
"cores":0.0,
"freedisk":672.6827087402344,
"totaldisk":1037.938980102539,
"replicas":{}}
,{
"node":"127.0.0.1:49470_solr",
"isLive":true,
"cores":0.0,
"freedisk":672.6827087402344,
"totaldisk":1037.938980102539,
"replicas":{}}],
"liveNodes":["127.0.0.1:49469_solr",
"127.0.0.1:49470_solr"],
"violations":[],
"config":{}}}

View File

@ -0,0 +1,35 @@
{
"liveNodes": [
"node1",
"node2",
"node3"
],
"replicaInfo": {
"node1": {
"mycoll1": {
"shard3": [{"r3": {"type": "NRT", "INDEX.sizeInGB": 700}}],
"shard4": [{"r4": {"type": "NRT", "INDEX.sizeInGB": 400}}]
}
},
"node2": {
"mycoll1": {
"shard1": [{"r1": {"type": "NRT", "INDEX.sizeInGB": 450}}],
"shard2": [{"r2": {"type": "NRT", "INDEX.sizeInGB": 750}}]
}
},
"node3": {
"mycoll2": {
"shard1": [{"r1": {"type": "NRT", "INDEX.sizeInGB": 250}}]
}
}
},
"nodeValues": {
"node1": {"node": "node1", "cores": 2, "freedisk": 900},
"node2": {"node": "node2", "cores": 2, "freedisk": 800},
"node3": {"node": "node3", "cores": 1, "freedisk": 1200}
},
"config": {
"cluster-policy": [{"replica":"<2", "shard":"#EACH", "node":"#ANY"},
{"replica": "#ALL", "freedisk": ">700", "strict": false}]
}
}

View File

@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.V2RequestSupport;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
@ -2446,6 +2447,38 @@ public class TestPolicy extends SolrTestCaseJ4 {
latestSession.matrix.get(1).forEachReplica(replicaInfo -> count.incrementAndGet());
assertEquals(4, count.get());
}
public void testFreeDiskDeviation() throws IOException {
Map map = (Map) TestPolicy2.loadFromResource("testFreeDiskDeviation.json");
AutoScalingConfig cfg = new AutoScalingConfig((Map<String, Object>) map.get("config"));
SolrCloudManager scm = cloudManagerWithData(map);
Suggester suggester = cfg.getPolicy()
.createSession(scm)
.getSuggester(ADDREPLICA);
MapWriter v2Request = (MapWriter) ((V2RequestSupport) suggester
.hint(Hint.COLL_SHARD, new Pair<>("mycoll2", "shard1"))
.getSuggestion()
.setUseV2(true))
.getV2Request();
assertEquals("/c/mycoll2/shards", v2Request._get("path",null));
assertEquals("add-replica", v2Request._get("command[0]/key",null));
assertEquals("node1", v2Request._get("command/add-replica/node",null));
suggester = suggester.getSession()
.getSuggester(ADDREPLICA);
v2Request = (MapWriter) ((V2RequestSupport) suggester
.hint(Hint.COLL_SHARD, new Pair<>("mycoll2", "shard1"))
.getSuggestion()
.setUseV2(true))
.getV2Request();
assertEquals("/c/mycoll2/shards", v2Request._get("path",null));
assertEquals("add-replica", v2Request._get("command[0]/key",null));
assertEquals("node2", v2Request._get("command/add-replica/node",null));
}

View File

@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
import org.apache.solr.SolrTestCaseJ4;
@ -40,10 +41,12 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.EMPTY_MAP;
import static java.util.Collections.emptyMap;
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.CORES;
@ -448,6 +451,22 @@ public class TestPolicy2 extends SolrTestCaseJ4 {
}
public void testCreateCollectionWithEmptyPolicy() throws IOException {
Map m = (Map) loadFromResource("testCreateCollectionWithEmptyPolicy.json");
SolrCloudManager cloudManagerFromDiagnostics = createCloudManagerFromDiagnostics(m);
AutoScalingConfig autoScalingConfig = new AutoScalingConfig(new HashMap());
///Users/noble/work/4solr/solr/core/src/test/org/apache/solr/handler/V2ApiIntegrationTest.java
//POSITIONS : [shard1:1[NRT] @127.0.0.1:49469_solr, shard1:2[NRT] @127.0.0.1:49469_solr]
List<ReplicaPosition> positions = PolicyHelper.getReplicaLocations("coll_new", autoScalingConfig, cloudManagerFromDiagnostics,
EMPTY_MAP, Collections.singletonList("shard1"), 2, 0, 0, null);
List<String> nodes = positions.stream().map(count -> count.node).collect(Collectors.toList());
assertTrue(nodes.contains("127.0.0.1:49469_solr"));
assertTrue(nodes.contains("127.0.0.1:49470_solr"));
}
public static Object loadFromResource(String file) throws IOException {
try (InputStream is = TestPolicy2.class.getResourceAsStream("/solrj/solr/autoscaling/" + file)) {
return Utils.fromJSON(is);