MoveSuggester should do greedy check

This commit is contained in:
Noble Paul 2017-05-26 11:32:34 +09:30
parent a0cd8decc6
commit f4c186c9cb
3 changed files with 57 additions and 38 deletions

View File

@ -217,6 +217,11 @@ public class Clause implements MapWriter, Comparable<Clause> {
public int hashCode() { public int hashCode() {
return hash; return hash;
} }
//if the delta is lower , this violation is less serious
public boolean isLessSerious(Violation that) {
return that.delta != null && delta != null &&
Math.abs(delta) < Math.abs(that.delta);
}
@Override @Override
public boolean equals(Object that) { public boolean equals(Object that) {

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.autoscaling.Clause.Violation; import org.apache.solr.cloud.autoscaling.Clause.Violation;
import org.apache.solr.cloud.autoscaling.Policy.ReplicaInfo;
import org.apache.solr.cloud.autoscaling.Policy.Suggester; import org.apache.solr.cloud.autoscaling.Policy.Suggester;
import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Pair;
@ -36,11 +37,16 @@ public class MoveReplicaSuggester extends Suggester {
SolrRequest tryEachNode(boolean strict) { SolrRequest tryEachNode(boolean strict) {
//iterate through elements and identify the least loaded //iterate through elements and identify the least loaded
for (Pair<Policy.ReplicaInfo, Row> fromReplica : getValidReplicas(true, true, -1)) { List<Clause.Violation> leastSeriousViolation = null;
Integer targetNodeIndex = null;
Integer fromNodeIndex = null;
ReplicaInfo fromReplicaInfo = null;
for (Pair<ReplicaInfo, Row> fromReplica : getValidReplicas(true, true, -1)) {
Row fromRow = fromReplica.second(); Row fromRow = fromReplica.second();
String coll = fromReplica.first().collection; ReplicaInfo replicaInfo = fromReplica.first();
String shard = fromReplica.first().shard; String coll = replicaInfo.collection;
Pair<Row, Policy.ReplicaInfo> pair = fromRow.removeReplica(coll, shard); String shard = replicaInfo.shard;
Pair<Row, ReplicaInfo> pair = fromRow.removeReplica(coll, shard);
Row tmpRow = pair.first(); Row tmpRow = pair.first();
if (tmpRow == null) { if (tmpRow == null) {
//no such replica available //no such replica available
@ -55,15 +61,21 @@ public class MoveReplicaSuggester extends Suggester {
targetRow = targetRow.addReplica(coll, shard); targetRow = targetRow.addReplica(coll, shard);
targetRow.violations.clear(); targetRow.violations.clear();
List<Violation> errs = testChangedRow(strict, getModifiedMatrix(getModifiedMatrix(getMatrix(), tmpRow, i), targetRow, j)); List<Violation> errs = testChangedRow(strict, getModifiedMatrix(getModifiedMatrix(getMatrix(), tmpRow, i), targetRow, j));
if (!containsNewErrors(errs)) { if (!containsNewErrors(errs) && isLessSerious(errs, leastSeriousViolation)) {
getMatrix().set(i, getMatrix().get(i).removeReplica(coll, shard).first()); leastSeriousViolation = errs;
getMatrix().set(j, getMatrix().get(j).addReplica(coll, shard)); targetNodeIndex = j;
fromNodeIndex = i;
fromReplicaInfo = replicaInfo;
}
}
}
if (targetNodeIndex != null && fromNodeIndex != null) {
getMatrix().set(fromNodeIndex, getMatrix().get(fromNodeIndex).removeReplica(fromReplicaInfo.collection, fromReplicaInfo.shard).first());
getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(fromReplicaInfo.collection, fromReplicaInfo.shard));
return new CollectionAdminRequest.MoveReplica( return new CollectionAdminRequest.MoveReplica(
coll, fromReplicaInfo.collection,
pair.second().name, fromReplicaInfo.name,
targetRow.node); getMatrix().get(targetNodeIndex).node);
}
}
} }
return null; return null;
} }

View File

@ -216,7 +216,7 @@ public class Policy implements MapWriter {
//approximate values are set now. Let's do recursive sorting //approximate values are set now. Let's do recursive sorting
Collections.sort(matrix, (r1, r2) -> { Collections.sort(matrix, (r1, r2) -> {
int result = clusterPreferences.get(0).compare(r1, r2, true); int result = clusterPreferences.get(0).compare(r1, r2, true);
if(result == 0) result = clusterPreferences.get(0).compare(r1, r2, false); if (result == 0) result = clusterPreferences.get(0).compare(r1, r2, false);
return result; return result;
}); });
} }
@ -291,7 +291,7 @@ public class Policy implements MapWriter {
public static class ReplicaInfo implements MapWriter { public static class ReplicaInfo implements MapWriter {
final String name; final String name;
String core,collection,shard; String core, collection, shard;
Map<String, Object> variables; Map<String, Object> variables;
public ReplicaInfo(String name, String coll, String shard, Map<String, Object> vals) { public ReplicaInfo(String name, String coll, String shard, Map<String, Object> vals) {
@ -305,14 +305,16 @@ public class Policy implements MapWriter {
public void writeMap(EntryWriter ew) throws IOException { public void writeMap(EntryWriter ew) throws IOException {
ew.put(name, variables); ew.put(name, variables);
} }
public String getCore(){
public String getCore() {
return core; return core;
} }
public String getCollection(){
public String getCollection() {
return collection; return collection;
} }
public String getShard(){ public String getShard() {
return shard; return shard;
} }
} }
@ -351,16 +353,16 @@ public class Policy implements MapWriter {
String shard = (String) hints.get(Hint.SHARD); String shard = (String) hints.get(Hint.SHARD);
// if this is not a known collection from the existing clusterstate, // if this is not a known collection from the existing clusterstate,
// then add it // then add it
if(session.matrix.stream().noneMatch(row -> row.replicaInfo.containsKey(coll))){ if (session.matrix.stream().noneMatch(row -> row.replicaInfo.containsKey(coll))) {
session.addClausesForCollection(session.dataProvider, coll); session.addClausesForCollection(session.dataProvider, coll);
Collections.sort(session.expandedClauses); Collections.sort(session.expandedClauses);
} }
if(coll != null) { if (coll != null) {
for (Row row : session.matrix) { for (Row row : session.matrix) {
if (!row.replicaInfo.containsKey(coll)) row.replicaInfo.put(coll, new HashMap<>()); if (!row.replicaInfo.containsKey(coll)) row.replicaInfo.put(coll, new HashMap<>());
if(shard != null){ if (shard != null) {
Map<String, List<ReplicaInfo>> shardInfo = row.replicaInfo.get(coll); Map<String, List<ReplicaInfo>> shardInfo = row.replicaInfo.get(coll);
if(!shardInfo.containsKey(shard)) shardInfo.put(shard, new ArrayList<>()); if (!shardInfo.containsKey(shard)) shardInfo.put(shard, new ArrayList<>());
} }
} }
} }
@ -381,41 +383,40 @@ public class Policy implements MapWriter {
} }
//check if the fresh set of violations is less serious than the last set of violations
boolean isLessSerious(List<Violation> fresh, List<Violation> old) { boolean isLessSerious(List<Violation> fresh, List<Violation> old) {
if (old == null || fresh.size() < old.size()) return true; if (old == null || fresh.size() < old.size()) return true;
if(fresh.size() == old.size()){ if (fresh.size() == old.size()) {
for (int i = 0; i < fresh.size(); i++) { for (int i = 0; i < fresh.size(); i++) {
Violation freshViolation = fresh.get(i); Violation freshViolation = fresh.get(i);
Violation oldViolation = null; Violation oldViolation = null;
for (Violation v : old) { for (Violation v : old) {
if(v.equals(freshViolation)){ if (v.equals(freshViolation)) oldViolation = v;
oldViolation =v;
} }
if (oldViolation != null && freshViolation.isLessSerious(oldViolation)) return true;
} }
if (oldViolation != null && oldViolation.delta != null &&
Math.abs(fresh.get(i).delta) < Math.abs(oldViolation.delta)) return true;
}
}
return false;
}
boolean containsNewErrors(List<Violation> errs){
for (Clause.Violation err : errs) {
if(!originalViolations.contains(err)) return true;
} }
return false; return false;
} }
boolean containsNewErrors(List<Violation> violations) {
for (Violation v : violations) {
int idx = originalViolations.indexOf(v);
if (idx < 0 || originalViolations.get(idx).isLessSerious(v)) return true;
}
return false;
}
List<Pair<ReplicaInfo, Row>> getValidReplicas(boolean sortDesc, boolean isSource, int until) { List<Pair<ReplicaInfo, Row>> getValidReplicas(boolean sortDesc, boolean isSource, int until) {
List<Pair<Policy.ReplicaInfo, Row>> allPossibleReplicas = new ArrayList<>(); List<Pair<Policy.ReplicaInfo, Row>> allPossibleReplicas = new ArrayList<>();
if (sortDesc) { if (sortDesc) {
if(until == -1) until = getMatrix().size(); if (until == -1) until = getMatrix().size();
for (int i = 0; i < until; i++) addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas); for (int i = 0; i < until; i++) addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
} else { } else {
if(until == -1) until = 0; if (until == -1) until = 0;
for (int i = getMatrix().size() - 1; i >= until; i--) addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas); for (int i = getMatrix().size() - 1; i >= until; i--)
addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
} }
return allPossibleReplicas; return allPossibleReplicas;
} }
@ -430,7 +431,8 @@ public class Policy implements MapWriter {
} }
} }
} }
protected List<Violation> testChangedRow(boolean strict,List<Row> rows) {
protected List<Violation> testChangedRow(boolean strict, List<Row> rows) {
List<Violation> errors = new ArrayList<>(); List<Violation> errors = new ArrayList<>();
for (Clause clause : session.expandedClauses) { for (Clause clause : session.expandedClauses) {
if (strict || clause.strict) { if (strict || clause.strict) {
@ -443,7 +445,7 @@ public class Policy implements MapWriter {
return errors; return errors;
} }
ArrayList<Row> getModifiedMatrix(List<Row> matrix ,Row tmpRow, int i) { ArrayList<Row> getModifiedMatrix(List<Row> matrix, Row tmpRow, int i) {
ArrayList<Row> copy = new ArrayList<>(matrix); ArrayList<Row> copy = new ArrayList<>(matrix);
copy.set(i, tmpRow); copy.set(i, tmpRow);
return copy; return copy;