SOLR-11538: Implement port suggestion

This commit is contained in:
Noble Paul 2017-11-14 23:23:35 +10:30
parent fc678da2dd
commit 65cd8bbbf4
5 changed files with 95 additions and 14 deletions

View File

@ -51,14 +51,16 @@ public class MoveReplicaSuggester extends Suggester {
} }
final int i = getMatrix().indexOf(fromRow); final int i = getMatrix().indexOf(fromRow);
for (int j = getMatrix().size() - 1; j > i; j--) { int stopAt = force ? 0 : i;
for (int j = getMatrix().size() - 1; j >= stopAt; j--) {
if (j == i) continue;
Row targetRow = getMatrix().get(j); Row targetRow = getMatrix().get(j);
if(!targetRow.isLive) continue; if(!targetRow.isLive) continue;
if (!isAllowed(targetRow.node, Hint.TARGET_NODE)) continue; if (!isAllowed(targetRow.node, Hint.TARGET_NODE)) continue;
targetRow = targetRow.addReplica(coll, shard, replicaInfo.getType()); targetRow = targetRow.addReplica(coll, shard, replicaInfo.getType());
List<Violation> errs = testChangedMatrix(strict, getModifiedMatrix(getModifiedMatrix(getMatrix(), srcTmpRow, i), targetRow, j)); List<Violation> errs = testChangedMatrix(strict, getModifiedMatrix(getModifiedMatrix(getMatrix(), srcTmpRow, i), targetRow, j));
if (!containsNewErrors(errs) && isLessSerious(errs, leastSeriousViolation) && if (!containsNewErrors(errs) && isLessSerious(errs, leastSeriousViolation) &&
Policy.compareRows(srcTmpRow, targetRow, session.getPolicy()) < 1) { (force || Policy.compareRows(srcTmpRow, targetRow, session.getPolicy()) < 1)) {
leastSeriousViolation = errs; leastSeriousViolation = errs;
targetNodeIndex = j; targetNodeIndex = j;
sourceNodeIndex = i; sourceNodeIndex = i;

View File

@ -47,6 +47,7 @@ public abstract class Suggester {
protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class); protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class);
Policy.Session session; Policy.Session session;
SolrRequest operation; SolrRequest operation;
boolean force;
protected List<Violation> originalViolations = new ArrayList<>(); protected List<Violation> originalViolations = new ArrayList<>();
private boolean isInitialized = false; private boolean isInitialized = false;
@ -65,6 +66,15 @@ public abstract class Suggester {
return this; return this;
} }
/**
* Normally, only less loaded nodes are used for moving replicas. If this is a violation and a MOVE must be performed,
* set the flag to true.
*/
public Suggester forceOperation(boolean force) {
this.force = force;
return this;
}
abstract SolrRequest init(); abstract SolrRequest init();

View File

@ -109,7 +109,7 @@ public class Suggestion {
COLL("collection", String.class, null, null, null), COLL("collection", String.class, null, null, null),
SHARD("shard", String.class, null, null, null), SHARD("shard", String.class, null, null, null),
REPLICA("replica", Long.class, null, 0L, null), REPLICA("replica", Long.class, null, 0L, null),
PORT(ImplicitSnitch.PORT, Long.class, null, 1L, 65535L), PORT(ImplicitSnitch.PORT, Long.class, null, 1L, 65535L) ,
IP_1("ip_1", Long.class, null, 0L, 255L), IP_1("ip_1", Long.class, null, 0L, 255L),
IP_2("ip_2", Long.class, null, 0L, 255L), IP_2("ip_2", Long.class, null, 0L, 255L),
IP_3("ip_3", Long.class, null, 0L, 255L), IP_3("ip_3", Long.class, null, 0L, 255L),
@ -222,13 +222,7 @@ public class Suggestion {
@Override @Override
public void getSuggestions(SuggestionCtx ctx) { public void getSuggestions(SuggestionCtx ctx) {
if(ctx.violation == null)return; perNodeSuggestions(ctx);
for (ReplicaInfoAndErr e : ctx.violation.getViolatingReplicas()) {
Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
.hint(Suggester.Hint.COLL_SHARD, new Pair<>( e.replicaInfo.getCollection(), e.replicaInfo.getShard()))
.hint(Suggester.Hint.SRC_NODE, e.replicaInfo.getNode());
if (ctx.addSuggestion(suggester) == null) break;
}
} }
},; },;
@ -254,7 +248,7 @@ public class Suggestion {
} }
public void getSuggestions(SuggestionCtx ctx) { public void getSuggestions(SuggestionCtx ctx) {
perNodeSuggestions(ctx);
} }
public void addViolatingReplicas(ViolationCtx ctx) { public void addViolatingReplicas(ViolationCtx ctx) {
@ -262,6 +256,7 @@ public class Suggestion {
row.forEachReplica(replica -> { row.forEachReplica(replica -> {
if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return; if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return;
if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return; if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return;
if(!ctx.currentViolation.matchShard(replica.getShard())) return;
if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard)) if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard))
return; return;
ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name)))); ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name))));
@ -308,6 +303,17 @@ public class Suggestion {
} }
} }
private static void perNodeSuggestions(SuggestionCtx ctx) {
if (ctx.violation == null) return;
for (ReplicaInfoAndErr e : ctx.violation.getViolatingReplicas()) {
Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
.forceOperation(true)
.hint(Suggester.Hint.COLL_SHARD, new Pair<>(e.replicaInfo.getCollection(), e.replicaInfo.getShard()))
.hint(Suggester.Hint.SRC_NODE, e.replicaInfo.getNode());
if (ctx.addSuggestion(suggester) == null) break;
}
}
static { static {
for (Suggestion.ConditionType t : Suggestion.ConditionType.values()) Suggestion.validatetypes.put(t.tagName, t); for (Suggestion.ConditionType t : Suggestion.ConditionType.values()) Suggestion.validatetypes.put(t.tagName, t);
} }

View File

@ -34,7 +34,7 @@ public class Violation implements MapWriter {
final Object tagKey; final Object tagKey;
private final int hash; private final int hash;
private final Clause clause; private final Clause clause;
private List<ReplicaInfoAndErr> violationVsMetaData = new ArrayList<>(); private List<ReplicaInfoAndErr> replicaInfoAndErrs = new ArrayList<>();
Violation(Clause clause, String coll, String shard, String node, Object actualVal, Long replicaCountDelta, Object tagKey) { Violation(Clause clause, String coll, String shard, String node, Object actualVal, Long replicaCountDelta, Object tagKey) {
this.clause = clause; this.clause = clause;
@ -48,17 +48,23 @@ public class Violation implements MapWriter {
} }
public Violation addReplica(ReplicaInfoAndErr r) { public Violation addReplica(ReplicaInfoAndErr r) {
violationVsMetaData.add(r); replicaInfoAndErrs.add(r);
return this; return this;
} }
public List<ReplicaInfoAndErr> getViolatingReplicas() { public List<ReplicaInfoAndErr> getViolatingReplicas() {
return violationVsMetaData; return replicaInfoAndErrs;
} }
public Clause getClause() { public Clause getClause() {
return clause; return clause;
} }
public boolean matchShard(String shard) {
if (getClause().shard.op == Operand.WILDCARD) return true;
return this.shard == null || this.shard.equals(shard);
}
static class ReplicaInfoAndErr implements MapWriter{ static class ReplicaInfoAndErr implements MapWriter{
final ReplicaInfo replicaInfo; final ReplicaInfo replicaInfo;

View File

@ -1521,4 +1521,61 @@ public class TestPolicy extends SolrTestCaseJ4 {
} }
} }
public void testPortSuggestions() {
String autoScalingjson = "{" +
" 'cluster-preferences': [" +
" { 'maximize': 'freedisk', 'precision': 50}," +
" { 'minimize': 'cores', 'precision': 3}" +
" ]," +
" 'cluster-policy': [" +
" { 'replica': 0, shard:'#EACH', port : '8983'}" +
" ]" +
"}";
String dataproviderdata = "{" +
" 'liveNodes': [" +
" 'node1:8983'," +
" 'node2:8984'," +
" 'node3:8985'" +
" ]," +
" 'replicaInfo': {" +
" 'node1:8983': {" +
" 'c1': {" +
" 's1': [" +
" {'r1': {'type': 'NRT'}}," +
" {'r2': {'type': 'NRT'}}" +
" ]," +
" 's2': [" +
" {'r1': {'type': 'NRT'}}," +
" {'r2': {'type': 'NRT'}}" +
" ]" +
" }" +
" }" +
" }," +
" 'nodeValues': {" +
" 'node1:8983': {" +
" 'cores': 4," +
" 'freedisk': 334," +
" 'port': 8983" +
" }," +
" 'node2:8984': {" +
" 'cores': 0," +
" 'freedisk': 1000," +
" 'port': 8984" +
" }," +
" 'node3:8985': {" +
" 'cores': 0," +
" 'freedisk': 1500," +
" 'port': 8985" +
" }" +
" }" +
"}";
AutoScalingConfig cfg = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
List<Violation> violations = cfg.getPolicy().createSession(cloudManagerWithData(dataproviderdata)).getViolations();
assertEquals(2, violations.size());
List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(cfg, cloudManagerWithData(dataproviderdata));
assertEquals(4, suggestions.size());
}
} }