SOLR-10278: test added for client data provider

This commit is contained in:
Noble Paul 2017-04-17 22:16:23 +09:30
parent f31546f6e6
commit d3daafca22
9 changed files with 240 additions and 125 deletions

View File

@ -66,39 +66,4 @@ public class TestPolicyCloud extends SolrCloudTestCase {
assertTrue("freedisk value is "+((Number) val.get("freedisk")).intValue() , ((Number) val.get("freedisk")).intValue() > 0);
System.out.println(Utils.toJSONString(val));
}
/*public void testMultiReplicaPlacement() {
String autoScaleJson ="";
Map<String,Map> nodeValues = (Map<String, Map>) Utils.fromJSONString( "{" +
"node1:{cores:12, freedisk: 334, heap:10480}," +
"node2:{cores:4, freedisk: 749, heap:6873}," +
"node3:{cores:7, freedisk: 262, heap:7834}," +
"node4:{cores:8, freedisk: 375, heap:16900, nodeRole:overseer}" +
"}");
ClusterDataProvider dataProvider = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
return null;
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return null;
}
@Override
public Collection<String> getNodes() {
return null;
}
};
Map<String, List<String>> locations = Policy.getReplicaLocations("newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
"policy1", dataProvider, Arrays.asList("shard1", "shard2"), 3);
}*/
}

View File

@ -86,6 +86,13 @@ public class Utils {
public static byte[] toJSON(Object o) {
if(o == null) return new byte[0];
CharArr out = new CharArr();
if (!(o instanceof List) && !(o instanceof Map)) {
if (o instanceof MapWriter) {
o = ((MapWriter)o).toMap(new LinkedHashMap<>());
} else if(o instanceof IteratorWriter){
o = ((IteratorWriter)o).toList(new ArrayList<>());
}
}
new JSONWriter(out, 2).write(o); // indentation by default
return toUTF8(out);
}

View File

@ -37,15 +37,15 @@ class AddReplicaSuggester extends Suggester {
Map tryEachNode(boolean strict) {
//iterate through elements and identify the least loaded
for (int i = matrix.size() - 1; i >= 0; i--) {
Row row = matrix.get(i);
for (int i = getMatrix().size() - 1; i >= 0; i--) {
Row row = getMatrix().get(i);
row = row.addReplica(coll, shard);
row.violations.clear();
for (Clause clause : session.getPolicy().clauses) {
if (strict || clause.strict) clause.test(row);
}
if (row.violations.isEmpty()) {// there are no rule violations
matrix.set(i, matrix.get(i).addReplica(coll, shard));
getMatrix().set(i, getMatrix().get(i).addReplica(coll, shard));
return Utils.makeMap("operation", ADDREPLICA.toLower(),
COLLECTION_PROP, coll,
SHARD_ID_PROP, shard,

View File

@ -19,6 +19,7 @@ package org.apache.solr.recipe;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -26,6 +27,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import com.sun.istack.internal.NotNull;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.Utils;
import org.apache.solr.recipe.Policy.ReplicaInfo;
@ -46,7 +48,7 @@ import static org.apache.solr.recipe.Policy.ANY;
import static org.apache.solr.recipe.Policy.EACH;
// a set of conditions in a policy
public class Clause implements MapWriter {
public class Clause implements MapWriter, Comparable<Clause> {
Map<String, Object> original;
Condition collection, shard, replica, tag;
boolean strict = true;
@ -70,6 +72,13 @@ public class Clause implements MapWriter {
tag = parse(s, singletonMap(s, o));
}
@Override
public int compareTo(Clause that) {
int v = Integer.compare(this.tag.op.priority, that.tag.op.priority);
if (v != 0) return v;
return Integer.compare(this.replica.op.priority, that.replica.op.priority);
}
static class Condition {
final String name;
final Object val;
@ -135,7 +144,8 @@ public class Clause implements MapWriter {
AtomicReference<TestStatus> result = new AtomicReference<>(NOT_APPLICABLE);
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> colls : row.replicaInfo.entrySet()) {
if (!collection.isPass(colls.getKey()) || result.get() == FAIL) break;
if (result.get() == FAIL) break;
if (!collection.isPass(colls.getKey())) continue;
int count = 0;
for (Map.Entry<String, List<ReplicaInfo>> shards : colls.getValue().entrySet()) {
if (!shard.isPass(shards.getKey()) || result.get() == FAIL) break;

View File

@ -39,8 +39,8 @@ public class MoveReplicaSuggester extends Suggester {
Map tryEachNode(boolean strict) {
//iterate through elements and identify the least loaded
for (int i = 0; i < matrix.size(); i++) {
Row fromRow = matrix.get(i);
for (int i = 0; i < getMatrix().size(); i++) {
Row fromRow = getMatrix().get(i);
Pair<Row, Policy.ReplicaInfo> pair = fromRow.removeReplica(coll, shard);
fromRow = pair.first();
if(fromRow == null){
@ -52,16 +52,16 @@ public class MoveReplicaSuggester extends Suggester {
if (strict || clause.strict) clause.test(fromRow);
}
if (fromRow.violations.isEmpty()) {
for (int j = matrix.size() - 1; j > i; i--) {
Row targetRow = matrix.get(i);
for (int j = getMatrix().size() - 1; j > i; i--) {
Row targetRow = getMatrix().get(i);
targetRow = targetRow.addReplica(coll, shard);
targetRow.violations.clear();
for (Clause clause : session.getPolicy().clauses) {
if (strict || clause.strict) clause.test(targetRow);
}
if (targetRow.violations.isEmpty()) {
matrix.set(i, matrix.get(i).removeReplica(coll, shard).first());
matrix.set(j, matrix.get(j).addReplica(coll, shard));
getMatrix().set(i, getMatrix().get(i).removeReplica(coll, shard).first());
getMatrix().set(j, getMatrix().get(j).addReplica(coll, shard));
return Utils.makeMap("operation", MOVEREPLICA.toLower(),
COLLECTION_PROP, coll,
SHARD_ID_PROP, shard,

View File

@ -20,12 +20,13 @@ package org.apache.solr.recipe;
import java.util.Objects;
import org.apache.solr.recipe.Clause.TestStatus;
import static org.apache.solr.recipe.Clause.TestStatus.*;
import static org.apache.solr.recipe.Policy.ANY;
public enum Operand {
WILDCARD(ANY){
WILDCARD(ANY, Integer.MAX_VALUE) {
@Override
public TestStatus match(Object ruleVal, Object testVal) {
return testVal == null ? NOT_APPLICABLE : PASS;
@ -37,14 +38,14 @@ public enum Operand {
return ANY.equals(val) || Policy.EACH.equals(val) ? val : null;
}
},
EQUAL(""),
NOT_EQUAL("!") {
EQUAL("", 0),
NOT_EQUAL("!", 2) {
@Override
public TestStatus match(Object ruleVal, Object testVal) {
return super.match(ruleVal, testVal) == PASS ? FAIL : PASS;
}
},
GREATER_THAN(">") {
GREATER_THAN(">", 1) {
@Override
public Object parse(String val) {
return checkNumeric(super.parse(val));
@ -58,8 +59,7 @@ public enum Operand {
}
},
LESS_THAN("<") {
LESS_THAN("<", 2) {
@Override
public TestStatus match(Object ruleVal, Object testVal) {
if (testVal == null) return NOT_APPLICABLE;
@ -72,9 +72,11 @@ public enum Operand {
}
};
public final String operand;
final int priority;
Operand(String val) {
Operand(String val, int priority) {
this.operand = val;
this.priority = priority;
}
public String toStr(Object expectedVal) {
@ -100,10 +102,6 @@ public enum Operand {
}
public int compare(Object n1Val, Object n2Val) {
return 0;
}
public int compareNum(Object n1Val, Object n2Val) {
Integer n1 = (Integer) parseObj(n1Val, Integer.class);
Integer n2 = (Integer) parseObj(n2Val, Integer.class);

View File

@ -27,12 +27,11 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
@ -45,7 +44,7 @@ import static java.util.stream.Collectors.toList;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.util.Utils.getDeepCopy;
public class Policy {
public class Policy implements MapWriter {
public static final String EACH = "#EACH";
public static final String ANY = "#ANY";
List<Clause> clauses = new ArrayList<>();
@ -55,7 +54,10 @@ public class Policy {
public Policy(Map<String, Object> jsonMap) {
List<Map<String, Object>> l = getListOfMap("conditions", jsonMap);
clauses = l.stream().map(Clause::new).collect(toList());
clauses = l.stream()
.map(Clause::new)
.sorted()
.collect(toList());
l = getListOfMap("preferences", jsonMap);
preferences = l.stream().map(Preference::new).collect(toList());
for (int i = 0; i < preferences.size() - 1; i++) {
@ -73,29 +75,43 @@ public class Policy {
}
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
if (!clauses.isEmpty()) {
ew.put("conditions", (IteratorWriter) iw -> {
for (Clause clause : clauses) iw.add(clause);
});
}
if (!preferences.isEmpty()) {
ew.put("preferences", (IteratorWriter) iw -> {
for (Preference p : preferences) iw.add(p);
});
}
}
public class Session implements MapWriter {
final List<String> nodes;
final ClusterDataProvider snitch;
final ClusterDataProvider dataProvider;
final List<Row> matrix;
Set<String> collections = new HashSet<>();
Session(List<String> nodes, ClusterDataProvider snitch, List<Row> matrix) {
Session(List<String> nodes, ClusterDataProvider dataProvider, List<Row> matrix) {
this.nodes = nodes;
this.snitch = snitch;
this.dataProvider = dataProvider;
this.matrix = matrix;
}
Session(ClusterDataProvider snitch) {
this.nodes = new ArrayList<>(snitch.getNodes());
this.snitch = snitch;
Session(ClusterDataProvider dataProvider) {
this.nodes = new ArrayList<>(dataProvider.getNodes());
this.dataProvider = dataProvider;
matrix = new ArrayList<>(nodes.size());
for (String node : nodes) matrix.add(new Row(node, params, snitch));
for (String node : nodes) matrix.add(new Row(node, params, dataProvider));
for (Row row : matrix) row.replicaInfo.forEach((s, e) -> collections.add(s));
}
Session copy() {
return new Session(nodes, snitch, getMatrixCopy());
return new Session(nodes, dataProvider, getMatrixCopy());
}
List<Row> getMatrixCopy() {
@ -109,7 +125,8 @@ public class Policy {
}
/**Apply the preferences and conditions
/**
* Apply the preferences and conditions
*/
public void applyRules() {
if (!preferences.isEmpty()) {
@ -157,6 +174,7 @@ public class Policy {
public String toString() {
return Utils.toJSONString(toMap(new LinkedHashMap<>()));
}
public List<Row> getSorted() {
return Collections.unmodifiableList(matrix);
}
@ -228,7 +246,6 @@ public class Policy {
String coll;
String shard;
Policy.Session session;
List<Row> matrix;
Map operation;
@ -236,7 +253,6 @@ public class Policy {
this.coll = coll;
this.shard = shard;
this.session = session.copy();
matrix = session.getMatrixCopy();
this.operation = init();
return this;
}
@ -251,6 +267,13 @@ public class Policy {
public Session getSession() {
return session;
}
List<Row> getMatrix() {
return session.matrix;
}
}
public static Map<String, List<String>> getReplicaLocations(String collName, Map<String, Object> autoScalingJson,
@ -265,14 +288,16 @@ public class Policy {
}
Map defaultPolicy = (Map) Utils.getObjectByPath(autoScalingJson, false, asList("policies", "default"));
Policy policy = new Policy(Policy.mergePolicies(collName, policyJson, defaultPolicy));
Map<String, Object> merged = Policy.mergePolicies(collName, policyJson, defaultPolicy);
System.out.println(Utils.toJSONString(merged));
Policy policy = new Policy(merged);
Policy.Session session = policy.createSession(cdp);
for (String shardName : shardNames) {
for (int i = 0; i < repFactor; i++) {
Policy.Suggester suggester = session.getSuggester(ADDREPLICA, collName, shardName);
Map op = suggester.getOperation();
if (op == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules");
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules "+ Utils.toJSONString(policy));
}
session = suggester.getSession();
positionMapping.get(shardName).add((String) op.get(CoreAdminParams.NODE));
@ -330,5 +355,4 @@ public class Policy {
}
}

View File

@ -17,17 +17,24 @@
package org.apache.solr.recipe;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
class Preference {
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.Utils;
class Preference implements MapWriter {
final Policy.SortParam name;
Integer precision;
final Policy.Sort sort;
Preference next;
public int idx;
private final Map original;
Preference(Map<String, Object> m) {
this.original = Utils.getDeepCopy(m,3);
sort = Policy.Sort.get(m);
name = Policy.SortParam.get(m.get(sort.name()).toString());
Object p = m.getOrDefault("precision", 0);
@ -59,4 +66,12 @@ class Preference {
prevVal;
}
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
for (Object o : original.entrySet()) {
Map.Entry e = (Map.Entry) o;
ew.put(String.valueOf(e.getKey()), e.getValue());
}
}
}

View File

@ -20,7 +20,6 @@ package org.apache.solr.recipe;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -63,6 +62,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
public void testMerge() {
Map map = (Map) Utils.fromJSONString("{" +
" 'policies': {" +
" 'default': {" +
@ -95,9 +95,29 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertEquals(String.valueOf(getObjectByPath(map, true, "preferences[1]/precision")), "50");
}
public void testConditionsSort(){
String rules = "{" +
"conditions:[" +
"{nodeRole:'!overseer', strict:false}," +
"{replica:'<1',node:node3, shard: '#EACH'}," +
"{replica:'<2',node:'#ANY', shard:'#EACH'}," +
"{replica:1, rack:rack1}]," +
" preferences:[" +
"{minimize:cores , precision:2}," +
"{maximize:freedisk, precision:50}, " +
"{minimize:heap, precision:1000}]}";
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
assertEquals("rack", policy.clauses.get(0).tag.name);
}
public void testRules() throws IOException {
String rules = "{" +
"conditions:[{nodeRole:'!overseer', strict:false},{replica:'<1',node:node3}," +
"conditions:[" +
"{nodeRole:'!overseer', strict:false}," +
"{replica:'<1',node:node3}," +
"{replica:'<2',node:'#ANY', shard:'#EACH'}]," +
" preferences:[" +
"{minimize:cores , precision:2}," +
@ -112,34 +132,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
"node4:{cores:8, freedisk: 375, heap:16900, nodeRole:overseer}" +
"}");
ValidatingJsonMap m = ValidatingJsonMap
.getDeepCopy((Map) Utils.fromJSONString(clusterState), 6, true);
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
Policy.Session session;
ClusterDataProvider snitch = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
Map<String, Object> result = new LinkedHashMap<>();
keys.stream().forEach(s -> result.put(s, nodeValues.get(node).get(s)));
return result;
}
@Override
public Collection<String> getNodes() {
return Arrays.asList("node1", "node2", "node3", "node4");
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return getReplicaDetails(node, m);
}
};
session = policy.createSession(snitch);
session = policy.createSession(getClusterDataProvider(nodeValues, clusterState));
session.applyRules();
List<Row> l = session.getSorted();
@ -172,8 +167,107 @@ public class TestPolicy extends SolrTestCaseJ4 {
System.out.println(Utils.toJSONString(operation));
}
/* public void testOtherTag(){
String rules = "{" +
"conditions:[" +
"{nodeRole:'!overseer', strict:false}," +
"{replica:'<1',node:node3}," +
"{replica:'<2',node:'#ANY', shard:'#EACH'}," +
"{replica:<3,shard:'#EACH', rack:'#ANY' }" +
"]," +
" preferences:[" +
"{minimize:cores , precision:2}," +
"{maximize:freedisk, precision:50}, " +
"{minimize:heap, precision:1000}]}";
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
"node1:{cores:12, freedisk: 334, heap:10480, rack: rack4}," +
"node2:{cores:4, freedisk: 749, heap:6873, rack: rack3}," +
"node3:{cores:7, freedisk: 262, heap:7834, rack: rack2}," +
"node4:{cores:8, freedisk: 375, heap:16900, nodeRole:overseer, rack: rack1}" +
"}");
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
Policy.Session session = policy.createSession(getClusterDataProvider(nodeValues, clusterState));
}*/
private ClusterDataProvider getClusterDataProvider(final Map<String, Map> nodeValues, String clusterState) {
return new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
Map<String, Object> result = new LinkedHashMap<>();
keys.stream().forEach(s -> result.put(s, nodeValues.get(node).get(s)));
return result;
}
@Override
public Collection<String> getNodes() {
return nodeValues.keySet();
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return getReplicaDetails(node, clusterState);
}
};
}
/*public void testMultiReplicaPlacement() {
String autoScaleJson ="{" +
" 'policies': {" +
" 'default': {" +
" 'conditions': [" +
" { 'nodeRole': '!overseer'}," +
" { 'replica': '<2', 'shard': '#EACH', node:'#ANY'}" +
" ]," +
" 'preferences':[" +
" {'minimize': 'freedisk', 'precision':50}]" +
" }," +
" 'policy1': {" +
" 'conditions': [" +
" { replica: '<2', shard: '#ANY', node:'#ANY'}," +
" { replica: '<2', shard:'#EACH', rack: rack1}," +
" { replica: '1', sysprop.fs: ssd, shard: '#EACH'}" +
" ], preferences : [ {maximize: freedisk, precision:50}]" +
"}}}";
Map<String,Map> nodeValues = (Map<String, Map>) Utils.fromJSONString( "{" +
"node1:{cores:12, freedisk: 334, heap:10480, rack:rack3}," +
"node2:{cores:4, freedisk: 749, heap:6873, sysprop.fs : ssd, rack:rack1}," +
"node3:{cores:7, freedisk: 262, heap:7834, rack:rack4}," +
"node4:{cores:8, freedisk: 375, heap:16900, nodeRole:overseer, rack:rack2}" +
"}");
ClusterDataProvider dataProvider = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
Map<String, Object> result = new LinkedHashMap<>();
keys.stream().forEach(s -> result.put(s, nodeValues.get(node).get(s)));
return result;
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return getReplicaDetails(node, clusterState);
}
@Override
public Collection<String> getNodes() {
return Arrays.asList("node1", "node2", "node3", "node4");
}
};
Map<String, List<String>> locations = Policy.getReplicaLocations("newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
"policy1", dataProvider, Arrays.asList("shard1", "shard2"), 3);
System.out.println(Utils.toJSONString(locations));
}*/
public static String clusterState = "{'gettingstarted':{" +
" 'router':{'name':'compositeId'}," +
" 'shards':{" +
@ -216,7 +310,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
" 'node_name':'node1'," +
" 'state':'active'}}}}}}";
public static Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaDetails(String node, ValidatingJsonMap m) {
public static Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaDetails(String node, String s) {
ValidatingJsonMap m = ValidatingJsonMap
.getDeepCopy((Map) Utils.fromJSONString(s), 6, true);
Map<String, Map<String, List<Policy.ReplicaInfo>>> result = new LinkedHashMap<>();
m.forEach((collName, o) -> {