SOLR-11307: The SHARD hint is deprecated in favor of a COLL_SHARD hint

This commit is contained in:
Noble Paul 2017-09-04 22:55:55 +09:30
parent b2cf38c191
commit 054f445a27
4 changed files with 94 additions and 67 deletions

View File

@ -17,6 +17,7 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@ -24,6 +25,7 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.util.Pair;
class AddReplicaSuggester extends Suggester {
@ -34,12 +36,11 @@ class AddReplicaSuggester extends Suggester {
}
SolrRequest tryEachNode(boolean strict) {
Set<String> collections = (Set<String>) hints.get(Hint.COLL);
String shard = (String) hints.get(Hint.SHARD);
if (collections == null || shard == null) {
Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
if (shards.isEmpty()) {
throw new RuntimeException("add-replica requires 'collection' and 'shard'");
}
for (String coll : collections) {
for (Pair<String,String> shard : shards) {
Replica.Type type = Replica.Type.get((String) hints.get(Hint.REPLICATYPE));
//iterate through elements and identify the least loaded
List<Clause.Violation> leastSeriousViolation = null;
@ -48,7 +49,7 @@ class AddReplicaSuggester extends Suggester {
Row row = getMatrix().get(i);
if (!row.isLive) continue;
if (!isAllowed(row.node, Hint.TARGET_NODE)) continue;
Row tmpRow = row.addReplica(coll, shard, type);
Row tmpRow = row.addReplica(shard.first(), shard.second(), type);
List<Clause.Violation> errs = testChangedMatrix(strict, getModifiedMatrix(getMatrix(), tmpRow, i));
if (!containsNewErrors(errs)) {
@ -60,9 +61,9 @@ class AddReplicaSuggester extends Suggester {
}
if (targetNodeIndex != null) {// there are no rule violations
getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(coll, shard, type));
getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(shard.first(), shard.second(), type));
return CollectionAdminRequest
.addReplicaToShard(coll, shard)
.addReplicaToShard(shard.first(), shard.second())
.setType(type)
.setNode(getMatrix().get(targetNodeIndex).node);
}

View File

@ -32,6 +32,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -39,6 +40,7 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause.Violation;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.StrUtils;
@ -360,7 +362,8 @@ public class Policy implements MapWriter {
}
public Suggester hint(Hint hint, Object value) {
if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE || hint == Hint.COLL) {
hint.validator.accept(value);
if (hint.multiValued) {
Collection<?> values = value instanceof Collection ? (Collection)value : Collections.singletonList(value);
((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).addAll(values);
} else {
@ -374,22 +377,20 @@ public class Policy implements MapWriter {
public SolrRequest getOperation() {
if (!isInitialized) {
Set<String> collections = (Set<String>) hints.get(Hint.COLL);
String shard = (String) hints.get(Hint.SHARD);
if (collections != null) {
for (String coll : collections) {
Set<String> collections = (Set<String>) hints.getOrDefault(Hint.COLL, Collections.emptySet());
Set<Pair<String, String>> s = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
if (!collections.isEmpty() || !s.isEmpty()) {
HashSet<Pair<String, String>> shards = new HashSet<>(s);
collections.stream().forEach(c -> shards.add(new Pair<>(c, null)));
for (Pair<String, String> shard : shards) {
// if this is not a known collection from the existing clusterstate,
// then add it
if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(coll))) {
session.addClausesForCollection(session.dataProvider, coll);
if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) {
session.addClausesForCollection(session.dataProvider, shard.first());
}
for (Row row : session.matrix) {
if (!row.collectionVsShardVsReplicas.containsKey(coll))
row.collectionVsShardVsReplicas.put(coll, new HashMap<>());
if (shard != null) {
Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.get(coll);
if (!shardInfo.containsKey(shard)) shardInfo.put(shard, new ArrayList<>());
}
Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>());
if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>());
}
}
Collections.sort(session.expandedClauses);
@ -462,7 +463,7 @@ public class Policy implements MapWriter {
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> e : r.collectionVsShardVsReplicas.entrySet()) {
if (!isAllowed(e.getKey(), Hint.COLL)) continue;
for (Map.Entry<String, List<ReplicaInfo>> shard : e.getValue().entrySet()) {
if (!isAllowed(e.getKey(), Hint.SHARD)) continue;//todo fix
if (!isAllowed(new Pair<>(e.getKey(), shard.getKey()), Hint.COLL_SHARD)) continue;//todo fix
if(shard.getValue() == null || shard.getValue().isEmpty()) continue;
replicaList.add(new Pair<>(shard.getValue().get(0), r));
}
@ -490,7 +491,7 @@ public class Policy implements MapWriter {
protected boolean isAllowed(Object v, Hint hint) {
Object hintVal = hints.get(hint);
if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE || hint == Hint.COLL) {
if (hint.multiValued) {
Set set = (Set) hintVal;
return set == null || set.contains(v);
} else {
@ -499,7 +500,47 @@ public class Policy implements MapWriter {
}
public enum Hint {
COLL, SHARD, SRC_NODE, TARGET_NODE, REPLICATYPE
COLL(true),
// collection shard pair
// this should be a Pair<String, String> , (collection,shard)
COLL_SHARD(true, v -> {
Collection c = v instanceof Collection ? (Collection) v : Collections.singleton(v);
for (Object o : c) {
if (!(o instanceof Pair)) {
throw new RuntimeException("SHARD hint must use a Pair");
}
Pair p = (Pair) o;
if (p.first() == null || p.second() == null) {
throw new RuntimeException("Both collection and shard must not be null");
}
}
}),
SRC_NODE(true),
TARGET_NODE(true),
REPLICATYPE(false, o -> {
if (!(o instanceof Replica.Type)) {
throw new RuntimeException("REPLICATYPE hint must use a ReplicaType");
}
});
public final boolean multiValued;
public final Consumer<Object> validator;
Hint(boolean multiValued) {
this(multiValued, v -> {
Collection c = v instanceof Collection ? (Collection) v : Collections.singleton(v);
for (Object o : c) {
if (!(o instanceof String)) throw new RuntimeException("hint must be of type String");
}
});
}
Hint(boolean multiValued, Consumer<Object> c) {
this.multiValued = multiValued;
this.validator = c;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester.Hint;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
@ -89,9 +90,8 @@ public class PolicyHelper {
for (Map.Entry<Replica.Type, Integer> e : typeVsCount.entrySet()) {
for (int i = 0; i < e.getValue(); i++) {
Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
.hint(Hint.COLL, collName)
.hint(Hint.REPLICATYPE, e.getKey())
.hint(Hint.SHARD, shardName);
.hint(Hint.COLL_SHARD, new Pair<>(collName, shardName));
if (nodesList != null) {
for (String nodeName : nodesList) {
suggester = suggester.hint(Hint.TARGET_NODE, nodeName);

View File

@ -27,7 +27,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.google.common.collect.ImmutableList;
import org.apache.solr.SolrTestCaseJ4;
@ -39,6 +38,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ValidatingJsonMap;
import org.junit.Test;
@ -469,9 +469,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
Policy policy = new Policy(policies);
Policy.Suggester suggester = policy.createSession(getClusterDataProvider(nodeValues, clusterState))
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.REPLICATYPE, Replica.Type.PULL)
.hint(Hint.SHARD, "shard1");
.hint(Hint.COLL_SHARD, new Pair("newColl", "shard1"))
.hint(Hint.REPLICATYPE, Replica.Type.PULL);
SolrRequest op = suggester.getOperation();
assertNotNull(op);
assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
@ -479,9 +478,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
suggester = suggester.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.REPLICATYPE, Replica.Type.PULL)
.hint(Hint.SHARD, "shard2");
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
.hint(Hint.REPLICATYPE, Replica.Type.PULL);
op = suggester.getOperation();
assertNotNull(op);
assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
@ -489,9 +487,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
suggester = suggester.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.REPLICATYPE, Replica.Type.TLOG)
.hint(Hint.SHARD, "shard1");
.hint(Hint.COLL_SHARD, new Pair("newColl", "shard1"))
.hint(Hint.REPLICATYPE, Replica.Type.TLOG);
op = suggester.getOperation();
assertNotNull(op);
assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
@ -499,9 +496,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
suggester = suggester.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.REPLICATYPE, Replica.Type.TLOG)
.hint(Hint.SHARD, "shard2");
.hint(Hint.COLL_SHARD, new Pair("newColl", "shard2"))
.hint(Hint.REPLICATYPE, Replica.Type.TLOG);
op = suggester.getOperation();
assertNotNull(op);
assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
@ -509,9 +505,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
suggester = suggester.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.REPLICATYPE, Replica.Type.TLOG)
.hint(Hint.SHARD, "shard2");
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
.hint(Hint.REPLICATYPE, Replica.Type.TLOG);
op = suggester.getOperation();
assertNull("No node should qualify for this" ,op);
@ -670,10 +665,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
Policy policy = new Policy(policies);
Policy.Suggester suggester = policy.createSession(getClusterDataProvider(nodeValues, clusterState))
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.COLL, "newColl2")
.hint(Hint.REPLICATYPE, Replica.Type.PULL)
.hint(Hint.SHARD, "shard1");
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
.hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard1"));
SolrRequest op;
int countOp = 0;
int countNewCollOp = 0;
@ -681,10 +675,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
while ((op = suggester.getOperation()) != null) {
countOp++;
suggester = suggester.getSession().getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.COLL, "newColl2")
.hint(Hint.REPLICATYPE, Replica.Type.PULL)
.hint(Hint.SHARD, "shard1");
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
.hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard1"));
assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
String collection = op.getParams().get("collection");
assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2"));
@ -701,18 +694,16 @@ public class TestPolicy extends SolrTestCaseJ4 {
countNewColl2Op = 0;
suggester = suggester.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.COLL, "newColl2")
.hint(Hint.REPLICATYPE, Replica.Type.TLOG)
.hint(Hint.SHARD, "shard2");
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
.hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard2"))
.hint(Hint.REPLICATYPE, Replica.Type.TLOG);
while ((op = suggester.getOperation()) != null) {
countOp++;
suggester = suggester.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.COLL, "newColl2")
.hint(Hint.REPLICATYPE, Replica.Type.TLOG)
.hint(Hint.SHARD, "shard2");
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
.hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard2"))
.hint(Hint.REPLICATYPE, Replica.Type.TLOG);
assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
String collection = op.getParams().get("collection");
assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2"));
@ -816,8 +807,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertTrue(violations.stream().anyMatch(violation -> (violation.getClause().replica.getOperand() == Operand.LESS_THAN && "node".equals(violation.getClause().tag.getName()))));
Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "gettingstarted")
.hint(Hint.SHARD, "r1");
.hint(Hint.COLL_SHARD, new Pair<>("gettingstarted","r1"));
SolrParams operation = suggester.getOperation().getParams();
assertEquals("node2", operation.get("node"));
@ -934,8 +924,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
for (int i = 0; i < 3; i++) {
Policy.Suggester suggester = session.getSuggester(ADDREPLICA);
SolrRequest op = suggester
.hint(Hint.COLL, "newColl")
.hint(Hint.SHARD, "shard1")
.hint(Hint.COLL_SHARD, new Pair<>("newColl","shard1"))
.getOperation();
assertNotNull(op);
assertEquals("node3", op.getParams().get("node"));
@ -987,16 +976,14 @@ public class TestPolicy extends SolrTestCaseJ4 {
Policy.Session session = policy.createSession(cdp);
Policy.Suggester suggester = session.getSuggester(ADDREPLICA);
SolrRequest op = suggester
.hint(Hint.COLL, "newColl")
.hint(Hint.SHARD, "shard1")
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
.getOperation();
assertNotNull(op);
assertEquals("node3", op.getParams().get("node"));
suggester = suggester
.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.SHARD, "shard1");
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"));
op = suggester.getOperation();
assertNotNull(op);
assertEquals("node3", op.getParams().get("node"));
@ -1004,8 +991,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
suggester = suggester
.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.SHARD, "shard1");
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"));
op = suggester.getOperation();
assertNotNull(op);
assertEquals("node2", op.getParams().get("node"));
@ -1124,8 +1110,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
CollectionAdminRequest.AddReplica op = (CollectionAdminRequest.AddReplica) session
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.SHARD, "s1").getOperation();
.hint(Hint.COLL_SHARD, new Pair<>("newColl", "s1")).getOperation();
assertNotNull(op);
assertEquals("node2", op.getNode());
}