suggesters return SolrRequest object, not a map

Noble Paul 2017-05-15 13:49:45 +09:30
parent 17490d30f3
commit 65744aa5d6
6 changed files with 137 additions and 92 deletions

File: org/apache/solr/cloud/OverseerCollectionMessageHandler.java

@@ -45,6 +45,7 @@ import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.autoscaling.Policy;
 import org.apache.solr.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.cloud.rule.ReplicaAssigner;
@@ -99,6 +100,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.AUTOSCALING_PATH;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.util.Utils.makeMap;
@@ -709,56 +711,64 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
                                               List<String> shardNames,
                                               int repFactor) throws IOException, KeeperException, InterruptedException {
     List<Map> rulesMap = (List) message.get("rule");
+    Map m = zkStateReader.getZkClient().getJson(AUTOSCALING_PATH, true);
+    boolean useAutoScalingPolicy = false;
     String policyName = message.getStr("policy");
-    if (rulesMap == null && policyName == null) {
+    if (rulesMap != null && (m.get(Policy.CLUSTER_POLICY) == null || m.get(Policy.CLUSTER_PREFERENCE) != null || policyName == null)) {
+      useAutoScalingPolicy = true;
+    }
+    if (rulesMap == null && !useAutoScalingPolicy) {
       int i = 0;
       Map<Position, String> result = new HashMap<>();
       for (String aShard : shardNames) {
-        for (int j = 0; j < repFactor; j++){
+        for (int j = 0; j < repFactor; j++) {
           result.put(new Position(aShard, j), nodeList.get(i % nodeList.size()));
           i++;
         }
       }
       return result;
     }
-    if (policyName != null) {
-      String collName = message.getStr(CommonParams.NAME, "coll_" + System.nanoTime());
-      try(CloudSolrClient csc = new CloudSolrClient.Builder()
+    if (useAutoScalingPolicy) {
+      String tmpCollName = "coll_" + System.nanoTime();
+      String collName = message.getStr(CommonParams.NAME, tmpCollName);
+      try (CloudSolrClient csc = new CloudSolrClient.Builder()
           .withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
           .build()) {
         SolrClientDataProvider clientDataProvider = new SolrClientDataProvider(csc);
         Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(collName,
             zkStateReader.getZkClient().getJson(SOLR_AUTOSCALING_CONF_PATH, true),
-            clientDataProvider, shardNames, repFactor);
+            clientDataProvider, Collections.singletonMap(tmpCollName, policyName), shardNames, repFactor);
         Map<Position, String> result = new HashMap<>();
         for (Map.Entry<String, List<String>> e : locations.entrySet()) {
           List<String> value = e.getValue();
-          for ( int i = 0; i < value.size(); i++) {
-            result.put(new Position(e.getKey(),i), value.get(i));
+          for (int i = 0; i < value.size(); i++) {
+            result.put(new Position(e.getKey(), i), value.get(i));
           }
         }
         return result;
       }
     } else {
       List<Rule> rules = new ArrayList<>();
       for (Object map : rulesMap) rules.add(new Rule((Map) map));
       Map<String, Integer> sharVsReplicaCount = new HashMap<>();
       for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor);
       ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
           sharVsReplicaCount,
           (List<Map>) message.get(SNITCH),
           new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
           nodeList,
           overseer.getZkController().getCoreContainer(),
           clusterState);
       return replicaAssigner.getNodeMappings();
     }
   }

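When neither "rule" nor "policy" is present in the message, the method above falls back to plain round-robin placement. A minimal standalone sketch of that fallback, with illustrative names and the Position key simplified to a shard/index string (not part of the commit):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoundRobinPlacementSketch {
  // Replica j of each shard goes to nodes.get(counter % nodes.size()),
  // with one running counter across all (shard, replica) pairs.
  static Map<String, String> assign(List<String> shards, int repFactor, List<String> nodes) {
    Map<String, String> result = new LinkedHashMap<>();
    int i = 0;
    for (String shard : shards) {
      for (int j = 0; j < repFactor; j++) {
        result.put(shard + "/" + j, nodes.get(i % nodes.size()));
        i++;
      }
    }
    return result;
  }
}
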
File: org/apache/solr/cloud/autoscaling/AddReplicaSuggester.java

@@ -17,26 +17,19 @@
 package org.apache.solr.cloud.autoscaling;
 
-import java.util.Map;
-import org.apache.solr.common.util.Pair;
-import org.apache.solr.common.util.Utils;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.autoscaling.Policy.Suggester;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CoreAdminParams.NODE;
-
 class AddReplicaSuggester extends Suggester {
 
-  Map init() {
-    Map operation = tryEachNode(true);
+  SolrRequest init() {
+    SolrRequest operation = tryEachNode(true);
     if (operation == null) operation = tryEachNode(false);
     return operation;
   }
 
-  Map tryEachNode(boolean strict) {
+  SolrRequest tryEachNode(boolean strict) {
     String coll = (String) hints.get(Hint.COLL);
     String shard = (String) hints.get(Hint.SHARD);
     if (coll == null || shard == null)
@@ -52,10 +45,9 @@ class AddReplicaSuggester extends Suggester {
       }
       if (row.violations.isEmpty()) {// there are no rule violations
         getMatrix().set(i, getMatrix().get(i).addReplica(coll, shard));
-        return Utils.makeMap("operation", ADDREPLICA.toLower(),
-            COLLECTION_PROP, coll,
-            SHARD_ID_PROP, shard,
-            NODE, row.node);
+        return CollectionAdminRequest
+            .addReplicaToShard(coll, shard)
+            .setNode(row.node);
       }
     }
     return null;

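With this change the suggester hands back a ready-to-execute CollectionAdminRequest.AddReplica rather than a map of parameters. A hedged usage sketch, assuming session and client are an existing Policy.Session and SolrClient (not part of this commit):

SolrRequest op = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA)
    .hint(Policy.Suggester.Hint.COLL, "mycoll")
    .hint(Policy.Suggester.Hint.SHARD, "shard1")
    .getOperation();
if (op != null) {
  client.request(op); // a SolrClient can execute any SolrRequest directly
}
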
File: org/apache/solr/cloud/autoscaling/MoveReplicaSuggester.java

@@ -19,6 +19,8 @@ package org.apache.solr.cloud.autoscaling;
 import java.util.Map;
 
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.autoscaling.Policy.Suggester;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
@@ -32,13 +34,13 @@ import static org.apache.solr.common.params.CoreAdminParams.REPLICA;
 public class MoveReplicaSuggester extends Suggester {
 
   @Override
-  Map init() {
-    Map operation = tryEachNode(true);
+  SolrRequest init() {
+    SolrRequest operation = tryEachNode(true);
     if (operation == null) operation = tryEachNode(false);
     return operation;
   }
 
-  Map tryEachNode(boolean strict) {
+  SolrRequest tryEachNode(boolean strict) {
     //iterate through elements and identify the least loaded
     for (Pair<Policy.ReplicaInfo, Row> fromReplica : getValidReplicas(true, true, -1)) {
       Row fromRow = fromReplica.second();
@@ -69,12 +71,10 @@ public class MoveReplicaSuggester extends Suggester {
         if (targetRow.violations.isEmpty()) {
           getMatrix().set(i, getMatrix().get(i).removeReplica(coll, shard).first());
           getMatrix().set(j, getMatrix().get(j).addReplica(coll, shard));
-          return Utils.makeMap("operation", MOVEREPLICA.toLower(),
-              COLLECTION_PROP, coll,
-              SHARD_ID_PROP, shard,
-              NODE, fromRow.node,
-              REPLICA, pair.second().name,
-              "targetNode", targetRow.node);
+          return new CollectionAdminRequest.MoveReplica(
+              coll,
+              pair.second().name,
+              targetRow.node);
         }
       }
     }

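The returned MoveReplica request is built from just the collection, the replica name, and the target node; the source node and shard that the old map carried (NODE, SHARD_ID_PROP) are implied by the replica name. A small illustrative construction with hypothetical values:

// collection, replica name, and target node fully identify a move
CollectionAdminRequest.MoveReplica move =
    new CollectionAdminRequest.MoveReplica("mycoll", "core_node5", "node5");
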
File: org/apache/solr/cloud/autoscaling/Policy.java

@@ -33,6 +33,7 @@ import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.common.IteratorWriter;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
@@ -300,7 +301,7 @@ public class Policy implements MapWriter {
   public static abstract class Suggester {
     protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class);
     Policy.Session session;
-    Map operation;
+    SolrRequest operation;
     private boolean isInitialized = false;
 
     private void _init(Session session) {
@@ -312,10 +313,10 @@ public class Policy implements MapWriter {
       return this;
     }
 
-    abstract Map init();
+    abstract SolrRequest init();
 
-    public Map getOperation() {
+    public SolrRequest getOperation() {
       if (!isInitialized) {
         this.operation = init();
         isInitialized = true;

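Note that getOperation() gates init() behind the isInitialized flag, so a suggester computes its SolrRequest at most once and afterwards keeps returning the cached value. A sketch of the observable behavior, assuming any configured suggester:

SolrRequest first = suggester.getOperation();  // runs init() and caches the result
SolrRequest second = suggester.getOperation(); // returns the same cached SolrRequest
assert first == second;
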
File: org/apache/solr/cloud/autoscaling/PolicyHelper.java

@@ -19,10 +19,12 @@ package org.apache.solr.cloud.autoscaling;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.Utils;
@@ -34,13 +36,40 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 public class PolicyHelper {
   public static Map<String, List<String>> getReplicaLocations(String collName, Map<String, Object> autoScalingJson,
                                                                ClusterDataProvider cdp,
+                                                               Map<String, String> optionalPolicyMapping,
                                                                List<String> shardNames,
                                                                int repFactor) {
     Map<String, List<String>> positionMapping = new HashMap<>();
     for (String shardName : shardNames) positionMapping.put(shardName, new ArrayList<>(repFactor));
+    if (optionalPolicyMapping != null) {
+      final ClusterDataProvider delegate = cdp;
+      cdp = new ClusterDataProvider() {
+        @Override
+        public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
+          return delegate.getNodeValues(node, tags);
+        }
+
+        @Override
+        public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
+          return delegate.getReplicaInfo(node, keys);
+        }
+
+        @Override
+        public Collection<String> getNodes() {
+          return delegate.getNodes();
+        }
+
+        @Override
+        public String getPolicy(String coll) {
+          return optionalPolicyMapping.containsKey(coll) ?
+              optionalPolicyMapping.get(coll) :
+              delegate.getPolicy(coll);
+        }
+      };
+    }
+
     // Map<String, Object> merged = Policy.mergePolicies(collName, policyJson, defaultPolicy);
     Policy policy = new Policy(autoScalingJson);
     Policy.Session session = policy.createSession(cdp);
     for (String shardName : shardNames) {
@@ -48,12 +77,12 @@ public class PolicyHelper {
       Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
           .hint(Hint.COLL, collName)
           .hint(Hint.SHARD, shardName);
-      Map op = suggester.getOperation();
+      SolrRequest op = suggester.getOperation();
       if (op == null) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules "+ Utils.toJSONString(policy));
       }
       session = suggester.getSession();
-      positionMapping.get(shardName).add((String) op.get(CoreAdminParams.NODE));
+      positionMapping.get(shardName).add((String) op.getParams().get(CoreAdminParams.NODE));
     }
   }

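The new optionalPolicyMapping argument works by wrapping the caller's ClusterDataProvider in a decorator that overrides only getPolicy() and delegates everything else. A hedged usage sketch mirroring the test below (autoScalingJson and dataProvider assumed to exist):

Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
    "newColl",
    autoScalingJson,
    dataProvider,
    Collections.singletonMap("newColl", "policy1"), // force 'policy1' for this collection
    Arrays.asList("shard1", "shard2"),
    3);
// each shard name now maps to repFactor node names, in assignment order
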
File: org/apache/solr/cloud/autoscaling/TestPolicy.java

@@ -29,8 +29,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest.MoveReplica;
 import org.apache.solr.cloud.autoscaling.Policy.Suggester.Hint;
 import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.common.util.ValidatingJsonMap;
@@ -198,7 +201,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
     Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
         .hint(Hint.COLL, "gettingstarted")
         .hint(Hint.SHARD, "r1");
-    Map operation = suggester.getOperation();
+    SolrParams operation = suggester.getOperation().getParams();
     assertEquals("node2", operation.get("node"));
 
     nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
@@ -209,7 +212,10 @@
         "node4:{cores:8, freedisk: 375, heap:16900, nodeRole:overseer}" +
         "}");
     session = policy.createSession(getClusterDataProvider(nodeValues, clusterState));
-    operation = session.getSuggester(MOVEREPLICA).hint(Hint.TARGET_NODE, "node5").getOperation();
+    operation = session.getSuggester(MOVEREPLICA)
+        .hint(Hint.TARGET_NODE, "node5")
+        .getOperation()
+        .getParams();
     assertEquals("node5", operation.get("targetNode"));
@@ -269,11 +275,11 @@
     Policy.Suggester suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
         .hint(Policy.Suggester.Hint.TARGET_NODE, "127.0.0.1:60099_solr");
-    Map op = suggester.getOperation();
+    SolrParams op = suggester.getOperation().getParams();
     assertNotNull(op);
     session = suggester.getSession();
     suggester = session.getSuggester(MOVEREPLICA).hint(Hint.TARGET_NODE, "127.0.0.1:60099_solr");
-    op = suggester.getOperation();
+    op = suggester.getOperation().getParams();
     assertNotNull(op);
   }
@@ -330,12 +336,12 @@
     };
     Policy.Session session = policy.createSession(cdp);
 
-    Map op = session
+    CollectionAdminRequest.AddReplica op = (CollectionAdminRequest.AddReplica) session
         .getSuggester(ADDREPLICA)
         .hint(Hint.COLL, "newColl")
         .hint(Hint.SHARD, "s1").getOperation();
     assertNotNull(op);
-    assertEquals("node2", op.get("node"));
+    assertEquals("node2", op.getNode());
   }
@@ -366,24 +372,25 @@
     };
   }
 
-  /*public void testMultiReplicaPlacement() {
+  /* public void testMultiReplicaPlacement() {
     String autoScaleJson ="{" +
+        "  'cluster-preferences': [" +
+        "    { 'minimize': 'freedisk', 'precision': 50}" +
+        "  ]," +
+        "  'cluster-policy': [" +
+        "    { 'nodeRole': '!overseer'}," +
+        "    { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'" +
+        "    }" +
+        "  ]," +
         "  'policies': {" +
-        "    'default': {" +
-        "      'conditions': [" +
-        "        { 'nodeRole': '!overseer'}," +
-        "        { 'replica': '<2', 'shard': '#EACH', node:'#ANY'}" +
-        "      ]," +
-        "      'preferences':[" +
-        "        {'minimize': 'freedisk', 'precision':50}]" +
-        "    }," +
-        "    'policy1': {" +
-        "      'conditions': [" +
-        "        { replica: '<2', shard: '#ANY', node:'#ANY'}," +
-        "        { replica: '<2', shard:'#EACH', rack: rack1}," +
-        "        { replica: '1', sysprop.fs: ssd, shard: '#EACH'}" +
-        "      ], preferences : [ {maximize: freedisk, precision:50}]" +
-        "}}}";
+        "    'policy1': [" +
+        "      { 'replica': '<2', 'shard': '#ANY', 'node': '#ANY'}," +
+        "      { 'replica': '<2', 'shard': '#EACH', 'rack': 'rack1'}," +
+        "      { 'replica': '1', 'sysprop.fs': 'ssd', 'shard': '#EACH'}" +
+        "    ]" +
+        "  }" +
+        "}";
 
     Map<String,Map> nodeValues = (Map<String, Map>) Utils.fromJSONString( "{" +
         "node1:{cores:12, freedisk: 334, heap:10480, rack:rack3}," +
@@ -405,13 +412,19 @@
         return getReplicaDetails(node, clusterState);
       }
 
+      @Override
+      public String getPolicy(String coll) {
+        return null;
+      }
+
       @Override
       public Collection<String> getNodes() {
         return Arrays.asList("node1", "node2", "node3", "node4");
       }
     };
 
-    Map<String, List<String>> locations = Policy.getReplicaLocations("newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
-        "policy1", dataProvider, Arrays.asList("shard1", "shard2"), 3);
+    Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
+        "newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
+        dataProvider, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3);
     System.out.println(Utils.toJSONString(locations));