SOLR-11359: Refactored

This commit is contained in:
Noble Paul 2017-10-16 23:57:29 +10:30
parent e5c5acca94
commit 141b08a40f
14 changed files with 321 additions and 282 deletions

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@ -32,7 +33,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
public class AutoAddReplicasPlanAction extends ComputePlanAction {
@Override
protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
// for backward compatibility
ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider();
String autoAddReplicas = stateProvider.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
@ -40,7 +41,7 @@ public class AutoAddReplicasPlanAction extends ComputePlanAction {
return new NoneSuggester();
}
Policy.Suggester suggester = super.getSuggester(session, event, cloudManager);
Suggester suggester = super.getSuggester(session, event, cloudManager);
ClusterState clusterState;
try {
clusterState = stateProvider.getClusterState();
@ -52,7 +53,7 @@ public class AutoAddReplicasPlanAction extends ComputePlanAction {
for (DocCollection collection: clusterState.getCollectionsMap().values()) {
if (collection.getAutoAddReplicas()) {
anyCollections = true;
suggester.hint(Policy.Suggester.Hint.COLL, collection.getName());
suggester.hint(Suggester.Hint.COLL, collection.getName());
}
}

View File

@ -38,11 +38,10 @@ import org.apache.solr.api.Api;
import org.apache.solr.api.ApiBag;
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Cell;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.Preference;
import org.apache.solr.client.solrj.cloud.autoscaling.Row;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@ -217,32 +216,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.withZkHost(container.getZkController().getZkServerAddress()).build()) {
DistributedQueueFactory queueFactory = new ZkDistributedQueueFactory(container.getZkController().getZkClient());
Policy.Session session = policy.createSession(new SolrClientCloudManager(queueFactory, build));
List<Row> sorted = session.getSorted();
List<Clause.Violation> violations = session.getViolations();
List<Preference> clusterPreferences = policy.getClusterPreferences();
List<Map<String, Object>> sortedNodes = new ArrayList<>(sorted.size());
for (Row row : sorted) {
Map<String, Object> map = Utils.makeMap("node", row.node);
for (Cell cell : row.getCells()) {
for (Preference clusterPreference : clusterPreferences) {
Policy.SortParam name = clusterPreference.getName();
if (cell.getName().equalsIgnoreCase(name.name())) {
map.put(name.name(), cell.getValue());
break;
}
}
}
sortedNodes.add(map);
}
Map<String, Object> map = new HashMap<>(2);
map.put("sortedNodes", sortedNodes);
map.put("violations", violations);
rsp.getValues().add("diagnostics", map);
rsp.getValues().add("diagnostics", PolicyHelper.getDiagnostics(policy, new SolrClientCloudManager(queueFactory, build)));
}
}

View File

@ -27,6 +27,7 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.common.params.CollectionParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,7 +54,7 @@ public class ComputePlanAction extends TriggerActionBase {
}
Policy policy = autoScalingConf.getPolicy();
Policy.Session session = policy.createSession(cloudManager);
Policy.Suggester suggester = getSuggester(session, event, cloudManager);
Suggester suggester = getSuggester(session, event, cloudManager);
while (true) {
SolrRequest operation = suggester.getOperation();
if (operation == null) break;
@ -75,17 +76,17 @@ public class ComputePlanAction extends TriggerActionBase {
}
}
protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
Policy.Suggester suggester;
protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
Suggester suggester;
switch (event.getEventType()) {
case NODEADDED:
suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
.hint(Policy.Suggester.Hint.TARGET_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
.hint(Suggester.Hint.TARGET_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
log.debug("NODEADDED Created suggester with targetNode: {}", event.getProperty(TriggerEvent.NODE_NAMES));
break;
case NODELOST:
suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
.hint(Policy.Suggester.Hint.SRC_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
.hint(Suggester.Hint.SRC_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
log.debug("NODELOST Created suggester with srcNode: {}", event.getProperty(TriggerEvent.NODE_NAMES));
break;
default:

View File

@ -22,7 +22,6 @@ import java.util.List;
import java.util.Set;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.util.Pair;

View File

@ -234,15 +234,17 @@ public class Clause implements MapWriter, Comparable<Clause> {
}
}
public class Violation implements MapWriter {
public static class Violation implements MapWriter {
final String shard, coll, node;
final Object actualVal;
final Long delta;//how far is the actual value from the expected value
final Object tagKey;
private final int hash;
private final Clause clause;
private Violation(String coll, String shard, String node, Object actualVal, Long delta, Object tagKey) {
private Violation(Clause clause, String coll, String shard, String node, Object actualVal, Long delta, Object tagKey) {
this.clause = clause;
this.shard = shard;
this.coll = coll;
this.node = node;
@ -253,7 +255,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
}
public Clause getClause() {
return Clause.this;
return clause;
}
@Override
@ -292,7 +294,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
ew.putIfNotNull("tagKey", String.valueOf(tagKey));
ew.putIfNotNull("violation", (MapWriter) ew1 -> {
if (getClause().isPerCollectiontag()) ew1.put("replica", actualVal);
else ew1.put(tag.name, String.valueOf(actualVal));
else ew1.put(clause.tag.name, String.valueOf(actualVal));
ew1.putIfNotNull("delta", delta);
});
ew.put("clause", getClause());
@ -310,7 +312,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
if (!shard.isPass(shardVsCount.getKey())) continue;
for (Map.Entry<String, ReplicaCount> counts : shardVsCount.getValue().entrySet()) {
if (!replica.isPass(counts.getValue())) {
violations.add(new Violation(
violations.add(new Violation(this,
e.getKey(),
shardVsCount.getKey(),
tag.name.equals("node") ? counts.getKey() : null,
@ -325,7 +327,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
} else {
for (Row r : allRows) {
if (!tag.isPass(r)) {
violations.add(new Violation(null, null, r.node, r.getVal(tag.name), tag.delta(r.getVal(tag.name)), null));
violations.add(new Violation(this, null, null, r.node, r.getVal(tag.name), tag.delta(r.getVal(tag.name)), null));
}
}
}

View File

@ -21,7 +21,6 @@ import java.util.List;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause.Violation;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.util.Pair;

View File

@ -19,7 +19,7 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
import org.apache.solr.client.solrj.SolrRequest;
public class NoneSuggester extends Policy.Suggester{
public class NoneSuggester extends Suggester {
@Override
SolrRequest init() {
return null;

View File

@ -22,28 +22,22 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause.Violation;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
@ -241,7 +235,7 @@ public class Policy implements MapWriter {
applyRules();
}
private void addClausesForCollection(ClusterStateProvider stateProvider, String c) {
void addClausesForCollection(ClusterStateProvider stateProvider, String c) {
String p = stateProvider.getPolicyNameByCollection(c);
if (p != null) {
List<Clause> perCollPolicy = policies.get(p);
@ -269,7 +263,7 @@ public class Policy implements MapWriter {
/**
* Apply the preferences and conditions
*/
private void applyRules() {
void applyRules() {
setApproxValuesAndSortNodes(clusterPreferences, matrix);
for (Clause clause : expandedClauses) {
@ -365,211 +359,6 @@ public class Policy implements MapWriter {
}
/* A suggester is capable of suggesting a collection operation
* given a particular session. Before it suggests a new operation,
* it ensures that ,
* a) load is reduced on the most loaded node
* b) it causes no new violations
*
*/
public static abstract class Suggester {
protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class);
Policy.Session session;
SolrRequest operation;
protected List<Violation> originalViolations = new ArrayList<>();
private boolean isInitialized = false;
private void _init(Session session) {
this.session = session.copy();
}
public Suggester hint(Hint hint, Object value) {
hint.validator.accept(value);
if (hint.multiValued) {
Collection<?> values = value instanceof Collection ? (Collection)value : Collections.singletonList(value);
((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).addAll(values);
} else {
hints.put(hint, value == null ? null : String.valueOf(value));
}
return this;
}
abstract SolrRequest init();
public SolrRequest getOperation() {
if (!isInitialized) {
Set<String> collections = (Set<String>) hints.getOrDefault(Hint.COLL, Collections.emptySet());
Set<Pair<String, String>> s = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
if (!collections.isEmpty() || !s.isEmpty()) {
HashSet<Pair<String, String>> shards = new HashSet<>(s);
collections.stream().forEach(c -> shards.add(new Pair<>(c, null)));
ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider();
for (Pair<String, String> shard : shards) {
// if this is not a known collection from the existing clusterstate,
// then add it
if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) {
session.addClausesForCollection(stateProvider, shard.first());
}
for (Row row : session.matrix) {
Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>());
if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>());
}
}
Collections.sort(session.expandedClauses);
}
Set<String> srcNodes = (Set<String>) hints.get(Hint.SRC_NODE);
if (srcNodes != null && !srcNodes.isEmpty()) {
// the source node is dead so live nodes may not have it
for (String srcNode : srcNodes) {
if(session.matrix.stream().noneMatch(row -> row.node.equals(srcNode)))
session.matrix.add(new Row(srcNode, session.getPolicy().params, session.cloudManager));
}
}
session.applyRules();
originalViolations.addAll(session.getViolations());
this.operation = init();
isInitialized = true;
}
return operation;
}
public Session getSession() {
return session;
}
List<Row> getMatrix() {
return session.matrix;
}
//check if the fresh set of violations is less serious than the last set of violations
boolean isLessSerious(List<Violation> fresh, List<Violation> old) {
if (old == null || fresh.size() < old.size()) return true;
if (fresh.size() == old.size()) {
for (int i = 0; i < fresh.size(); i++) {
Violation freshViolation = fresh.get(i);
Violation oldViolation = null;
for (Violation v : old) {
if (v.equals(freshViolation)) oldViolation = v;
}
if (oldViolation != null && freshViolation.isLessSerious(oldViolation)) return true;
}
}
return false;
}
boolean containsNewErrors(List<Violation> violations) {
for (Violation v : violations) {
int idx = originalViolations.indexOf(v);
if (idx < 0 || originalViolations.get(idx).isLessSerious(v)) return true;
}
return false;
}
List<Pair<ReplicaInfo, Row>> getValidReplicas(boolean sortDesc, boolean isSource, int until) {
List<Pair<ReplicaInfo, Row>> allPossibleReplicas = new ArrayList<>();
if (sortDesc) {
if (until == -1) until = getMatrix().size();
for (int i = 0; i < until; i++) addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
} else {
if (until == -1) until = 0;
for (int i = getMatrix().size() - 1; i >= until; i--)
addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
}
return allPossibleReplicas;
}
void addReplicaToList(Row r, boolean isSource, List<Pair<ReplicaInfo, Row>> replicaList) {
if (!isAllowed(r.node, isSource ? Hint.SRC_NODE : Hint.TARGET_NODE)) return;
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> e : r.collectionVsShardVsReplicas.entrySet()) {
if (!isAllowed(e.getKey(), Hint.COLL)) continue;
for (Map.Entry<String, List<ReplicaInfo>> shard : e.getValue().entrySet()) {
if (!isAllowed(new Pair<>(e.getKey(), shard.getKey()), Hint.COLL_SHARD)) continue;//todo fix
if(shard.getValue() == null || shard.getValue().isEmpty()) continue;
replicaList.add(new Pair<>(shard.getValue().get(0), r));
}
}
}
List<Violation> testChangedMatrix(boolean strict, List<Row> rows) {
setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences,rows);
List<Violation> errors = new ArrayList<>();
for (Clause clause : session.expandedClauses) {
if (strict || clause.strict) {
List<Violation> errs = clause.test(rows);
if (!errs.isEmpty()) {
errors.addAll(errs);
}
}
}
return errors;
}
ArrayList<Row> getModifiedMatrix(List<Row> matrix, Row tmpRow, int i) {
ArrayList<Row> copy = new ArrayList<>(matrix);
copy.set(i, tmpRow);
return copy;
}
protected boolean isAllowed(Object v, Hint hint) {
Object hintVal = hints.get(hint);
if (hint.multiValued) {
Set set = (Set) hintVal;
return set == null || set.contains(v);
} else {
return hintVal == null || Objects.equals(v, hintVal);
}
}
public enum Hint {
COLL(true),
// collection shard pair
// this should be a Pair<String, String> , (collection,shard)
COLL_SHARD(true, v -> {
Collection c = v instanceof Collection ? (Collection) v : Collections.singleton(v);
for (Object o : c) {
if (!(o instanceof Pair)) {
throw new RuntimeException("SHARD hint must use a Pair");
}
Pair p = (Pair) o;
if (p.first() == null || p.second() == null) {
throw new RuntimeException("Both collection and shard must not be null");
}
}
}),
SRC_NODE(true),
TARGET_NODE(true),
REPLICATYPE(false, o -> {
if (!(o instanceof Replica.Type)) {
throw new RuntimeException("REPLICATYPE hint must use a ReplicaType");
}
});
public final boolean multiValued;
public final Consumer<Object> validator;
Hint(boolean multiValued) {
this(multiValued, v -> {
Collection c = v instanceof Collection ? (Collection) v : Collections.singleton(v);
for (Object o : c) {
if (!(o instanceof String)) throw new RuntimeException("hint must be of type String");
}
});
}
Hint(boolean multiValued, Consumer<Object> c) {
this.multiValued = multiValued;
this.validator = c;
}
}
}
public static Map<String, List<Clause>> policiesFromMap(Map<String, List<Map<String, Object>>> map, List<String> newParams) {
Map<String, List<Clause>> newPolicies = new HashMap<>();
map.forEach((s, l1) ->

View File

@ -27,8 +27,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester.Hint;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
@ -79,7 +81,7 @@ public class PolicyHelper {
int idx = 0;
for (Map.Entry<Replica.Type, Integer> e : typeVsCount.entrySet()) {
for (int i = 0; i < e.getValue(); i++) {
Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
Suggester suggester = session.getSuggester(ADDREPLICA)
.hint(Hint.REPLICATYPE, e.getKey())
.hint(Hint.COLL_SHARD, new Pair<>(collName, shardName));
if (nodesList != null) {
@ -108,6 +110,36 @@ public class PolicyHelper {
public static final int SESSION_EXPIRY = 180;//3 seconds
public static ThreadLocal<Long> REF_VERSION = new ThreadLocal<>();
public static MapWriter getDiagnostics(Policy policy, SolrClientCloudManager cloudManager) {
Policy.Session session = policy.createSession(cloudManager);
List<Row> sorted = session.getSorted();
List<Clause.Violation> violations = session.getViolations();
List<Preference> clusterPreferences = policy.getClusterPreferences();
List<Map<String, Object>> sortedNodes = new ArrayList<>(sorted.size());
for (Row row : sorted) {
Map<String, Object> map = Utils.makeMap("node", row.node);
for (Cell cell : row.getCells()) {
for (Preference clusterPreference : clusterPreferences) {
Policy.SortParam name = clusterPreference.getName();
if (cell.getName().equalsIgnoreCase(name.name())) {
map.put(name.name(), cell.getValue());
break;
}
}
}
sortedNodes.add(map);
}
return ew -> {
ew.put("sortedNodes", sortedNodes);
ew.put("violations", violations);
};
}
public static class SessionRef {
private final AtomicLong myVersion = new AtomicLong(0);
AtomicInteger refCount = new AtomicInteger();

View File

@ -32,14 +32,14 @@ public class ReplicaInfo implements MapWriter {
final String core, collection, shard;
final Map<String, Object> variables = new HashMap<>();
public ReplicaInfo(String name, String coll, String shard, Map<String, Object> vals) {
public ReplicaInfo(String name, String coreName, String coll, String shard, Map<String, Object> vals) {
this.name = name;
this.core = coreName == null ? (String) vals.get("core") : coreName;
if (vals != null) {
this.variables.putAll(vals);
}
this.collection = coll;
this.shard = shard;
this.core = (String)vals.get("core");
}
@Override

View File

@ -98,7 +98,8 @@ public class Row implements MapWriter {
Row row = copy();
Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.computeIfAbsent(coll, k -> new HashMap<>());
List<ReplicaInfo> replicas = c.computeIfAbsent(shard, k -> new ArrayList<>());
replicas.add(new ReplicaInfo("" + new Random().nextInt(1000) + 1000, coll, shard,
String replicaname = "" + new Random().nextInt(1000) + 1000;
replicas.add(new ReplicaInfo(replicaname,replicaname, coll, shard,
Collections.singletonMap(ZkStateReader.REPLICA_TYPE, type != null ? type.toString() : Replica.Type.NRT.toString())));
for (Cell cell : row.cells) {
if (cell.name.equals("cores")) {

View File

@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.util.Pair;
/* A suggester is capable of suggesting a collection operation
* given a particular session. Before it suggests a new operation,
* it ensures that ,
* a) load is reduced on the most loaded node
* b) it causes no new violations
*
*/
public abstract class Suggester {
protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class);
Policy.Session session;
SolrRequest operation;
protected List<Clause.Violation> originalViolations = new ArrayList<>();
private boolean isInitialized = false;
void _init(Policy.Session session) {
this.session = session.copy();
}
public Suggester hint(Hint hint, Object value) {
hint.validator.accept(value);
if (hint.multiValued) {
Collection<?> values = value instanceof Collection ? (Collection)value : Collections.singletonList(value);
((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).addAll(values);
} else {
hints.put(hint, value == null ? null : String.valueOf(value));
}
return this;
}
abstract SolrRequest init();
public SolrRequest getOperation() {
if (!isInitialized) {
Set<String> collections = (Set<String>) hints.getOrDefault(Hint.COLL, Collections.emptySet());
Set<Pair<String, String>> s = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
if (!collections.isEmpty() || !s.isEmpty()) {
HashSet<Pair<String, String>> shards = new HashSet<>(s);
collections.stream().forEach(c -> shards.add(new Pair<>(c, null)));
ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider();
for (Pair<String, String> shard : shards) {
// if this is not a known collection from the existing clusterstate,
// then add it
if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) {
session.addClausesForCollection(stateProvider, shard.first());
}
for (Row row : session.matrix) {
Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>());
if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>());
}
}
Collections.sort(session.expandedClauses);
}
Set<String> srcNodes = (Set<String>) hints.get(Hint.SRC_NODE);
if (srcNodes != null && !srcNodes.isEmpty()) {
// the source node is dead so live nodes may not have it
for (String srcNode : srcNodes) {
if(session.matrix.stream().noneMatch(row -> row.node.equals(srcNode)))
session.matrix.add(new Row(srcNode, session.getPolicy().params, session.cloudManager));
}
}
session.applyRules();
originalViolations.addAll(session.getViolations());
this.operation = init();
isInitialized = true;
}
return operation;
}
public Policy.Session getSession() {
return session;
}
List<Row> getMatrix() {
return session.matrix;
}
//check if the fresh set of violations is less serious than the last set of violations
boolean isLessSerious(List<Clause.Violation> fresh, List<Clause.Violation> old) {
if (old == null || fresh.size() < old.size()) return true;
if (fresh.size() == old.size()) {
for (int i = 0; i < fresh.size(); i++) {
Clause.Violation freshViolation = fresh.get(i);
Clause.Violation oldViolation = null;
for (Clause.Violation v : old) {
if (v.equals(freshViolation)) oldViolation = v;
}
if (oldViolation != null && freshViolation.isLessSerious(oldViolation)) return true;
}
}
return false;
}
boolean containsNewErrors(List<Clause.Violation> violations) {
for (Clause.Violation v : violations) {
int idx = originalViolations.indexOf(v);
if (idx < 0 || originalViolations.get(idx).isLessSerious(v)) return true;
}
return false;
}
List<Pair<ReplicaInfo, Row>> getValidReplicas(boolean sortDesc, boolean isSource, int until) {
List<Pair<ReplicaInfo, Row>> allPossibleReplicas = new ArrayList<>();
if (sortDesc) {
if (until == -1) until = getMatrix().size();
for (int i = 0; i < until; i++) addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
} else {
if (until == -1) until = 0;
for (int i = getMatrix().size() - 1; i >= until; i--)
addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
}
return allPossibleReplicas;
}
void addReplicaToList(Row r, boolean isSource, List<Pair<ReplicaInfo, Row>> replicaList) {
if (!isAllowed(r.node, isSource ? Hint.SRC_NODE : Hint.TARGET_NODE)) return;
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> e : r.collectionVsShardVsReplicas.entrySet()) {
if (!isAllowed(e.getKey(), Hint.COLL)) continue;
for (Map.Entry<String, List<ReplicaInfo>> shard : e.getValue().entrySet()) {
if (!isAllowed(new Pair<>(e.getKey(), shard.getKey()), Hint.COLL_SHARD)) continue;//todo fix
if(shard.getValue() == null || shard.getValue().isEmpty()) continue;
replicaList.add(new Pair<>(shard.getValue().get(0), r));
}
}
}
List<Clause.Violation> testChangedMatrix(boolean strict, List<Row> rows) {
Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences,rows);
List<Clause.Violation> errors = new ArrayList<>();
for (Clause clause : session.expandedClauses) {
if (strict || clause.strict) {
List<Clause.Violation> errs = clause.test(rows);
if (!errs.isEmpty()) {
errors.addAll(errs);
}
}
}
return errors;
}
ArrayList<Row> getModifiedMatrix(List<Row> matrix, Row tmpRow, int i) {
ArrayList<Row> copy = new ArrayList<>(matrix);
copy.set(i, tmpRow);
return copy;
}
protected boolean isAllowed(Object v, Hint hint) {
Object hintVal = hints.get(hint);
if (hint.multiValued) {
Set set = (Set) hintVal;
return set == null || set.contains(v);
} else {
return hintVal == null || Objects.equals(v, hintVal);
}
}
public enum Hint {
COLL(true),
// collection shard pair
// this should be a Pair<String, String> , (collection,shard)
COLL_SHARD(true, v -> {
Collection c = v instanceof Collection ? (Collection) v : Collections.singleton(v);
for (Object o : c) {
if (!(o instanceof Pair)) {
throw new RuntimeException("SHARD hint must use a Pair");
}
Pair p = (Pair) o;
if (p.first() == null || p.second() == null) {
throw new RuntimeException("Both collection and shard must not be null");
}
}
}),
SRC_NODE(true),
TARGET_NODE(true),
REPLICATYPE(false, o -> {
if (!(o instanceof Replica.Type)) {
throw new RuntimeException("REPLICATYPE hint must use a ReplicaType");
}
});
public final boolean multiValued;
public final Consumer<Object> validator;
Hint(boolean multiValued) {
this(multiValued, v -> {
Collection c = v instanceof Collection ? (Collection) v : Collections.singleton(v);
for (Object o : c) {
if (!(o instanceof String)) throw new RuntimeException("hint must be of type String");
}
});
}
Hint(boolean multiValued, Consumer<Object> c) {
this.multiValued = multiValued;
this.validator = c;
}
}
}

View File

@ -84,7 +84,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider {
Map<String, Map<String, List<ReplicaInfo>>> nodeData = data.computeIfAbsent(replica.getNodeName(), k -> new HashMap<>());
Map<String, List<ReplicaInfo>> collData = nodeData.computeIfAbsent(collName, k -> new HashMap<>());
List<ReplicaInfo> replicas = collData.computeIfAbsent(shard, k -> new ArrayList<>());
replicas.add(new ReplicaInfo(replica.getName(), collName, shard, replica.getProperties()));
replicas.add(new ReplicaInfo(replica.getName(), replica.getCoreName(), collName, shard, replica.getProperties()));
});
});
if(log.isDebugEnabled()) INST = this;

View File

@ -37,7 +37,7 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause.Violation;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester.Hint;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.Replica;
@ -115,7 +115,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
if (!node_name.equals(node)) return;
Map<String, List<ReplicaInfo>> shardVsReplicaStats = result.computeIfAbsent(collName, k -> new HashMap<>());
List<ReplicaInfo> replicaInfos = shardVsReplicaStats.computeIfAbsent(shard, k -> new ArrayList<>());
replicaInfos.add(new ReplicaInfo(replicaName, collName, shard, r));
replicaInfos.add(new ReplicaInfo(replicaName,replicaName, collName, shard, r));
});
});
});
@ -309,7 +309,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
AutoScalingConfig config = new AutoScalingConfig(policies);
Policy policy = config.getPolicy();
Policy.Session session = policy.createSession(provider);
Policy.Suggester suggester = session.getSuggester(MOVEREPLICA)
Suggester suggester = session.getSuggester(MOVEREPLICA)
.hint(Hint.SRC_NODE, "node1");
SolrRequest operation = suggester.getOperation();
@ -424,7 +424,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
for (int i = 0; i < l3.size(); i++) {
Object o = l3.get(i);
Map m3 = (Map) o;
l3.set(i, new ReplicaInfo(m3.keySet().iterator().next().toString()
String name = m3.keySet().iterator().next().toString();
l3.set(i, new ReplicaInfo(name,name
,coll.toString(), shard.toString(), m3));
}
});
@ -480,7 +481,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
"node4:{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer, rack: rack1}" +
"}");
Policy policy = new Policy(policies);
Policy.Suggester suggester = policy.createSession(getSolrCloudManager(nodeValues, clusterState))
Suggester suggester = policy.createSession(getSolrCloudManager(nodeValues, clusterState))
.getSuggester(ADDREPLICA)
.hint(Hint.COLL_SHARD, new Pair("newColl", "shard1"))
.hint(Hint.REPLICATYPE, Replica.Type.PULL);
@ -620,11 +621,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
" 'tlogReplicas':'0'}\n" +
"}";
Policy policy = new Policy(new HashMap<>());
Policy.Suggester suggester = policy.createSession(getSolrCloudManager(nodeValues, clusterState))
Suggester suggester = policy.createSession(getSolrCloudManager(nodeValues, clusterState))
.getSuggester(MOVEREPLICA)
.hint(Hint.COLL, "collection1")
.hint(Hint.COLL, "collection2")
.hint(Policy.Suggester.Hint.SRC_NODE, "node2");
.hint(Suggester.Hint.SRC_NODE, "node2");
SolrRequest op = suggester.getOperation();
assertNotNull(op);
assertEquals("collection2", op.getParams().get("collection"));
@ -636,7 +637,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
.getSuggester(MOVEREPLICA)
.hint(Hint.COLL, "collection1")
.hint(Hint.COLL, "collection2")
.hint(Policy.Suggester.Hint.SRC_NODE, "node2");
.hint(Suggester.Hint.SRC_NODE, "node2");
op = suggester.getOperation();
assertNotNull(op);
assertEquals("collection2", op.getParams().get("collection"));
@ -648,7 +649,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
.getSuggester(MOVEREPLICA)
.hint(Hint.COLL, "collection1")
.hint(Hint.COLL, "collection2")
.hint(Policy.Suggester.Hint.SRC_NODE, "node2");
.hint(Suggester.Hint.SRC_NODE, "node2");
op = suggester.getOperation();
assertNull(op);
}
@ -676,7 +677,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
"node4:{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer, rack: rack1}" +
"}");
Policy policy = new Policy(policies);
Policy.Suggester suggester = policy.createSession(getSolrCloudManager(nodeValues, clusterState))
Suggester suggester = policy.createSession(getSolrCloudManager(nodeValues, clusterState))
.getSuggester(ADDREPLICA)
.hint(Hint.REPLICATYPE, Replica.Type.PULL)
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
@ -819,7 +820,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertTrue(violations.stream().anyMatch(violation -> "nodeRole".equals(violation.getClause().tag.getName())));
assertTrue(violations.stream().anyMatch(violation -> (violation.getClause().replica.getOperand() == Operand.LESS_THAN && "node".equals(violation.getClause().tag.getName()))));
Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
Suggester suggester = session.getSuggester(ADDREPLICA)
.hint(Hint.COLL_SHARD, new Pair<>("gettingstarted","r1"));
SolrParams operation = suggester.getOperation().getParams();
assertEquals("node2", operation.get("node"));
@ -935,7 +936,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
SolrCloudManager cloudManager = getSolrCloudManager(nodeValues, clusterState);
Policy.Session session = policy.createSession(cloudManager);
for (int i = 0; i < 3; i++) {
Policy.Suggester suggester = session.getSuggester(ADDREPLICA);
Suggester suggester = session.getSuggester(ADDREPLICA);
SolrRequest op = suggester
.hint(Hint.COLL_SHARD, new Pair<>("newColl","shard1"))
.getOperation();
@ -966,7 +967,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoscaleJson));
SolrCloudManager cloudManager = getSolrCloudManager(nodeValues, clusterState);
Policy.Session session = policy.createSession(cloudManager);
Policy.Suggester suggester = session.getSuggester(ADDREPLICA);
Suggester suggester = session.getSuggester(ADDREPLICA);
SolrRequest op = suggester
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
.getOperation();
@ -1006,8 +1007,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
" {'core_node2':{}}]}}}");
Map m = (Map) Utils.getObjectByPath(replicaInfoMap, false, "127.0.0.1:60089_solr/compute_plan_action_test");
m.put("shard1", Arrays.asList(
new ReplicaInfo("core_node1", "compute_plan_action_test", "shard1", Collections.singletonMap(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT)),
new ReplicaInfo("core_node2", "compute_plan_action_test", "shard1", Collections.singletonMap(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT))
new ReplicaInfo("core_node1", "core_node1", "compute_plan_action_test", "shard1", Collections.singletonMap(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT)),
new ReplicaInfo("core_node2", "core_node2", "compute_plan_action_test", "shard1", Collections.singletonMap(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT))
));
Map<String, Map<String, Object>> tagsMap = (Map) Utils.fromJSONString("{" +
@ -1046,7 +1047,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
};
}
});
Policy.Suggester suggester = session.getSuggester(MOVEREPLICA)
Suggester suggester = session.getSuggester(MOVEREPLICA)
.hint(Hint.TARGET_NODE, "127.0.0.1:60099_solr");
SolrRequest op = suggester.getOperation();
assertNotNull(op);
@ -1308,7 +1309,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
" 'cluster-preferences':[{'minimize':'cores'}]}";
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
Policy.Session session = policy.createSession(cloudManagerWithData(dataproviderdata));
Policy.Suggester suggester = session.getSuggester(MOVEREPLICA).hint(Hint.TARGET_NODE, "10.0.0.6:7574_solr");
Suggester suggester = session.getSuggester(MOVEREPLICA).hint(Hint.TARGET_NODE, "10.0.0.6:7574_solr");
SolrRequest op = suggester.getOperation();
assertNotNull(op);
suggester = suggester.getSession().getSuggester(MOVEREPLICA).hint(Hint.TARGET_NODE, "10.0.0.6:7574_solr");
@ -1345,7 +1346,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
" { nodeRole:overseer,replica:0}]}";
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
Policy.Session session = policy.createSession(cloudManagerWithData(dataproviderdata));
Policy.Suggester suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
Suggester suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
.hint(Hint.TARGET_NODE, "127.0.0.1:51147_solr");
SolrRequest op = suggester.getOperation();
log.info("" + op);