mirror of https://github.com/apache/lucene.git
SOLR-13016: Computing suggestions when policy have "#EQUAL" or "#ALL" rules take too long
This commit is contained in:
parent
e5c7bb4ddf
commit
2f6d31364e
|
@ -259,6 +259,8 @@ Improvements
|
|||
|
||||
* SOLR-12983: JavabinLoader should avoid creating String Objects and create UTF8CharSequence fields from byte[] (noble)
|
||||
|
||||
* SOLR-13016: Computing suggestions when policy have "#EQUAL" or "#ALL" rules take too long (noble)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ import static org.apache.solr.common.util.Utils.toJSONString;
|
|||
public class Clause implements MapWriter, Comparable<Clause> {
|
||||
private static final Set<String> IGNORE_TAGS = new HashSet<>(Arrays.asList(REPLICA, COLLECTION, SHARD, "strict", "type"));
|
||||
|
||||
private final int hashCode;
|
||||
final boolean hasComputedValue;
|
||||
final Map<String, Object> original;
|
||||
final Clause derivedFrom;
|
||||
|
@ -67,6 +68,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
|
||||
protected Clause(Clause clause, Function<Condition, Object> computedValueEvaluator) {
|
||||
this.original = clause.original;
|
||||
this.hashCode = original.hashCode();
|
||||
this.type = clause.type;
|
||||
this.collection = clause.collection;
|
||||
this.shard = clause.shard;
|
||||
|
@ -80,6 +82,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
|
||||
// internal use only
|
||||
Clause(Map<String, Object> original, Condition tag, Condition globalTag, boolean isStrict) {
|
||||
this.hashCode = original.hashCode();
|
||||
this.original = original;
|
||||
this.tag = tag;
|
||||
this.globalTag = globalTag;
|
||||
|
@ -93,6 +96,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
private Clause(Map<String, Object> m) {
|
||||
derivedFrom = (Clause) m.remove(Clause.class.getName());
|
||||
this.original = Utils.getDeepCopy(m, 10);
|
||||
this.hashCode = original.hashCode();
|
||||
String type = (String) m.get("type");
|
||||
this.type = type == null || ANY.equals(type) ? null : Replica.Type.valueOf(type.toUpperCase(Locale.ROOT));
|
||||
strict = Boolean.parseBoolean(String.valueOf(m.getOrDefault("strict", "true")));
|
||||
|
@ -449,12 +453,27 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
|
||||
}
|
||||
|
||||
public static long addReplicaCountsForNode = 0;
|
||||
public static long addReplicaCountsForNodeCacheMiss = 0;
|
||||
public static final String PERSHARD_REPLICAS = Clause.class.getSimpleName() + ".perShardReplicas";
|
||||
private void addReplicaCountsForNode(ComputedValueEvaluator computedValueEvaluator, ReplicaCount replicaCount, Row node) {
|
||||
node.forEachReplica((String) collection.getValue(), ri -> {
|
||||
if (Policy.ANY.equals(computedValueEvaluator.shardName)
|
||||
|| computedValueEvaluator.shardName.equals(ri.getShard()))
|
||||
replicaCount.increment(ri);
|
||||
});
|
||||
addReplicaCountsForNode++;
|
||||
|
||||
ReplicaCount rc = node.computeCacheIfAbsent(computedValueEvaluator.collName, computedValueEvaluator.shardName, PERSHARD_REPLICAS,
|
||||
this, o -> {
|
||||
addReplicaCountsForNodeCacheMiss++;
|
||||
ReplicaCount result = new ReplicaCount();
|
||||
node.forEachReplica((String) collection.getValue(), ri -> {
|
||||
if (Policy.ANY.equals(computedValueEvaluator.shardName)
|
||||
|| computedValueEvaluator.shardName.equals(ri.getShard()))
|
||||
result.increment(ri);
|
||||
});
|
||||
return result;
|
||||
});
|
||||
if (rc != null)
|
||||
replicaCount.increment(rc);
|
||||
|
||||
|
||||
}
|
||||
|
||||
List<Violation> testPerNode(Policy.Session session, double[] deviations) {
|
||||
|
@ -633,7 +652,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return original.hashCode();
|
||||
return hashCode;
|
||||
}
|
||||
|
||||
public static Double parseDouble(String name, Object val) {
|
||||
|
|
|
@ -50,6 +50,7 @@ public class CoresVariable extends VariableBase {
|
|||
if (ctx.violation == null || ctx.violation.replicaCountDelta == 0) return;
|
||||
if (ctx.violation.replicaCountDelta > 0) {//there are more replicas than necessary
|
||||
for (int i = 0; i < Math.abs(ctx.violation.replicaCountDelta); i++) {
|
||||
if (!ctx.needMore()) return;
|
||||
Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
|
||||
.hint(Suggester.Hint.SRC_NODE, ctx.violation.node);
|
||||
if (ctx.addSuggestion(suggester) == null) break;
|
||||
|
@ -83,14 +84,23 @@ public class CoresVariable extends VariableBase {
|
|||
}
|
||||
}
|
||||
|
||||
static final String TOTALCORES = CoresVariable.class.getSimpleName() + ".totalcores";
|
||||
private int getTotalCores(Policy.Session session, AtomicInteger liveNodes) {
|
||||
int[] coresCount = new int[1];
|
||||
int coresCount = 0;
|
||||
for (Row row : session.matrix) {
|
||||
if (!row.isLive) continue;
|
||||
liveNodes.incrementAndGet();
|
||||
row.forEachReplica(replicaInfo -> coresCount[0]++);
|
||||
Integer res = row.computeCacheIfAbsent(TOTALCORES, o -> {
|
||||
int[] result = new int[1];
|
||||
row.forEachReplica(replicaInfo -> result[0]++);
|
||||
return result[0];
|
||||
});
|
||||
if (res != null)
|
||||
coresCount += res;
|
||||
|
||||
|
||||
}
|
||||
return coresCount[0];
|
||||
return coresCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -113,7 +113,7 @@ public class FreeDiskVariable extends VariableBase {
|
|||
.hint(Hint.COLL_SHARD, new Pair<>(replica.getCollection(), replica.getShard()))
|
||||
.hint(Hint.SRC_NODE, node.node)
|
||||
.forceOperation(true);
|
||||
if (ctx.addSuggestion(suggester) == null) break;
|
||||
ctx.addSuggestion(suggester);
|
||||
currentDelta -= Clause.parseLong(CORE_IDX.tagName, replica.getVariable(CORE_IDX.tagName));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,6 @@ public class MoveReplicaSuggester extends Suggester {
|
|||
targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType(), strict); // add replica to target first
|
||||
Row srcRowModified = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node
|
||||
List<Violation> errs = testChangedMatrix(strict, srcRowModified.session);
|
||||
srcRowModified.session.applyRules(); // now resort the nodes with the new values
|
||||
Policy.Session tmpSession = srcRowModified.session;
|
||||
|
||||
if (!containsNewErrors(errs) &&
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.solr.client.solrj.cloud.autoscaling;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -277,6 +278,7 @@ public class Policy implements MapWriter {
|
|||
}
|
||||
|
||||
static void setApproxValuesAndSortNodes(List<Preference> clusterPreferences, List<Row> matrix) {
|
||||
List<Row> matrixCopy = new ArrayList<>(matrix);
|
||||
List<Row> deadNodes = null;
|
||||
Iterator<Row> it =matrix.iterator();
|
||||
while (it.hasNext()){
|
||||
|
@ -300,16 +302,19 @@ public class Policy implements MapWriter {
|
|||
return p.compare(r1, r2, false);
|
||||
});
|
||||
} catch (Exception e) {
|
||||
// log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
|
||||
// clusterPreferences,
|
||||
// lastComparison[0],
|
||||
// lastComparison[1],
|
||||
// Utils.toJSONString(Utils.getDeepCopy(tmpMatrix, 6, false)));
|
||||
log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
|
||||
clusterPreferences,
|
||||
lastComparison[0].node,
|
||||
lastComparison[1].node,
|
||||
matrix.size());
|
||||
try {
|
||||
Map m = Collections.singletonMap("diagnostics", (MapWriter) ew -> {
|
||||
PolicyHelper.writeNodes(ew, matrixCopy);
|
||||
ew.put("config", matrix.get(0).session.getPolicy());
|
||||
});
|
||||
log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
|
||||
clusterPreferences,
|
||||
lastComparison[0].node,
|
||||
lastComparison[1].node,
|
||||
Utils.writeJson(m, new StringWriter(), true).toString());
|
||||
} catch (IOException e1) {
|
||||
//
|
||||
}
|
||||
throw new RuntimeException(e.getMessage());
|
||||
}
|
||||
p.setApproxVal(tmpMatrix);
|
||||
|
@ -602,7 +607,7 @@ public class Policy implements MapWriter {
|
|||
* Apply the preferences and conditions
|
||||
*/
|
||||
void applyRules() {
|
||||
setApproxValuesAndSortNodes(clusterPreferences, matrix);
|
||||
sortNodes();
|
||||
|
||||
for (Clause clause : expandedClauses) {
|
||||
List<Violation> errs = clause.test(this, null);
|
||||
|
@ -610,6 +615,10 @@ public class Policy implements MapWriter {
|
|||
}
|
||||
}
|
||||
|
||||
void sortNodes() {
|
||||
setApproxValuesAndSortNodes(clusterPreferences, matrix);
|
||||
}
|
||||
|
||||
|
||||
public List<Violation> getViolations() {
|
||||
return violations;
|
||||
|
|
|
@ -203,12 +203,20 @@ public class PolicyHelper {
|
|||
|
||||
public static MapWriter getDiagnostics(Policy.Session session) {
|
||||
List<Row> sorted = session.getSortedNodes();
|
||||
return ew -> {
|
||||
writeNodes(ew, sorted);
|
||||
ew.put("liveNodes", session.cloudManager.getClusterStateProvider().getLiveNodes())
|
||||
.put("violations", session.getViolations())
|
||||
.put("config", session.getPolicy());
|
||||
};
|
||||
}
|
||||
|
||||
static void writeNodes(MapWriter.EntryWriter ew, List<Row> sorted) throws IOException {
|
||||
Set<CharSequence> alreadyWritten = new HashSet<>();
|
||||
BiPredicate<CharSequence, Object> p = dedupeKeyPredicate(alreadyWritten)
|
||||
.and(ConditionalMapWriter.NON_NULL_VAL)
|
||||
.and((s, o) -> !(o instanceof Map) || !((Map) o).isEmpty());
|
||||
|
||||
return ew -> ew.put("sortedNodes", (IteratorWriter) iw -> {
|
||||
ew.put("sortedNodes", (IteratorWriter) iw -> {
|
||||
for (Row row : sorted) {
|
||||
iw.add((MapWriter) ew1 -> {
|
||||
alreadyWritten.clear();
|
||||
|
@ -219,20 +227,19 @@ public class PolicyHelper {
|
|||
ew1.put("replicas", row.collectionVsShardVsReplicas);
|
||||
});
|
||||
}
|
||||
}).put("liveNodes", session.cloudManager.getClusterStateProvider().getLiveNodes())
|
||||
.put("violations", session.getViolations())
|
||||
.put("config", session.getPolicy());
|
||||
});
|
||||
}
|
||||
|
||||
public static List<Suggester.SuggestionInfo> getSuggestions(AutoScalingConfig autoScalingConf,
|
||||
SolrCloudManager cloudManager) {
|
||||
return getSuggestions(autoScalingConf, cloudManager, 50);
|
||||
return getSuggestions(autoScalingConf, cloudManager, 20, 10);
|
||||
}
|
||||
|
||||
public static List<Suggester.SuggestionInfo> getSuggestions(AutoScalingConfig autoScalingConf,
|
||||
SolrCloudManager cloudManager, int max) {
|
||||
SolrCloudManager cloudManager, int max, int timeoutInSecs) {
|
||||
Policy policy = autoScalingConf.getPolicy();
|
||||
Suggestion.Ctx ctx = new Suggestion.Ctx();
|
||||
ctx.endTime = cloudManager.getTimeSource().getTimeNs() + TimeUnit.SECONDS.toNanos(timeoutInSecs);
|
||||
ctx.max = max;
|
||||
ctx.session = policy.createSession(cloudManager);
|
||||
List<Violation> violations = ctx.session.getViolations();
|
||||
|
@ -243,6 +250,7 @@ public class PolicyHelper {
|
|||
|
||||
for (Violation current : ctx.session.getViolations()) {
|
||||
for (Violation old : violations) {
|
||||
if (!ctx.needMore()) return ctx.getSuggestions();
|
||||
if (current.equals(old)) {
|
||||
//could not be resolved
|
||||
ctx.suggestions.add(new Suggester.SuggestionInfo(current, null, "unresolved-violation"));
|
||||
|
@ -267,6 +275,7 @@ public class PolicyHelper {
|
|||
|
||||
private static void addMissingReplicas(SolrCloudManager cloudManager, Suggestion.Ctx ctx) throws IOException {
|
||||
cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> coll.forEach(slice -> {
|
||||
if (!ctx.needMore()) return;
|
||||
ReplicaCount replicaCount = new ReplicaCount();
|
||||
slice.forEach(replica -> {
|
||||
if (replica.getState() == Replica.State.ACTIVE || replica.getState() == Replica.State.RECOVERING) {
|
||||
|
@ -283,6 +292,7 @@ public class PolicyHelper {
|
|||
private static void addMissingReplicas(ReplicaCount count, DocCollection coll, String shard, Replica.Type type, Suggestion.Ctx ctx) {
|
||||
int delta = count.delta(coll.getExpectedReplicaCount(type, 0), type);
|
||||
for (; ; ) {
|
||||
if (!ctx.needMore()) return;
|
||||
if (delta >= 0) break;
|
||||
SolrRequest suggestion = ctx.addSuggestion(
|
||||
ctx.session.getSuggester(ADDREPLICA)
|
||||
|
@ -299,6 +309,7 @@ public class PolicyHelper {
|
|||
List<Row> matrix = ctx.session.matrix;
|
||||
if (matrix.isEmpty()) return;
|
||||
for (int i = 0; i < matrix.size(); i++) {
|
||||
if (ctx.getSuggestions().size() >= maxTotalSuggestions || ctx.hasTimedOut()) break;
|
||||
Row row = matrix.get(i);
|
||||
Map<String, Collection<String>> collVsShards = new HashMap<>();
|
||||
row.forEachReplica(ri -> collVsShards.computeIfAbsent(ri.getCollection(), s -> new HashSet<>()).add(ri.getShard()));
|
||||
|
@ -306,7 +317,8 @@ public class PolicyHelper {
|
|||
e.setValue(FreeDiskVariable.getSortedShards(Collections.singletonList(row), e.getValue(), e.getKey()));
|
||||
}
|
||||
for (Map.Entry<String, Collection<String>> e : collVsShards.entrySet()) {
|
||||
if (ctx.getSuggestions().size() >= maxTotalSuggestions) break;
|
||||
if (!ctx.needMore()) return;
|
||||
if (ctx.getSuggestions().size() >= maxTotalSuggestions || ctx.hasTimedOut()) break;
|
||||
for (String shard : e.getValue()) {
|
||||
Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
|
||||
.hint(Hint.COLL_SHARD, new Pair<>(e.getKey(), shard))
|
||||
|
|
|
@ -69,7 +69,7 @@ public class Preference implements MapWriter {
|
|||
int result = 0;
|
||||
if (o1 instanceof Long && o2 instanceof Long) result = ((Long) o1).compareTo((Long) o2);
|
||||
else if (o1 instanceof Double && o2 instanceof Double) {
|
||||
result = compareWithTolerance((Double) o1, (Double) o2, useApprox ? 1 : 1);
|
||||
result = compareWithTolerance((Double) o1, (Double) o2, useApprox ? 1f : 0.01f);
|
||||
} else if (!o1.getClass().getName().equals(o2.getClass().getName())) {
|
||||
throw new RuntimeException("Unable to compare " + o1 + " of type: " + o1.getClass().getName() + " from " + r1.cells[idx].toString() + " and " + o2 + " of type: " + o2.getClass().getName() + " from " + r2.cells[idx].toString());
|
||||
}
|
||||
|
@ -78,7 +78,7 @@ public class Preference implements MapWriter {
|
|||
next.compare(r1, r2, useApprox)) : sort.sortval * result;
|
||||
}
|
||||
|
||||
static int compareWithTolerance(Double o1, Double o2, int percentage) {
|
||||
static int compareWithTolerance(Double o1, Double o2, float percentage) {
|
||||
if (percentage == 0) return o1.compareTo(o2);
|
||||
if (o1.equals(o2)) return 0;
|
||||
double delta = Math.abs(o1 - o2);
|
||||
|
|
|
@ -73,6 +73,13 @@ class ReplicaCount implements MapWriter {
|
|||
increment(info.getType());
|
||||
}
|
||||
|
||||
void increment(ReplicaCount count) {
|
||||
nrt += count.nrt;
|
||||
pull += count.pull;
|
||||
tlog += count.tlog;
|
||||
}
|
||||
|
||||
|
||||
public void increment(Replica.Type type) {
|
||||
switch (type) {
|
||||
case NRT:
|
||||
|
@ -89,6 +96,16 @@ class ReplicaCount implements MapWriter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof ReplicaCount) {
|
||||
ReplicaCount that = (ReplicaCount) obj;
|
||||
return that.nrt == this.nrt && that.tlog == this.tlog && that.pull == this.pull;
|
||||
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(this);
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
|
|||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
|
||||
|
@ -30,16 +29,27 @@ class ReplicaVariable extends VariableBase {
|
|||
super(type);
|
||||
}
|
||||
|
||||
public static final String REPLICASCOUNT = "relevantReplicas";
|
||||
|
||||
|
||||
|
||||
|
||||
static int getRelevantReplicasCount(Policy.Session session, Condition cv, String collection, String shard) {
|
||||
AtomicInteger totalReplicasOfInterest = new AtomicInteger(0);
|
||||
int totalReplicasOfInterest = 0;
|
||||
Clause clause = cv.getClause();
|
||||
for (Row row : session.matrix) {
|
||||
row.forEachReplica(replicaInfo -> {
|
||||
if (clause.isMatch(replicaInfo, collection, shard))
|
||||
totalReplicasOfInterest.incrementAndGet();
|
||||
Integer perShardCount = row.computeCacheIfAbsent(collection, shard, REPLICASCOUNT, cv.clause, o -> {
|
||||
int[] result = new int[1];
|
||||
row.forEachReplica(collection, replicaInfo -> {
|
||||
if (clause.isMatch(replicaInfo, collection, shard))
|
||||
result[0]++;
|
||||
});
|
||||
return result[0];
|
||||
});
|
||||
if (perShardCount != null)
|
||||
totalReplicasOfInterest += perShardCount;
|
||||
}
|
||||
return totalReplicasOfInterest.get();
|
||||
return totalReplicasOfInterest;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,8 +27,10 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.solr.common.MapWriter;
|
||||
|
@ -50,10 +52,13 @@ public class Row implements MapWriter {
|
|||
public final String node;
|
||||
final Cell[] cells;
|
||||
//this holds the details of each replica in the node
|
||||
public Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas;
|
||||
Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas;
|
||||
|
||||
boolean anyValueMissing = false;
|
||||
boolean isLive = true;
|
||||
Policy.Session session;
|
||||
Map globalCache;
|
||||
Map perCollCache;
|
||||
|
||||
public Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session) {
|
||||
this.session = session;
|
||||
|
@ -70,16 +75,84 @@ public class Row implements MapWriter {
|
|||
if (NODE.equals(pair.first())) cells[i].val = node;
|
||||
if (cells[i].val == null) anyValueMissing = true;
|
||||
}
|
||||
this.globalCache = new HashMap();
|
||||
this.perCollCache = new HashMap();
|
||||
isAlreadyCopied = true;
|
||||
}
|
||||
|
||||
|
||||
public static final Map<String, CacheEntry> cacheStats = new HashMap<>();
|
||||
|
||||
static class CacheEntry implements MapWriter {
|
||||
AtomicLong hits = new AtomicLong(), misses = new AtomicLong();
|
||||
|
||||
@Override
|
||||
public void writeMap(EntryWriter ew) throws IOException {
|
||||
ew.put("hits", hits.get());
|
||||
ew.put("misses", misses.get());
|
||||
}
|
||||
|
||||
public static boolean hit(String cacheName) {
|
||||
// getCacheEntry(cacheName).hits.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
private static CacheEntry getCacheEntry(String cacheName) {
|
||||
CacheEntry cacheEntry = cacheStats.get(cacheName);
|
||||
if (cacheEntry == null) {
|
||||
cacheStats.put(cacheName, cacheEntry = new CacheEntry());
|
||||
}
|
||||
return cacheEntry;
|
||||
}
|
||||
|
||||
public static boolean miss(String cacheName) {
|
||||
getCacheEntry(cacheName).misses.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void forEachShard(String collection, BiConsumer<String, List<ReplicaInfo>> consumer) {
|
||||
collectionVsShardVsReplicas
|
||||
.getOrDefault(collection, Collections.emptyMap())
|
||||
.forEach(consumer);
|
||||
}
|
||||
|
||||
|
||||
public <R> R computeCacheIfAbsent(String cacheName, Function<Object, R> supplier) {
|
||||
R result = (R) globalCache.get(cacheName);
|
||||
if (result != null) {
|
||||
assert CacheEntry.hit(cacheName);
|
||||
return result;
|
||||
} else {
|
||||
assert CacheEntry.miss(cacheName);
|
||||
globalCache.put(cacheName, result = supplier.apply(cacheName));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public <R> R computeCacheIfAbsent(String coll, String shard, String cacheName, Object key, Function<Object, R> supplier) {
|
||||
Map collMap = (Map) this.perCollCache.get(coll);
|
||||
if (collMap == null) this.perCollCache.put(coll, collMap = new HashMap());
|
||||
Map shardMap = (Map) collMap.get(shard);
|
||||
if (shardMap == null) collMap.put(shard, shardMap = new HashMap());
|
||||
Map cacheNameMap = (Map) shardMap.get(cacheName);
|
||||
if (cacheNameMap == null) shardMap.put(cacheName, cacheNameMap = new HashMap());
|
||||
R result = (R) cacheNameMap.get(key);
|
||||
if (result == null) {
|
||||
CacheEntry.miss(cacheName);
|
||||
cacheNameMap.put(key, result = supplier.apply(key));
|
||||
return result;
|
||||
} else {
|
||||
CacheEntry.hit(cacheName);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public Row(String node, Cell[] cells, boolean anyValueMissing, Map<String,
|
||||
Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, boolean isLive, Policy.Session session) {
|
||||
Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, boolean isLive, Policy.Session session, Map perRowCache, Map globalCache) {
|
||||
this.session = session;
|
||||
this.node = node;
|
||||
this.isLive = isLive;
|
||||
|
@ -90,6 +163,8 @@ public class Row implements MapWriter {
|
|||
}
|
||||
this.anyValueMissing = anyValueMissing;
|
||||
this.collectionVsShardVsReplicas = collectionVsShardVsReplicas;
|
||||
this.perCollCache = perRowCache;
|
||||
this.globalCache = globalCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -101,7 +176,7 @@ public class Row implements MapWriter {
|
|||
}
|
||||
|
||||
Row copy(Policy.Session session) {
|
||||
return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(collectionVsShardVsReplicas, 3), isLive, session);
|
||||
return new Row(node, cells, anyValueMissing, collectionVsShardVsReplicas, isLive, session, this.globalCache, this.perCollCache);
|
||||
}
|
||||
|
||||
Object getVal(String name) {
|
||||
|
@ -147,11 +222,13 @@ public class Row implements MapWriter {
|
|||
log.error("more than 3 levels of recursion ", new RuntimeException());
|
||||
return this;
|
||||
}
|
||||
lazyCopyReplicas(coll, shard);
|
||||
List<OperationInfo> furtherOps = new LinkedList<>();
|
||||
Consumer<OperationInfo> opCollector = it -> furtherOps.add(it);
|
||||
Row row = null;
|
||||
row = session.copy().getNode(this.node);
|
||||
if (row == null) throw new RuntimeException("couldn't get a row");
|
||||
row.lazyCopyReplicas(coll, shard);
|
||||
Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.computeIfAbsent(coll, k -> new HashMap<>());
|
||||
List<ReplicaInfo> replicas = c.computeIfAbsent(shard, k -> new ArrayList<>());
|
||||
String replicaname = "SYNTHETIC." + new Random().nextInt(1000) + 1000;
|
||||
|
@ -172,6 +249,33 @@ public class Row implements MapWriter {
|
|||
return row;
|
||||
}
|
||||
|
||||
boolean isAlreadyCopied = false;
|
||||
|
||||
private void lazyCopyReplicas(String coll, String shard) {
|
||||
globalCache = new HashMap();
|
||||
Map cacheCopy = new HashMap<>(perCollCache);
|
||||
cacheCopy.remove(coll);//todo optimize at shard level later
|
||||
perCollCache = cacheCopy;
|
||||
if (isAlreadyCopied) return;//caches need to be invalidated but the rest can remain as is
|
||||
|
||||
Map<String, Map<String, List<ReplicaInfo>>> replicasCopy = new HashMap<>(collectionVsShardVsReplicas);
|
||||
Map<String, List<ReplicaInfo>> oneColl = replicasCopy.get(coll);
|
||||
if (oneColl != null) {
|
||||
replicasCopy.put(coll, Utils.getDeepCopy(oneColl, 2));
|
||||
}
|
||||
collectionVsShardVsReplicas = replicasCopy;
|
||||
isAlreadyCopied = true;
|
||||
}
|
||||
|
||||
boolean hasColl(String coll) {
|
||||
return collectionVsShardVsReplicas.containsKey(coll);
|
||||
}
|
||||
|
||||
public void createCollShard(Pair<String, String> collShard) {
|
||||
Map<String, List<ReplicaInfo>> shardInfo = collectionVsShardVsReplicas.computeIfAbsent(collShard.first(), Utils.NEW_HASHMAP_FUN);
|
||||
if (collShard.second() != null) shardInfo.computeIfAbsent(collShard.second(), Utils.NEW_ARRAYLIST_FUN);
|
||||
}
|
||||
|
||||
|
||||
static class OperationInfo {
|
||||
final String coll, shard, node, cellName;
|
||||
|
@ -221,6 +325,7 @@ public class Row implements MapWriter {
|
|||
List<OperationInfo> furtherOps = new LinkedList<>();
|
||||
Consumer<OperationInfo> opCollector = it -> furtherOps.add(it);
|
||||
Row row = session.copy().getNode(this.node);
|
||||
row.lazyCopyReplicas(coll, shard);
|
||||
Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.get(coll);
|
||||
if (c == null) return null;
|
||||
List<ReplicaInfo> r = c.get(shard);
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
@ -207,16 +206,14 @@ public abstract class Suggester implements MapWriter {
|
|||
for (Pair<String, String> shard : collectionShardPairs) {
|
||||
// if this is not a known collection from the existing clusterstate,
|
||||
// then add it
|
||||
if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) {
|
||||
if (session.matrix.stream().noneMatch(row -> row.hasColl(shard.first()))) {
|
||||
session.addClausesForCollection(stateProvider, shard.first());
|
||||
}
|
||||
for (Row row : session.matrix) {
|
||||
Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>());
|
||||
if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>());
|
||||
}
|
||||
for (Row row : session.matrix) row.createCollShard(shard);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Policy.Session getSession() {
|
||||
return session;
|
||||
}
|
||||
|
@ -337,6 +334,7 @@ public abstract class Suggester implements MapWriter {
|
|||
if (!errs.isEmpty() &&
|
||||
(executeInStrictMode || clause.strict)) errors.addAll(errs);
|
||||
}
|
||||
session.violations = errors;
|
||||
if (!errors.isEmpty()) deviations = null;
|
||||
return errors;
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.MO
|
|||
|
||||
public class Suggestion {
|
||||
static class Ctx {
|
||||
long endTime = -1;
|
||||
int max = Integer.MAX_VALUE;
|
||||
public Policy.Session session;
|
||||
public Violation violation;
|
||||
|
@ -60,14 +61,20 @@ public class Suggestion {
|
|||
return suggestions;
|
||||
}
|
||||
|
||||
public boolean hasTimedOut() {
|
||||
return session.cloudManager.getTimeSource().getTimeNs() >= endTime;
|
||||
|
||||
}
|
||||
|
||||
public boolean needMore() {
|
||||
return suggestions.size() < max;
|
||||
return suggestions.size() < max && !hasTimedOut();
|
||||
}
|
||||
}
|
||||
|
||||
static void suggestNegativeViolations(Suggestion.Ctx ctx, Function<Set<String>, List<String>> shardSorter) {
|
||||
if (ctx.violation.coll == null) return;
|
||||
Set<String> shardSet = new HashSet<>();
|
||||
if (!ctx.needMore()) return;
|
||||
for (Row node : ctx.session.matrix)
|
||||
node.forEachShard(ctx.violation.coll, (s, ri) -> {
|
||||
if (Policy.ANY.equals(ctx.violation.shard) || s.equals(ctx.violation.shard)) shardSet.add(s);
|
||||
|
@ -76,6 +83,7 @@ public class Suggestion {
|
|||
List<String> shards = shardSorter.apply(shardSet);
|
||||
outer:
|
||||
for (int i = 0; i < 5; i++) {
|
||||
if (!ctx.needMore()) break;
|
||||
int totalSuggestions = 0;
|
||||
for (String shard : shards) {
|
||||
Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
|
||||
|
@ -102,6 +110,7 @@ public class Suggestion {
|
|||
if (ctx.violation == null) return;
|
||||
Double currentDelta = ctx.violation.replicaCountDelta;
|
||||
for (ReplicaInfoAndErr e : ctx.violation.getViolatingReplicas()) {
|
||||
if (!ctx.needMore()) break;
|
||||
if (currentDelta <= 0) break;
|
||||
Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
|
||||
.forceOperation(true)
|
||||
|
|
|
@ -147,7 +147,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
|
|||
|
||||
@Override
|
||||
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
||||
Map<String, Map<String, List<ReplicaInfo>>> result = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(node, s -> emptyMap());
|
||||
Map<String, Map<String, List<ReplicaInfo>>> result = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(node, Utils.NEW_HASHMAP_FUN);
|
||||
if (!keys.isEmpty()) {
|
||||
Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica = new HashMap<>();
|
||||
Row.forEachReplica(result, r -> {
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
@ -44,6 +45,7 @@ import java.util.TreeMap;
|
|||
import java.util.TreeSet;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.regex.Matcher;
|
||||
|
@ -81,6 +83,10 @@ import static java.util.Collections.unmodifiableSet;
|
|||
import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||
|
||||
public class Utils {
|
||||
public static final Function NEW_HASHMAP_FUN = o -> new HashMap<>();
|
||||
public static final Function NEW_ATOMICLONG_FUN = o -> new AtomicLong();
|
||||
public static final Function NEW_ARRAYLIST_FUN = o -> new ArrayList<>();
|
||||
public static final Function NEW_HASHSET_FUN = o -> new HashSet<>();
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
public static Map getDeepCopy(Map map, int maxDepth) {
|
||||
|
@ -98,7 +104,7 @@ public class Utils {
|
|||
if (sorted) {
|
||||
copy = new TreeMap();
|
||||
} else {
|
||||
copy = new LinkedHashMap();
|
||||
copy = map instanceof LinkedHashMap? new LinkedHashMap(map.size()): new HashMap(map.size());
|
||||
}
|
||||
for (Object o : map.entrySet()) {
|
||||
Map.Entry e = (Map.Entry) o;
|
||||
|
@ -293,6 +299,19 @@ public class Utils {
|
|||
}
|
||||
};
|
||||
|
||||
public static final Function<JSONParser, ObjectBuilder> MAPOBJBUILDER = jsonParser -> {
|
||||
try {
|
||||
return new ObjectBuilder(jsonParser){
|
||||
@Override
|
||||
public Object newObject() {
|
||||
return new HashMap();
|
||||
}
|
||||
};
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
|
||||
public static Object fromJSON(InputStream is, Function<JSONParser, ObjectBuilder> objBuilderProvider) {
|
||||
try {
|
||||
return objBuilderProvider.apply(getJSONParser((new InputStreamReader(is, StandardCharsets.UTF_8)))).getVal();
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1140,13 +1140,13 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
@Override
|
||||
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
|
||||
Map<String, Object> result = (Map<String, Object>) Utils.getObjectByPath(m, false, Arrays.asList("nodeValues", node));
|
||||
return result == null ? Collections.emptyMap() : result;
|
||||
return result == null ? new HashMap<>() : result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
||||
Map<String, Map<String, List<ReplicaInfo>>> result = (Map<String, Map<String, List<ReplicaInfo>>>) Utils.getObjectByPath(m, false, Arrays.asList("replicaInfo", node));
|
||||
return result == null ? Collections.emptyMap() : result;
|
||||
return result == null ? new HashMap<>() : result;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -2336,7 +2336,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
Utils.makeMap(FREEDISK.perReplicaValue, 200)))));
|
||||
return m;
|
||||
}
|
||||
return Collections.emptyMap();
|
||||
return new HashMap<>();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -2476,7 +2476,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
}
|
||||
rows.add(new Row((String) m.get("node"), c, false,
|
||||
new HashMap<>(),
|
||||
(Boolean) m.get("isLive"), null));
|
||||
(Boolean) m.get("isLive"), null, new HashMap(), new HashMap()));
|
||||
}
|
||||
int deadNodes = 0;
|
||||
for (Row row : rows) {
|
||||
|
@ -2618,13 +2618,13 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
@Override
|
||||
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
|
||||
Map<String, Object> result = (Map<String, Object>) Utils.getObjectByPath(m, false, Arrays.asList("nodeValues", node));
|
||||
return result == null ? Collections.emptyMap() : result;
|
||||
return result == null ? new HashMap<>() : result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
||||
Map<String, Map<String, List<ReplicaInfo>>> result = (Map<String, Map<String, List<ReplicaInfo>>>) Utils.getObjectByPath(m, false, Arrays.asList("replicaInfo", node));
|
||||
return result == null ? Collections.emptyMap() : result;
|
||||
return result == null ? new HashMap<>() : result;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -43,12 +43,13 @@ import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
|
|||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ReplicaPosition;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.junit.Ignore;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static java.util.Collections.EMPTY_MAP;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.CORES;
|
||||
import static org.apache.solr.common.util.Utils.MAPOBJBUILDER;
|
||||
import static org.apache.solr.common.util.Utils.getObjectByPath;
|
||||
|
||||
public class TestPolicy2 extends SolrTestCaseJ4 {
|
||||
|
@ -208,7 +209,7 @@ public class TestPolicy2 extends SolrTestCaseJ4 {
|
|||
|
||||
@Override
|
||||
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
||||
Map<String, Map<String, List<ReplicaInfo>>> result = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(node, s -> emptyMap());
|
||||
Map<String, Map<String, List<ReplicaInfo>>> result = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(node, Utils.NEW_HASHMAP_FUN);
|
||||
if (!keys.isEmpty()) {
|
||||
Row.forEachReplica(result, replicaInfo -> {
|
||||
for (String key : keys) {
|
||||
|
@ -424,17 +425,28 @@ public class TestPolicy2 extends SolrTestCaseJ4 {
|
|||
SolrCloudManager cloudManagerFromDiagnostics = createCloudManagerFromDiagnostics(m);
|
||||
List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(new AutoScalingConfig((Map<String, Object>) getObjectByPath(m, false, "diagnostics/config"))
|
||||
, cloudManagerFromDiagnostics);
|
||||
// System.out.println(Utils.writeJson(suggestions, new StringWriter(), true).toString());
|
||||
for (Suggester.SuggestionInfo suggestion : suggestions) {
|
||||
assertEquals("unresolved-violation", suggestion._get("type", null));
|
||||
assertEquals("1.0", suggestion._getStr("violation/violation/delta", null));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Ignore
|
||||
public void testInfiniteLoop() {
|
||||
Row.cacheStats.clear();
|
||||
Map<String, Object> m = (Map<String, Object>) loadFromResource("testInfiniteLoop.json");
|
||||
SolrCloudManager cloudManagerFromDiagnostics = createCloudManagerFromDiagnostics(m);
|
||||
List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(
|
||||
new AutoScalingConfig((Map<String, Object>) getObjectByPath(m, false, "diagnostics/config"))
|
||||
, cloudManagerFromDiagnostics, 200, 1200);
|
||||
|
||||
System.out.println(suggestions);
|
||||
}
|
||||
|
||||
public static Object loadFromResource(String file) {
|
||||
try (InputStream is = TestPolicy2.class.getResourceAsStream("/solrj/solr/autoscaling/" + file)) {
|
||||
return Utils.fromJSON(is);
|
||||
return Utils.fromJSON(is, MAPOBJBUILDER);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue