mirror of https://github.com/apache/lucene.git
SOLR-10278: added more tests and a SolrJ based client state provider
This commit is contained in:
parent
a5824cd962
commit
1309f75b05
|
@ -60,6 +60,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
|
|||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.ToleratedUpdateError;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ClusterState.CollectionRef;
|
||||
import org.apache.solr.common.cloud.CollectionStatePredicate;
|
||||
import org.apache.solr.common.cloud.CollectionStateWatcher;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
|
@ -1445,7 +1446,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
&& !cacheEntry.shoulRetry()) return col;
|
||||
}
|
||||
|
||||
ClusterState.CollectionRef ref = getCollectionRef(collection);
|
||||
CollectionRef ref = getCollectionRef(collection);
|
||||
if (ref == null) {
|
||||
//no such collection exists
|
||||
return null;
|
||||
|
@ -1480,7 +1481,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
}
|
||||
}
|
||||
|
||||
ClusterState.CollectionRef getCollectionRef(String collection) {
|
||||
CollectionRef getCollectionRef(String collection) {
|
||||
return stateProvider.getState(collection);
|
||||
}
|
||||
|
||||
|
@ -1729,9 +1730,9 @@ public class CloudSolrClient extends SolrClient {
|
|||
}
|
||||
}
|
||||
|
||||
interface ClusterStateProvider extends Closeable {
|
||||
public interface ClusterStateProvider extends Closeable {
|
||||
|
||||
ClusterState.CollectionRef getState(String collection);
|
||||
CollectionRef getState(String collection);
|
||||
|
||||
Set<String> liveNodes();
|
||||
|
||||
|
@ -1741,6 +1742,8 @@ public class CloudSolrClient extends SolrClient {
|
|||
|
||||
Map<String, Object> getClusterProperties();
|
||||
|
||||
Map<String,CollectionRef> getCollections();
|
||||
|
||||
void connect();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ import java.util.Set;
|
|||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ClusterState.CollectionRef;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.cloud.ZooKeeperException;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -52,7 +54,7 @@ public class ZkClientClusterStateProvider implements CloudSolrClient.ClusterStat
|
|||
}
|
||||
|
||||
@Override
|
||||
public ClusterState.CollectionRef getState(String collection) {
|
||||
public CollectionRef getState(String collection) {
|
||||
return zkStateReader.getClusterState().getCollectionRef(collection);
|
||||
}
|
||||
|
||||
|
@ -181,6 +183,11 @@ public class ZkClientClusterStateProvider implements CloudSolrClient.ClusterStat
|
|||
return zkHostString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, CollectionRef> getCollections() {
|
||||
return zkStateReader.getClusterState().getCollectionStates();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return zkHost;
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
|
@ -165,6 +166,10 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
|
|||
return slices.get(sliceName);
|
||||
}
|
||||
|
||||
public void forEachReplica(BiConsumer<String, Replica> consumer) {
|
||||
slices.forEach((shard, slice) -> slice.getReplicasMap().forEach((s, replica) -> consumer.accept(shard, replica)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the list of all slices for this collection.
|
||||
*/
|
||||
|
|
|
@ -68,9 +68,10 @@ public class Utils {
|
|||
}
|
||||
|
||||
private static Object makeDeepCopy(Object v, int maxDepth, boolean mutable) {
|
||||
if (v instanceof MapWriter) v = ((MapWriter) v).toMap(new LinkedHashMap<>());
|
||||
else if (v instanceof IteratorWriter) v = ((IteratorWriter) v).toList(new ArrayList<>());
|
||||
else if (v instanceof Map) v = getDeepCopy((Map) v, maxDepth - 1, mutable);
|
||||
if (v instanceof MapWriter && maxDepth > 1) v = ((MapWriter) v).toMap(new LinkedHashMap<>());
|
||||
else if (v instanceof IteratorWriter && maxDepth > 1) v = ((IteratorWriter) v).toList(new ArrayList<>());
|
||||
|
||||
if (v instanceof Map) v = getDeepCopy((Map) v, maxDepth - 1, mutable);
|
||||
else if (v instanceof Collection) v = getDeepCopy((Collection) v, maxDepth - 1, mutable);
|
||||
return v;
|
||||
}
|
||||
|
|
|
@ -20,21 +20,16 @@ package org.apache.solr.recipe;
|
|||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.recipe.Policy.BaseSuggester;
|
||||
import org.apache.solr.recipe.Policy.Session;
|
||||
import org.apache.solr.recipe.Policy.Suggester;
|
||||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.NODE;
|
||||
|
||||
class AddReplicaSuggester extends BaseSuggester {
|
||||
class AddReplicaSuggester extends Suggester {
|
||||
|
||||
AddReplicaSuggester(String coll, String shard, Session session) {
|
||||
super(coll, shard, session);
|
||||
}
|
||||
|
||||
Map get() {
|
||||
Map init() {
|
||||
Map operation = tryEachNode(true);
|
||||
if (operation == null) operation = tryEachNode(false);
|
||||
return operation;
|
||||
|
@ -46,11 +41,12 @@ class AddReplicaSuggester extends BaseSuggester {
|
|||
Row row = matrix.get(i);
|
||||
row = row.addReplica(coll, shard);
|
||||
row.violations.clear();
|
||||
for (Clause clause : session.getRuleSorter().clauses) {
|
||||
for (Clause clause : session.getPolicy().clauses) {
|
||||
if (strict || clause.strict) clause.test(row);
|
||||
}
|
||||
if (row.violations.isEmpty()) {
|
||||
return Utils.makeMap("operation", ADDREPLICA,
|
||||
if (row.violations.isEmpty()) {// there are no rule violations
|
||||
matrix.set(i, matrix.get(i).addReplica(coll, shard));
|
||||
return Utils.makeMap("operation", ADDREPLICA.toLower(),
|
||||
COLLECTION_PROP, coll,
|
||||
SHARD_ID_PROP, shard,
|
||||
NODE, row.node);
|
||||
|
|
|
@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
import org.apache.solr.common.MapWriter;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.recipe.Policy.ReplicaStat;
|
||||
import org.apache.solr.recipe.Policy.ReplicaInfo;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.COLLECTION;
|
||||
|
@ -125,10 +125,10 @@ public class Clause implements MapWriter {
|
|||
TestStatus test(Row row) {
|
||||
AtomicReference<TestStatus> result = new AtomicReference<>(NOT_APPLICABLE);
|
||||
|
||||
for (Map.Entry<String, Map<String, List<ReplicaStat>>> colls : row.replicaInfo.entrySet()) {
|
||||
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> colls : row.replicaInfo.entrySet()) {
|
||||
if (!collection.isPass(colls.getKey()) || result.get() == FAIL) break;
|
||||
int count = 0;
|
||||
for (Map.Entry<String, List<ReplicaStat>> shards : colls.getValue().entrySet()) {
|
||||
for (Map.Entry<String, List<ReplicaInfo>> shards : colls.getValue().entrySet()) {
|
||||
if (!shard.isPass(shards.getKey()) || result.get() == FAIL) break;
|
||||
count += shards.getValue().size();
|
||||
if (shard.val.equals(EACH)) testReplicaCount(row, result, count);
|
||||
|
|
|
@ -21,8 +21,7 @@ import java.util.Map;
|
|||
|
||||
import org.apache.solr.common.util.Pair;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.recipe.Policy.BaseSuggester;
|
||||
import org.apache.solr.recipe.Policy.Session;
|
||||
import org.apache.solr.recipe.Policy.Suggester;
|
||||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
|
@ -30,13 +29,9 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.MO
|
|||
import static org.apache.solr.common.params.CoreAdminParams.NODE;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.REPLICA;
|
||||
|
||||
public class MoveReplicaSuggester extends BaseSuggester{
|
||||
|
||||
MoveReplicaSuggester(String coll, String shard, Session session) {
|
||||
super(coll, shard, session);
|
||||
}
|
||||
|
||||
Map get() {
|
||||
public class MoveReplicaSuggester extends Suggester {
|
||||
@Override
|
||||
Map init() {
|
||||
Map operation = tryEachNode(true);
|
||||
if (operation == null) operation = tryEachNode(false);
|
||||
return operation;
|
||||
|
@ -46,24 +41,27 @@ public class MoveReplicaSuggester extends BaseSuggester{
|
|||
//iterate through elements and identify the least loaded
|
||||
for (int i = 0; i < matrix.size(); i++) {
|
||||
Row fromRow = matrix.get(i);
|
||||
Pair<Row, Policy.ReplicaStat> pair = fromRow.removeReplica(coll, shard);
|
||||
Pair<Row, Policy.ReplicaInfo> pair = fromRow.removeReplica(coll, shard);
|
||||
fromRow = pair.first();
|
||||
if(fromRow == null){
|
||||
//no such replica available
|
||||
continue;
|
||||
}
|
||||
|
||||
for (Clause clause : session.getRuleSorter().clauses) {
|
||||
for (Clause clause : session.getPolicy().clauses) {
|
||||
if (strict || clause.strict) clause.test(fromRow);
|
||||
}
|
||||
if (fromRow.violations.isEmpty()) {
|
||||
for (int j = matrix.size() - 1; j > i; i--) {
|
||||
Row targetRow = matrix.get(i);
|
||||
targetRow = targetRow.addReplica(coll, shard);
|
||||
for (Clause clause : session.getRuleSorter().clauses) {
|
||||
targetRow.violations.clear();
|
||||
for (Clause clause : session.getPolicy().clauses) {
|
||||
if (strict || clause.strict) clause.test(targetRow);
|
||||
}
|
||||
if (targetRow.violations.isEmpty()) {
|
||||
matrix.set(i, matrix.get(i).removeReplica(coll, shard).first());
|
||||
matrix.set(j, matrix.get(j).addReplica(coll, shard));
|
||||
return Utils.makeMap("operation", MOVEREPLICA.toLower(),
|
||||
COLLECTION_PROP, coll,
|
||||
SHARD_ID_PROP, shard,
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.solr.common.MapWriter;
|
||||
|
@ -68,25 +69,35 @@ public class Policy {
|
|||
|
||||
public class Session implements MapWriter {
|
||||
final List<String> nodes;
|
||||
final NodeValueProvider snitch;
|
||||
final ClusterDataProvider snitch;
|
||||
final List<Row> matrix;
|
||||
Set<String> collections = new HashSet<>();
|
||||
|
||||
Session(List<String> nodes, NodeValueProvider snitch) {
|
||||
Session(List<String> nodes, ClusterDataProvider snitch, List<Row> matrix) {
|
||||
this.nodes = nodes;
|
||||
this.snitch = snitch;
|
||||
this.matrix = matrix;
|
||||
}
|
||||
|
||||
Session(ClusterDataProvider snitch) {
|
||||
this.nodes = new ArrayList<>(snitch.getNodes());
|
||||
this.snitch = snitch;
|
||||
matrix = new ArrayList<>(nodes.size());
|
||||
for (String node : nodes) matrix.add(new Row(node, params, snitch));
|
||||
for (Row row : matrix) row.replicaInfo.forEach((s, e) -> collections.add(s));
|
||||
}
|
||||
|
||||
Session copy() {
|
||||
return new Session(nodes, snitch, getMatrixCopy());
|
||||
}
|
||||
|
||||
List<Row> getMatrixCopy() {
|
||||
return matrix.stream()
|
||||
.map(Row::copy)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
Policy getRuleSorter() {
|
||||
Policy getPolicy() {
|
||||
return Policy.this;
|
||||
|
||||
}
|
||||
|
@ -121,10 +132,10 @@ public class Policy {
|
|||
.collect(Collectors.toMap(r -> r.node, r -> r.violations));
|
||||
}
|
||||
|
||||
public Map suggest(CollectionAction action, String collection, String shard) {
|
||||
Suggester op = ops.get(action);
|
||||
public Suggester getSuggester(CollectionAction action, String collection, String shard) {
|
||||
Suggester op = ops.get(action).get();
|
||||
if (op == null) throw new UnsupportedOperationException(action.toString() + "is not supported");
|
||||
return op.suggest(collection, shard, this);
|
||||
return op.init(collection, shard, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -145,8 +156,8 @@ public class Policy {
|
|||
}
|
||||
|
||||
|
||||
public Session createSession(List<String> nodes, NodeValueProvider snitch) {
|
||||
return new Session(nodes, snitch);
|
||||
public Session createSession(ClusterDataProvider snitch) {
|
||||
return new Session(snitch);
|
||||
}
|
||||
|
||||
|
||||
|
@ -190,11 +201,11 @@ public class Policy {
|
|||
}
|
||||
|
||||
|
||||
static class ReplicaStat implements MapWriter {
|
||||
static class ReplicaInfo implements MapWriter {
|
||||
final String name;
|
||||
Map<String, Object> variables;
|
||||
|
||||
ReplicaStat(String name, Map<String, Object> vals) {
|
||||
ReplicaInfo(String name, Map<String, Object> vals) {
|
||||
this.name = name;
|
||||
this.variables = vals;
|
||||
}
|
||||
|
@ -206,44 +217,51 @@ public class Policy {
|
|||
}
|
||||
|
||||
|
||||
interface NodeValueProvider {
|
||||
Map<String, Object> getValues(String node, Collection<String> keys);
|
||||
interface ClusterDataProvider {
|
||||
Map<String, Object> getNodeValues(String node, Collection<String> keys);
|
||||
|
||||
/**
|
||||
* Get the details of each replica in a node. It attempts to fetch as much details about
|
||||
* the replica as mentioned in the keys list
|
||||
* the replica as mentioned in the keys list. It is not necessary to give al details
|
||||
* <p>
|
||||
* the format is {collection:shard :[{replicadetails}]}
|
||||
*/
|
||||
Map<String, Map<String, List<ReplicaStat>>> getReplicaCounts(String node, Collection<String> keys);
|
||||
Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys);
|
||||
|
||||
Collection<String> getNodes();
|
||||
}
|
||||
|
||||
interface Suggester {
|
||||
Map<String, Object> suggest(String coll, String shard, Session session);
|
||||
|
||||
}
|
||||
|
||||
static class BaseSuggester {
|
||||
final String coll;
|
||||
final String shard;
|
||||
final Policy.Session session;
|
||||
static abstract class Suggester {
|
||||
String coll;
|
||||
String shard;
|
||||
Policy.Session session;
|
||||
List<Row> matrix;
|
||||
|
||||
BaseSuggester(String coll, String shard, Policy.Session session) {
|
||||
Map operation;
|
||||
|
||||
Suggester init(String coll, String shard, Policy.Session session) {
|
||||
this.coll = coll;
|
||||
this.shard = shard;
|
||||
this.session = session;
|
||||
this.session = session.copy();
|
||||
matrix = session.getMatrixCopy();
|
||||
this.operation = init();
|
||||
return this;
|
||||
}
|
||||
|
||||
abstract Map init();
|
||||
|
||||
|
||||
public Map getOperation() {
|
||||
return operation;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static final Map<CollectionAction, Suggester> ops = new HashMap<>();
|
||||
private static final Map<CollectionAction, Supplier<Suggester>> ops = new HashMap<>();
|
||||
|
||||
static {
|
||||
ops.put(CollectionAction.ADDREPLICA, (coll, shard, session) -> new AddReplicaSuggester(coll, shard, session).get());
|
||||
ops.put(CollectionAction.MOVEREPLICA, (coll, shard, session) -> new MoveReplicaSuggester(coll, shard, session).get());
|
||||
ops.put(CollectionAction.ADDREPLICA, () -> new AddReplicaSuggester());
|
||||
ops.put(CollectionAction.MOVEREPLICA, () -> new MoveReplicaSuggester());
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -23,12 +23,13 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.solr.common.IteratorWriter;
|
||||
import org.apache.solr.common.MapWriter;
|
||||
import org.apache.solr.common.util.Pair;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.recipe.Policy.ReplicaStat;
|
||||
import org.apache.solr.recipe.Policy.ReplicaInfo;
|
||||
|
||||
import static org.apache.solr.common.params.CoreAdminParams.NODE;
|
||||
|
||||
|
@ -36,16 +37,16 @@ import static org.apache.solr.common.params.CoreAdminParams.NODE;
|
|||
class Row implements MapWriter {
|
||||
public final String node;
|
||||
final Cell[] cells;
|
||||
Map<String, Map<String, List<ReplicaStat>>> replicaInfo;
|
||||
Map<String, Map<String, List<ReplicaInfo>>> replicaInfo;
|
||||
List<Clause> violations = new ArrayList<>();
|
||||
boolean anyValueMissing = false;
|
||||
|
||||
Row(String node, List<String> params, Policy.NodeValueProvider snitch) {
|
||||
replicaInfo = snitch.getReplicaCounts(node, params);
|
||||
Row(String node, List<String> params, Policy.ClusterDataProvider snitch) {
|
||||
replicaInfo = snitch.getReplicaInfo(node, params);
|
||||
if (replicaInfo == null) replicaInfo = Collections.emptyMap();
|
||||
this.node = node;
|
||||
cells = new Cell[params.size()];
|
||||
Map<String, Object> vals = snitch.getValues(node, params);
|
||||
Map<String, Object> vals = snitch.getNodeValues(node, params);
|
||||
for (int i = 0; i < params.size(); i++) {
|
||||
String s = params.get(i);
|
||||
cells[i] = new Cell(i, s, vals.get(s));
|
||||
|
@ -54,7 +55,7 @@ class Row implements MapWriter {
|
|||
}
|
||||
}
|
||||
|
||||
Row(String node, Cell[] cells, boolean anyValueMissing, Map<String, Map<String, List<ReplicaStat>>> replicaInfo, List<Clause> violations) {
|
||||
Row(String node, Cell[] cells, boolean anyValueMissing, Map<String, Map<String, List<ReplicaInfo>>> replicaInfo, List<Clause> violations) {
|
||||
this.node = node;
|
||||
this.cells = new Cell[cells.length];
|
||||
for (int i = 0; i < this.cells.length; i++) {
|
||||
|
@ -74,11 +75,11 @@ class Row implements MapWriter {
|
|||
});
|
||||
}
|
||||
|
||||
public Row copy() {
|
||||
return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(replicaInfo, 2), new ArrayList<>(violations));
|
||||
Row copy() {
|
||||
return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(replicaInfo, 3), new ArrayList<>(violations));
|
||||
}
|
||||
|
||||
public Object getVal(String name) {
|
||||
Object getVal(String name) {
|
||||
for (Cell cell : cells) if (cell.name.equals(name)) return cell.val;
|
||||
return null;
|
||||
}
|
||||
|
@ -90,20 +91,21 @@ class Row implements MapWriter {
|
|||
|
||||
Row addReplica(String coll, String shard) {
|
||||
Row row = copy();
|
||||
Map<String, List<ReplicaStat>> c = row.replicaInfo.get(coll);
|
||||
Map<String, List<ReplicaInfo>> c = row.replicaInfo.get(coll);
|
||||
if (c == null) row.replicaInfo.put(coll, c = new HashMap<>());
|
||||
List<ReplicaStat> s = c.get(shard);
|
||||
List<ReplicaInfo> s = c.get(shard);
|
||||
if (s == null) c.put(shard, s = new ArrayList<>());
|
||||
s.add(new ReplicaInfo(""+new Random().nextInt(10000)+10000 , new HashMap<>()));
|
||||
return row;
|
||||
|
||||
|
||||
}
|
||||
|
||||
Pair<Row, ReplicaStat> removeReplica(String coll, String shard) {
|
||||
Pair<Row, ReplicaInfo> removeReplica(String coll, String shard) {
|
||||
Row row = copy();
|
||||
Map<String, List<ReplicaStat>> c = row.replicaInfo.get(coll);
|
||||
Map<String, List<ReplicaInfo>> c = row.replicaInfo.get(coll);
|
||||
if(c == null) return null;
|
||||
List<ReplicaStat> s = c.get(shard);
|
||||
List<ReplicaInfo> s = c.get(shard);
|
||||
if (s == null || s.isEmpty()) return null;
|
||||
return new Pair(row,s.remove(0));
|
||||
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.recipe;
|
||||
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient.ClusterStateProvider;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.recipe.Policy.ClusterDataProvider;
|
||||
import org.apache.solr.recipe.Policy.ReplicaInfo;
|
||||
|
||||
public class SolrClientClusterDataProvider implements ClusterDataProvider {
|
||||
|
||||
private final ClusterStateProvider clusterStateProvider;
|
||||
private final Map<String, Map<String, Map<String, List<ReplicaInfo>>>> data = new HashMap<>();
|
||||
|
||||
public SolrClientClusterDataProvider(ClusterStateProvider csp) {
|
||||
this.clusterStateProvider = csp;
|
||||
Map<String, ClusterState.CollectionRef> all = clusterStateProvider.getCollections();
|
||||
all.forEach((collName, ref) -> {
|
||||
DocCollection coll = ref.get();
|
||||
if (coll == null) return;
|
||||
coll.forEachReplica((shard, replica) -> {
|
||||
Map<String, Map<String, List<ReplicaInfo>>> nodeData = data.get(replica.getNodeName());
|
||||
if (nodeData == null) data.put(replica.getNodeName(), nodeData = new HashMap<>());
|
||||
Map<String, List<ReplicaInfo>> collData = nodeData.get(collName);
|
||||
if (collData == null) nodeData.put(collName, collData = new HashMap<>());
|
||||
List<ReplicaInfo> replicas = collData.get(shard);
|
||||
if (replicas == null) collData.put(shard, replicas = new ArrayList<>());
|
||||
replicas.add(new ReplicaInfo(replica.getName(), new HashMap<>()));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
|
||||
//todo
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
||||
return data.get(node);//todo fill other details
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> getNodes() {
|
||||
return clusterStateProvider.liveNodes();
|
||||
}
|
||||
}
|
|
@ -34,6 +34,7 @@ import org.apache.http.NoHttpResponseException;
|
|||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ClusterState.CollectionRef;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
@ -118,10 +119,10 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
|
|||
}
|
||||
|
||||
private CloudSolrClient.ClusterStateProvider getStateProvider(Set<String> livenodes,
|
||||
Map<String, ClusterState.CollectionRef> colls) {
|
||||
Map<String, CollectionRef> colls) {
|
||||
return new CloudSolrClient.ClusterStateProvider() {
|
||||
@Override
|
||||
public ClusterState.CollectionRef getState(String collection) {
|
||||
public CollectionRef getState(String collection) {
|
||||
return colls.get(collection);
|
||||
}
|
||||
|
||||
|
@ -148,6 +149,11 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
|
|||
@Override
|
||||
public void connect() { }
|
||||
|
||||
@Override
|
||||
public Map<String, CollectionRef> getCollections() {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
|
|
|
@ -31,6 +31,8 @@ import org.apache.solr.SolrTestCaseJ4;
|
|||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.common.util.ValidatingJsonMap;
|
||||
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
|
||||
public class TestPolicy extends SolrTestCaseJ4 {
|
||||
|
||||
public void testOperands() {
|
||||
|
@ -47,10 +49,18 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
c = new Clause((Map<String, Object>) Utils.fromJSONString("{nodeRole:'!overseer'}"));
|
||||
assertTrue(c.tag.isPass("OVERSEER"));
|
||||
assertFalse(c.tag.isPass("overseer"));
|
||||
|
||||
|
||||
}
|
||||
public void testRuleParsing() throws IOException {
|
||||
|
||||
public void testRow(){
|
||||
Row row = new Row("nodex", new Cell[]{new Cell(0,"node", "nodex")}, false, new HashMap<>(), new ArrayList<>());
|
||||
Row r1 = row.addReplica("c1", "s1");
|
||||
Row r2 = r1.addReplica("c1", "s1");
|
||||
assertEquals(1,r1.replicaInfo.get("c1").get("s1").size());
|
||||
assertEquals(2,r2.replicaInfo.get("c1").get("s1").size());
|
||||
assertTrue(r2.replicaInfo.get("c1").get("s1").get(0) instanceof Policy.ReplicaInfo);
|
||||
assertTrue(r2.replicaInfo.get("c1").get("s1").get(1) instanceof Policy.ReplicaInfo);
|
||||
}
|
||||
public void testRules() throws IOException {
|
||||
String rules = "{" +
|
||||
"conditions:[{nodeRole:'!overseer', strict:false},{replica:'<1',node:node3}," +
|
||||
"{replica:'<2',node:'#ANY', shard:'#EACH'}]," +
|
||||
|
@ -115,17 +125,22 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
|
||||
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
|
||||
Policy.Session session;
|
||||
Policy.NodeValueProvider snitch = new Policy.NodeValueProvider() {
|
||||
Policy.ClusterDataProvider snitch = new Policy.ClusterDataProvider() {
|
||||
@Override
|
||||
public Map<String,Object> getValues(String node, Collection<String> keys) {
|
||||
public Map<String,Object> getNodeValues(String node, Collection<String> keys) {
|
||||
Map<String,Object> result = new LinkedHashMap<>();
|
||||
keys.stream().forEach(s -> result.put(s, nodeValues.get(node).get(s)));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, List<Policy.ReplicaStat>>> getReplicaCounts(String node, Collection<String> keys) {
|
||||
Map<String, Map<String, List<Policy.ReplicaStat>>> result = new LinkedHashMap<>();
|
||||
public Collection<String> getNodes() {
|
||||
return Arrays.asList("node1", "node2", "node3", "node4");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
||||
Map<String, Map<String, List<Policy.ReplicaInfo>>> result = new LinkedHashMap<>();
|
||||
|
||||
m.forEach((collName, o) -> {
|
||||
ValidatingJsonMap coll = (ValidatingJsonMap) o;
|
||||
|
@ -135,11 +150,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
ValidatingJsonMap r = (ValidatingJsonMap) o2;
|
||||
String node_name = (String) r.get("node_name");
|
||||
if (!node_name.equals(node)) return;
|
||||
Map<String, List<Policy.ReplicaStat>> shardVsReplicaStats = result.get(collName);
|
||||
Map<String, List<Policy.ReplicaInfo>> shardVsReplicaStats = result.get(collName);
|
||||
if (shardVsReplicaStats == null) result.put(collName, shardVsReplicaStats = new HashMap<>());
|
||||
List<Policy.ReplicaStat> replicaStats = shardVsReplicaStats.get(shard);
|
||||
if (replicaStats == null) shardVsReplicaStats.put(shard, replicaStats = new ArrayList<>());
|
||||
replicaStats.add(new Policy.ReplicaStat(replicaName, new HashMap<>()));
|
||||
List<Policy.ReplicaInfo> replicaInfos = shardVsReplicaStats.get(shard);
|
||||
if (replicaInfos == null) shardVsReplicaStats.put(shard, replicaInfos = new ArrayList<>());
|
||||
replicaInfos.add(new Policy.ReplicaInfo(replicaName, new HashMap<>()));
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -150,7 +165,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
|
||||
};
|
||||
|
||||
session = policy.createSession(Arrays.asList("node1", "node2", "node3", "node4"), snitch);
|
||||
session = policy.createSession( snitch);
|
||||
|
||||
session.applyRules();
|
||||
List<Row> l = session.getSorted();
|
||||
|
@ -177,6 +192,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
assertEquals(v.get(0).replica.val, 1);
|
||||
assertEquals(v.get(0).tag.val, "node3");
|
||||
|
||||
Policy.Suggester suggester = session.getSuggester(ADDREPLICA, "gettingstarted", "r1");
|
||||
Map operation = suggester.getOperation();
|
||||
assertEquals("node2", operation.get("node"));
|
||||
System.out.println(Utils.toJSONString(operation));
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue