mirror of https://github.com/apache/lucene.git
SOLR-10278: make hints optional
This commit is contained in:
parent
ea106682c2
commit
70462ed6a5
|
@ -77,7 +77,7 @@ public class SolrClientDataProvider implements ClusterDataProvider {
|
|||
if (collData == null) nodeData.put(collName, collData = new HashMap<>());
|
||||
List<ReplicaInfo> replicas = collData.get(shard);
|
||||
if (replicas == null) collData.put(shard, replicas = new ArrayList<>());
|
||||
replicas.add(new ReplicaInfo(replica.getName(), new HashMap<>()));
|
||||
replicas.add(new ReplicaInfo(replica.getName(), collName , shard, new HashMap<>()));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.cloud.autoscaling;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.common.util.Pair;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.cloud.autoscaling.Policy.Suggester;
|
||||
|
||||
|
@ -36,11 +37,14 @@ class AddReplicaSuggester extends Suggester {
|
|||
}
|
||||
|
||||
Map tryEachNode(boolean strict) {
|
||||
String coll = (String) hints.get(Hint.COLL);
|
||||
String shard = (String) hints.get(Hint.SHARD);
|
||||
if (coll == null || shard == null)
|
||||
throw new RuntimeException("add-replica requires 'collection' and 'shard'");
|
||||
//iterate through elements and identify the least loaded
|
||||
for (int i = getMatrix().size() - 1; i >= 0; i--) {
|
||||
Row row = getMatrix().get(i);
|
||||
String coll = hints.get(Hint.COLL);
|
||||
String shard = hints.get(Hint.SHARD);
|
||||
if (!isAllowed(row.node, Hint.TARGET_NODE)) continue;
|
||||
row = row.addReplica(coll, shard);
|
||||
row.violations.clear();
|
||||
for (Clause clause : session.expandedClauses) {
|
||||
|
|
|
@ -19,9 +19,9 @@ package org.apache.solr.cloud.autoscaling;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.cloud.autoscaling.Policy.Suggester;
|
||||
import org.apache.solr.common.util.Pair;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.cloud.autoscaling.Policy.Suggester;
|
||||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
|
@ -40,23 +40,24 @@ public class MoveReplicaSuggester extends Suggester {
|
|||
|
||||
Map tryEachNode(boolean strict) {
|
||||
//iterate through elements and identify the least loaded
|
||||
String coll = hints.get(Hint.COLL);
|
||||
String shard = hints.get(Hint.SHARD);
|
||||
for (int i = 0; i < getMatrix().size(); i++) {
|
||||
Row fromRow = getMatrix().get(i);
|
||||
Pair<Row, Policy.ReplicaInfo> pair = fromRow.removeReplica(coll, shard);
|
||||
fromRow = pair.first();
|
||||
if(fromRow == null){
|
||||
for (Pair<Policy.ReplicaInfo, Row> fromReplica : getValidReplicas(true, true, -1)) {
|
||||
Row fromRow = fromReplica.second();
|
||||
String coll = fromReplica.first().collection;
|
||||
String shard = fromReplica.first().shard;
|
||||
Pair<Row, Policy.ReplicaInfo> tmpRow = fromRow.removeReplica(coll, shard);
|
||||
if (tmpRow.first() == null) {
|
||||
//no such replica available
|
||||
continue;
|
||||
}
|
||||
|
||||
for (Clause clause : session.expandedClauses) {
|
||||
if (strict || clause.strict) clause.test(fromRow);
|
||||
if (strict || clause.strict) clause.test(tmpRow.first());
|
||||
}
|
||||
if (fromRow.violations.isEmpty()) {
|
||||
int i = getMatrix().indexOf(fromRow);
|
||||
if (tmpRow.first().violations.isEmpty()) {
|
||||
for (int j = getMatrix().size() - 1; j > i; i--) {
|
||||
Row targetRow = getMatrix().get(i);
|
||||
Row targetRow = getMatrix().get(j);
|
||||
if (!isAllowed(targetRow.node, Hint.TARGET_NODE)) continue;
|
||||
targetRow = targetRow.addReplica(coll, shard);
|
||||
targetRow.violations.clear();
|
||||
for (Clause clause : session.expandedClauses) {
|
||||
|
@ -65,12 +66,12 @@ public class MoveReplicaSuggester extends Suggester {
|
|||
if (targetRow.violations.isEmpty()) {
|
||||
getMatrix().set(i, getMatrix().get(i).removeReplica(coll, shard).first());
|
||||
getMatrix().set(j, getMatrix().get(j).addReplica(coll, shard));
|
||||
return Utils.makeMap("operation", MOVEREPLICA.toLower(),
|
||||
COLLECTION_PROP, coll,
|
||||
SHARD_ID_PROP, shard,
|
||||
NODE, fromRow.node,
|
||||
REPLICA, pair.second().name,
|
||||
"target", targetRow.node);
|
||||
return Utils.makeMap("operation", MOVEREPLICA.toLower(),
|
||||
COLLECTION_PROP, coll,
|
||||
SHARD_ID_PROP, shard,
|
||||
NODE, fromRow.node,
|
||||
REPLICA, tmpRow.second().name,
|
||||
"targetNode", targetRow.node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.HashSet;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -35,6 +36,7 @@ import java.util.stream.Collectors;
|
|||
import org.apache.solr.common.IteratorWriter;
|
||||
import org.apache.solr.common.MapWriter;
|
||||
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
||||
import org.apache.solr.common.util.Pair;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
|
||||
|
@ -158,7 +160,6 @@ public class Policy implements MapWriter {
|
|||
paramsOfInterest = new ArrayList<>(p);
|
||||
matrix = new ArrayList<>(nodes.size());
|
||||
for (String node : nodes) matrix.add(new Row(node, paramsOfInterest, dataProvider));
|
||||
for (Row row : matrix) row.replicaInfo.forEach((s, e) -> collections.add(s));
|
||||
applyRules();
|
||||
}
|
||||
|
||||
|
@ -268,9 +269,11 @@ public class Policy implements MapWriter {
|
|||
String core,collection,shard;
|
||||
Map<String, Object> variables;
|
||||
|
||||
public ReplicaInfo(String name, Map<String, Object> vals) {
|
||||
public ReplicaInfo(String name, String coll, String shard, Map<String, Object> vals) {
|
||||
this.name = name;
|
||||
this.variables = vals;
|
||||
this.collection = coll;
|
||||
this.shard = shard;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -291,7 +294,7 @@ public class Policy implements MapWriter {
|
|||
|
||||
|
||||
public static abstract class Suggester {
|
||||
protected final EnumMap<Hint, String> hints = new EnumMap<>(Hint.class);
|
||||
protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class);
|
||||
Policy.Session session;
|
||||
Map operation;
|
||||
private boolean isInitialized = false;
|
||||
|
@ -300,7 +303,7 @@ public class Policy implements MapWriter {
|
|||
this.session = session.copy();
|
||||
}
|
||||
|
||||
public Suggester hint(Hint hint, String value) {
|
||||
public Suggester hint(Hint hint, Object value) {
|
||||
hints.put(hint, value);
|
||||
return this;
|
||||
}
|
||||
|
@ -325,6 +328,35 @@ public class Policy implements MapWriter {
|
|||
|
||||
}
|
||||
|
||||
List<Pair<ReplicaInfo, Row>> getValidReplicas(boolean sortDesc, boolean isSource, int until) {
|
||||
List<Pair<Policy.ReplicaInfo, Row>> allPossibleReplicas = new ArrayList<>();
|
||||
|
||||
if (sortDesc) {
|
||||
if(until == -1) until = getMatrix().size();
|
||||
for (int i = 0; i < until; i++) addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
|
||||
} else {
|
||||
if(until == -1) until = 0;
|
||||
for (int i = getMatrix().size() - 1; i >= until; i--) addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
|
||||
}
|
||||
return allPossibleReplicas;
|
||||
}
|
||||
|
||||
void addReplicaToList(Row r, boolean isSource, List<Pair<Policy.ReplicaInfo, Row>> replicaList) {
|
||||
if (!isAllowed(r.node, isSource ? Hint.SRC_NODE : Hint.TARGET_NODE)) return;
|
||||
for (Map.Entry<String, Map<String, List<Policy.ReplicaInfo>>> e : r.replicaInfo.entrySet()) {
|
||||
if (!isAllowed(e.getKey(), Hint.COLL)) continue;
|
||||
for (Map.Entry<String, List<Policy.ReplicaInfo>> shard : e.getValue().entrySet()) {
|
||||
if (!isAllowed(e.getKey(), Hint.SHARD)) continue;
|
||||
replicaList.add(new Pair<>(shard.getValue().get(0), r));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean isAllowed(Object v, Hint hint) {
|
||||
Object hintVal = hints.get(hint);
|
||||
return hintVal == null || Objects.equals(v, hintVal);
|
||||
}
|
||||
|
||||
public enum Hint {
|
||||
COLL, SHARD, SRC_NODE, TARGET_NODE
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ class Row implements MapWriter {
|
|||
|
||||
Row(String node, List<String> params, ClusterDataProvider snitch) {
|
||||
replicaInfo = snitch.getReplicaInfo(node, params);
|
||||
if (replicaInfo == null) replicaInfo = Collections.emptyMap();
|
||||
if (replicaInfo == null) replicaInfo = new HashMap<>();
|
||||
this.node = node;
|
||||
cells = new Cell[params.size()];
|
||||
Map<String, Object> vals = snitch.getNodeValues(node, params);
|
||||
|
@ -95,10 +95,9 @@ class Row implements MapWriter {
|
|||
if (c == null) row.replicaInfo.put(coll, c = new HashMap<>());
|
||||
List<ReplicaInfo> s = c.get(shard);
|
||||
if (s == null) c.put(shard, s = new ArrayList<>());
|
||||
s.add(new ReplicaInfo(""+new Random().nextInt(10000)+10000 , new HashMap<>()));
|
||||
s.add(new ReplicaInfo(coll,shard,""+new Random().nextInt(1000)+1000 , new HashMap<>()));
|
||||
return row;
|
||||
|
||||
|
||||
}
|
||||
|
||||
Pair<Row, ReplicaInfo> removeReplica(String coll, String shard) {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.solr.common.util.Utils;
|
|||
import org.apache.solr.common.util.ValidatingJsonMap;
|
||||
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
|
||||
|
||||
public class TestPolicy extends SolrTestCaseJ4 {
|
||||
|
||||
|
@ -198,6 +199,16 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
Map operation = suggester.getOperation();
|
||||
assertEquals("node2", operation.get("node"));
|
||||
|
||||
nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
|
||||
"node1:{cores:12, freedisk: 334, heap:10480}," +
|
||||
"node2:{cores:4, freedisk: 749, heap:6873}," +
|
||||
"node3:{cores:7, freedisk: 262, heap:7834}," +
|
||||
"node5:{cores:0, freedisk: 895, heap:17834}," +
|
||||
"node4:{cores:8, freedisk: 375, heap:16900, nodeRole:overseer}" +
|
||||
"}");
|
||||
session = policy.createSession(getClusterDataProvider(nodeValues, clusterState));
|
||||
operation = session.getSuggester(MOVEREPLICA).hint(Hint.TARGET_NODE, "node5").getOperation();
|
||||
assertEquals("node5", operation.get("targetNode"));
|
||||
|
||||
|
||||
}
|
||||
|
@ -327,7 +338,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
if (shardVsReplicaStats == null) result.put(collName, shardVsReplicaStats = new HashMap<>());
|
||||
List<Policy.ReplicaInfo> replicaInfos = shardVsReplicaStats.get(shard);
|
||||
if (replicaInfos == null) shardVsReplicaStats.put(shard, replicaInfos = new ArrayList<>());
|
||||
replicaInfos.add(new Policy.ReplicaInfo(replicaName, new HashMap<>()));
|
||||
replicaInfos.add(new Policy.ReplicaInfo(replicaName,collName, shard, new HashMap<>()));
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue