mirror of https://github.com/apache/lucene.git
SOLR-11614: ReplicaAssigner to use SolrCloudManager
This commit is contained in:
parent
eef8f30da2
commit
6c46569705
|
@ -124,7 +124,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
|
node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
|
||||||
ocmh.overseer.getSolrCloudManager(), ocmh.overseer.getCoreContainer()).get(0).nodeName;// TODO: use replica type in this logic too
|
ocmh.overseer.getSolrCloudManager()).get(0).nodeName;// TODO: use replica type in this logic too
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("Node Identified {} for creating new replica", node);
|
log.info("Node Identified {} for creating new replica", node);
|
||||||
|
|
|
@ -32,12 +32,12 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
|
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
|
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
|
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
|
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
|
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
|
||||||
import org.apache.solr.cloud.rule.ReplicaAssigner;
|
import org.apache.solr.cloud.rule.ReplicaAssigner;
|
||||||
import org.apache.solr.cloud.rule.Rule;
|
import org.apache.solr.cloud.rule.Rule;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
|
@ -49,7 +49,6 @@ import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
import org.apache.solr.common.util.StrUtils;
|
import org.apache.solr.common.util.StrUtils;
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.core.CoreContainer;
|
|
||||||
import org.apache.solr.util.NumberUtils;
|
import org.apache.solr.util.NumberUtils;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -284,7 +283,7 @@ public class Assign {
|
||||||
(List<Map>) message.get(SNITCH),
|
(List<Map>) message.get(SNITCH),
|
||||||
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
|
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
|
||||||
nodeList,
|
nodeList,
|
||||||
ocmh.overseer.getZkController().getCoreContainer(),
|
ocmh.overseer.getSolrCloudManager(),
|
||||||
clusterState);
|
clusterState);
|
||||||
|
|
||||||
Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings();
|
Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings();
|
||||||
|
@ -327,7 +326,7 @@ public class Assign {
|
||||||
// could be created on live nodes given maxShardsPerNode, Replication factor (if from createShard) etc.
|
// could be created on live nodes given maxShardsPerNode, Replication factor (if from createShard) etc.
|
||||||
public static List<ReplicaCount> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
|
public static List<ReplicaCount> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
|
||||||
String shard, int nrtReplicas,
|
String shard, int nrtReplicas,
|
||||||
Object createNodeSet, SolrCloudManager cloudManager, CoreContainer cc) throws IOException, InterruptedException {
|
Object createNodeSet, SolrCloudManager cloudManager) throws IOException, InterruptedException {
|
||||||
log.debug("getNodesForNewReplicas() shard: {} , replicas : {} , createNodeSet {}", shard, nrtReplicas, createNodeSet );
|
log.debug("getNodesForNewReplicas() shard: {} , replicas : {} , createNodeSet {}", shard, nrtReplicas, createNodeSet );
|
||||||
DocCollection coll = clusterState.getCollection(collectionName);
|
DocCollection coll = clusterState.getCollection(collectionName);
|
||||||
Integer maxShardsPerNode = coll.getMaxShardsPerNode();
|
Integer maxShardsPerNode = coll.getMaxShardsPerNode();
|
||||||
|
@ -360,7 +359,7 @@ public class Assign {
|
||||||
List<ReplicaPosition> replicaPositions = null;
|
List<ReplicaPosition> replicaPositions = null;
|
||||||
if (l != null) {
|
if (l != null) {
|
||||||
// TODO: make it so that this method doesn't require access to CC
|
// TODO: make it so that this method doesn't require access to CC
|
||||||
replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cc, coll, createNodeList, l);
|
replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cloudManager, coll, createNodeList, l);
|
||||||
}
|
}
|
||||||
String policyName = coll.getStr(POLICY);
|
String policyName = coll.getStr(POLICY);
|
||||||
AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
|
AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
|
||||||
|
@ -417,7 +416,7 @@ public class Assign {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<ReplicaPosition> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes,
|
private static List<ReplicaPosition> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes,
|
||||||
CoreContainer cc, DocCollection coll, List<String> createNodeList, List l) {
|
SolrCloudManager cloudManager, DocCollection coll, List<String> createNodeList, List l) {
|
||||||
ArrayList<Rule> rules = new ArrayList<>();
|
ArrayList<Rule> rules = new ArrayList<>();
|
||||||
for (Object o : l) rules.add(new Rule((Map) o));
|
for (Object o : l) rules.add(new Rule((Map) o));
|
||||||
Map<String, Map<String, Integer>> shardVsNodes = new LinkedHashMap<>();
|
Map<String, Map<String, Integer>> shardVsNodes = new LinkedHashMap<>();
|
||||||
|
@ -439,7 +438,7 @@ public class Assign {
|
||||||
Collections.singletonMap(shard, numberOfNodes),
|
Collections.singletonMap(shard, numberOfNodes),
|
||||||
snitches,
|
snitches,
|
||||||
shardVsNodes,
|
shardVsNodes,
|
||||||
nodesList, cc, clusterState).getNodeMappings();
|
nodesList, cloudManager, clusterState).getNodeMappings();
|
||||||
|
|
||||||
return positions.entrySet().stream().map(e -> e.getKey().setNode(e.getValue())).collect(Collectors.toList());// getReplicaCounts(positions);
|
return positions.entrySet().stream().map(e -> e.getKey().setNode(e.getValue())).collect(Collectors.toList());// getReplicaCounts(positions);
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,7 +105,7 @@ public class CreateShardCmd implements Cmd {
|
||||||
numPullReplicas);
|
numPullReplicas);
|
||||||
} else {
|
} else {
|
||||||
List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
|
List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
|
||||||
createNodeSetStr, ocmh.overseer.getSolrCloudManager(), ocmh.overseer.getCoreContainer());
|
createNodeSetStr, ocmh.overseer.getSolrCloudManager());
|
||||||
int i = 0;
|
int i = 0;
|
||||||
positions = new ArrayList<>();
|
positions = new ArrayList<>();
|
||||||
for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
|
for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
|
||||||
|
|
|
@ -30,23 +30,27 @@ import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.cloud.ClusterState;
|
import org.apache.solr.common.cloud.ClusterState;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.ReplicaPosition;
|
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.cloud.ReplicaPosition;
|
||||||
import org.apache.solr.common.cloud.Slice;
|
import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
||||||
import org.apache.solr.common.cloud.rule.Snitch;
|
import org.apache.solr.common.cloud.rule.Snitch;
|
||||||
import org.apache.solr.common.cloud.rule.SnitchContext;
|
import org.apache.solr.common.cloud.rule.SnitchContext;
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.core.CoreContainer;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static java.util.Collections.singletonList;
|
import static java.util.Collections.singletonList;
|
||||||
import static org.apache.solr.cloud.rule.Rule.MatchStatus.*;
|
import static org.apache.solr.cloud.rule.Rule.MatchStatus.NODE_CAN_BE_ASSIGNED;
|
||||||
import static org.apache.solr.cloud.rule.Rule.Phase.*;
|
import static org.apache.solr.cloud.rule.Rule.MatchStatus.NOT_APPLICABLE;
|
||||||
|
import static org.apache.solr.cloud.rule.Rule.Phase.ASSIGN;
|
||||||
|
import static org.apache.solr.cloud.rule.Rule.Phase.FUZZY_ASSIGN;
|
||||||
|
import static org.apache.solr.cloud.rule.Rule.Phase.FUZZY_VERIFY;
|
||||||
|
import static org.apache.solr.cloud.rule.Rule.Phase.VERIFY;
|
||||||
import static org.apache.solr.common.util.Utils.getDeepCopy;
|
import static org.apache.solr.common.util.Utils.getDeepCopy;
|
||||||
|
|
||||||
public class ReplicaAssigner {
|
public class ReplicaAssigner {
|
||||||
|
@ -71,12 +75,12 @@ public class ReplicaAssigner {
|
||||||
List snitches,
|
List snitches,
|
||||||
Map<String, Map<String, Integer>> shardVsNodes,
|
Map<String, Map<String, Integer>> shardVsNodes,
|
||||||
List<String> participatingLiveNodes,
|
List<String> participatingLiveNodes,
|
||||||
CoreContainer cc, ClusterState clusterState) {
|
SolrCloudManager cloudManager, ClusterState clusterState) {
|
||||||
this.rules = rules;
|
this.rules = rules;
|
||||||
for (Rule rule : rules) tagNames.add(rule.tag.name);
|
for (Rule rule : rules) tagNames.add(rule.tag.name);
|
||||||
this.shardVsReplicaCount = shardVsReplicaCount;
|
this.shardVsReplicaCount = shardVsReplicaCount;
|
||||||
this.participatingLiveNodes = new ArrayList<>(participatingLiveNodes);
|
this.participatingLiveNodes = new ArrayList<>(participatingLiveNodes);
|
||||||
this.nodeVsTags = getTagsForNodes(cc, snitches);
|
this.nodeVsTags = getTagsForNodes(cloudManager, snitches);
|
||||||
this.shardVsNodes = getDeepCopy(shardVsNodes, 2);
|
this.shardVsNodes = getDeepCopy(shardVsNodes, 2);
|
||||||
|
|
||||||
if (clusterState != null) {
|
if (clusterState != null) {
|
||||||
|
@ -309,12 +313,12 @@ public class ReplicaAssigner {
|
||||||
final Snitch snitch;
|
final Snitch snitch;
|
||||||
final Set<String> myTags = new HashSet<>();
|
final Set<String> myTags = new HashSet<>();
|
||||||
final Map<String, SnitchContext> nodeVsContext = new HashMap<>();
|
final Map<String, SnitchContext> nodeVsContext = new HashMap<>();
|
||||||
private final CoreContainer cc;
|
private final SolrCloudManager cloudManager;
|
||||||
|
|
||||||
SnitchInfoImpl(Map<String, Object> conf, Snitch snitch, CoreContainer cc) {
|
SnitchInfoImpl(Map<String, Object> conf, Snitch snitch, SolrCloudManager cloudManager) {
|
||||||
super(conf);
|
super(conf);
|
||||||
this.snitch = snitch;
|
this.snitch = snitch;
|
||||||
this.cc = cc;
|
this.cloudManager = cloudManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -328,13 +332,13 @@ public class ReplicaAssigner {
|
||||||
/**
|
/**
|
||||||
* This method uses the snitches and get the tags for all the nodes
|
* This method uses the snitches and get the tags for all the nodes
|
||||||
*/
|
*/
|
||||||
private Map<String, Map<String, Object>> getTagsForNodes(final CoreContainer cc, List snitchConf) {
|
private Map<String, Map<String, Object>> getTagsForNodes(final SolrCloudManager cloudManager, List snitchConf) {
|
||||||
|
|
||||||
Map<Class, SnitchInfoImpl> snitches = getSnitchInfos(cc, snitchConf);
|
Map<Class, SnitchInfoImpl> snitches = getSnitchInfos(cloudManager, snitchConf);
|
||||||
for (Class c : Snitch.WELL_KNOWN_SNITCHES) {
|
for (Class c : Snitch.WELL_KNOWN_SNITCHES) {
|
||||||
if (snitches.containsKey(c)) continue;// it is already specified explicitly , ignore
|
if (snitches.containsKey(c)) continue;// it is already specified explicitly , ignore
|
||||||
try {
|
try {
|
||||||
snitches.put(c, new SnitchInfoImpl(Collections.EMPTY_MAP, (Snitch) c.newInstance(), cc));
|
snitches.put(c, new SnitchInfoImpl(Collections.EMPTY_MAP, (Snitch) c.newInstance(), cloudManager));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error instantiating Snitch " + c.getName());
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error instantiating Snitch " + c.getName());
|
||||||
}
|
}
|
||||||
|
@ -358,7 +362,7 @@ public class ReplicaAssigner {
|
||||||
//now use the Snitch to get the tags
|
//now use the Snitch to get the tags
|
||||||
for (SnitchInfoImpl info : snitches.values()) {
|
for (SnitchInfoImpl info : snitches.values()) {
|
||||||
if (!info.myTags.isEmpty()) {
|
if (!info.myTags.isEmpty()) {
|
||||||
SnitchContext context = getSnitchCtx(node, info, cc);
|
SnitchContext context = getSnitchCtx(node, info, cloudManager);
|
||||||
info.nodeVsContext.put(node, context);
|
info.nodeVsContext.put(node, context);
|
||||||
try {
|
try {
|
||||||
info.snitch.getTags(node, info.myTags, context);
|
info.snitch.getTags(node, info.myTags, context);
|
||||||
|
@ -401,16 +405,16 @@ public class ReplicaAssigner {
|
||||||
|
|
||||||
private Map<String, Object> snitchSession = new HashMap<>();
|
private Map<String, Object> snitchSession = new HashMap<>();
|
||||||
|
|
||||||
protected SnitchContext getSnitchCtx(String node, SnitchInfoImpl info, CoreContainer cc) {
|
protected SnitchContext getSnitchCtx(String node, SnitchInfoImpl info, SolrCloudManager cloudManager) {
|
||||||
return new ServerSnitchContext(info, node, snitchSession, cc);
|
return new ServerSnitchContext(info, node, snitchSession, cloudManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void verifySnitchConf(CoreContainer cc, List snitchConf) {
|
public static void verifySnitchConf(SolrCloudManager cloudManager, List snitchConf) {
|
||||||
getSnitchInfos(cc, snitchConf);
|
getSnitchInfos(cloudManager, snitchConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static Map<Class, SnitchInfoImpl> getSnitchInfos(CoreContainer cc, List snitchConf) {
|
static Map<Class, SnitchInfoImpl> getSnitchInfos(SolrCloudManager cloudManager, List snitchConf) {
|
||||||
if (snitchConf == null) snitchConf = Collections.emptyList();
|
if (snitchConf == null) snitchConf = Collections.emptyList();
|
||||||
Map<Class, SnitchInfoImpl> snitches = new LinkedHashMap<>();
|
Map<Class, SnitchInfoImpl> snitches = new LinkedHashMap<>();
|
||||||
for (Object o : snitchConf) {
|
for (Object o : snitchConf) {
|
||||||
|
@ -428,10 +432,9 @@ public class ReplicaAssigner {
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (klas.indexOf('.') == -1) klas = Snitch.class.getPackage().getName() + "." + klas;
|
if (klas.indexOf('.') == -1) klas = Snitch.class.getPackage().getName() + "." + klas;
|
||||||
Snitch inst = cc == null ?
|
Snitch inst =
|
||||||
(Snitch) Snitch.class.getClassLoader().loadClass(klas).newInstance() :
|
(Snitch) Snitch.class.getClassLoader().loadClass(klas).newInstance() ;
|
||||||
cc.getResourceLoader().newInstance(klas, Snitch.class);
|
snitches.put(inst.getClass(), new SnitchInfoImpl(map, inst, cloudManager));
|
||||||
snitches.put(inst.getClass(), new SnitchInfoImpl(map, inst, cc));
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.util.StrUtils;
|
import org.apache.solr.common.util.StrUtils;
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
|
|
||||||
import static org.apache.solr.common.cloud.rule.ImplicitSnitch.CORES;
|
|
||||||
import static org.apache.solr.cloud.rule.Rule.MatchStatus.CANNOT_ASSIGN_FAIL;
|
import static org.apache.solr.cloud.rule.Rule.MatchStatus.CANNOT_ASSIGN_FAIL;
|
||||||
import static org.apache.solr.cloud.rule.Rule.MatchStatus.NODE_CAN_BE_ASSIGNED;
|
import static org.apache.solr.cloud.rule.Rule.MatchStatus.NODE_CAN_BE_ASSIGNED;
|
||||||
import static org.apache.solr.cloud.rule.Rule.MatchStatus.NOT_APPLICABLE;
|
import static org.apache.solr.cloud.rule.Rule.MatchStatus.NOT_APPLICABLE;
|
||||||
|
@ -35,6 +34,7 @@ import static org.apache.solr.cloud.rule.Rule.Operand.LESS_THAN;
|
||||||
import static org.apache.solr.cloud.rule.Rule.Operand.NOT_EQUAL;
|
import static org.apache.solr.cloud.rule.Rule.Operand.NOT_EQUAL;
|
||||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
|
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
|
||||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||||
|
import static org.apache.solr.common.cloud.rule.ImplicitSnitch.CORES;
|
||||||
|
|
||||||
|
|
||||||
public class Rule {
|
public class Rule {
|
||||||
|
@ -73,7 +73,8 @@ public class Rule {
|
||||||
if (o == null) return o;
|
if (o == null) return o;
|
||||||
if (typ == String.class) return String.valueOf(o);
|
if (typ == String.class) return String.valueOf(o);
|
||||||
if (typ == Integer.class) {
|
if (typ == Integer.class) {
|
||||||
return Integer.parseInt(String.valueOf(o));
|
Double v = Double.parseDouble(String.valueOf(o));
|
||||||
|
return v.intValue();
|
||||||
}
|
}
|
||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,83 +19,40 @@ package org.apache.solr.cloud.rule;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.Collections;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
|
||||||
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
|
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
|
||||||
import org.apache.solr.client.solrj.request.GenericSolrRequest;
|
|
||||||
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
|
|
||||||
import org.apache.solr.common.cloud.rule.RemoteCallback;
|
|
||||||
import org.apache.solr.common.cloud.rule.SnitchContext;
|
import org.apache.solr.common.cloud.rule.SnitchContext;
|
||||||
import org.apache.solr.common.params.CommonParams;
|
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
|
||||||
import org.apache.solr.common.params.SolrParams;
|
|
||||||
import org.apache.solr.common.util.NamedList;
|
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.core.CoreContainer;
|
|
||||||
import org.apache.solr.update.UpdateShardHandler;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.solr.common.params.CoreAdminParams.ACTION;
|
|
||||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.INVOKE;
|
|
||||||
|
|
||||||
public class ServerSnitchContext extends SnitchContext {
|
public class ServerSnitchContext extends SnitchContext {
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
final CoreContainer coreContainer;
|
SolrCloudManager cloudManager;
|
||||||
public ServerSnitchContext(SnitchInfo perSnitch,
|
public ServerSnitchContext(SnitchInfo perSnitch,
|
||||||
String node, Map<String, Object> session,
|
String node, Map<String, Object> session,
|
||||||
CoreContainer coreContainer) {
|
SolrCloudManager cloudManager) {
|
||||||
super(perSnitch, node, session);
|
super(perSnitch, node, session);
|
||||||
this.coreContainer = coreContainer;
|
this.cloudManager = cloudManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Map getZkJson(String path) throws KeeperException, InterruptedException {
|
public Map getZkJson(String path) throws KeeperException, InterruptedException {
|
||||||
if (coreContainer.isZooKeeperAware()) {
|
|
||||||
return Utils.getJson(coreContainer.getZkController().getZkClient(), path, true);
|
|
||||||
} else {
|
|
||||||
return Collections.emptyMap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void invokeRemote(String node, ModifiableSolrParams params, String klas, RemoteCallback callback) {
|
|
||||||
if (callback == null) callback = this;
|
|
||||||
params.add("class", klas);
|
|
||||||
params.add(ACTION, INVOKE.toString());
|
|
||||||
//todo batch all requests to the same server
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
SimpleSolrResponse rsp = invoke(node, CommonParams.CORES_HANDLER_PATH, params);
|
return Utils.getJson(cloudManager.getDistribStateManager(), path) ;
|
||||||
Map<String, Object> returnedVal = (Map<String, Object>) rsp.getResponse().get(klas);
|
} catch (IOException e) {
|
||||||
if(exception == null){
|
throw new RuntimeException(e);
|
||||||
// log this
|
|
||||||
} else {
|
|
||||||
callback.remoteCallback(ServerSnitchContext.this,returnedVal);
|
|
||||||
}
|
|
||||||
callback.remoteCallback(this, returnedVal);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Unable to invoke snitch counterpart", e);
|
|
||||||
exception = e;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimpleSolrResponse invoke(String solrNode, String path, SolrParams params)
|
public Map<String, Object> getNodeValues(String node, Collection<String> tags){
|
||||||
throws IOException, SolrServerException {
|
return cloudManager.getNodeStateProvider().getNodeValues(node, tags);
|
||||||
String url = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(solrNode);
|
|
||||||
UpdateShardHandler shardHandler = coreContainer.getUpdateShardHandler();
|
|
||||||
GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, path, params);
|
|
||||||
try (HttpSolrClient client = new HttpSolrClient.Builder(url).withHttpClient(shardHandler.getHttpClient())
|
|
||||||
.withResponseParser(new BinaryResponseParser()).build()) {
|
|
||||||
NamedList<Object> rsp = client.request(request);
|
|
||||||
request.response.nl = rsp;
|
|
||||||
return request.response;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1103,7 +1103,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ReplicaAssigner.verifySnitchConf(cc, (List) m.get(SNITCH));
|
if (cc != null && cc.isZooKeeperAware())
|
||||||
|
ReplicaAssigner.verifySnitchConf(cc.getZkController().getSolrCloudManager(), (List) m.get(SNITCH));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.solr.cloud.rule;
|
package org.apache.solr.cloud.rule;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -27,9 +28,7 @@ import org.apache.solr.SolrTestCaseJ4;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
||||||
import org.apache.solr.common.cloud.rule.RemoteCallback;
|
|
||||||
import org.apache.solr.common.cloud.rule.SnitchContext;
|
import org.apache.solr.common.cloud.rule.SnitchContext;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -226,8 +225,9 @@ public class ImplicitSnitchTest extends LuceneTestCase {
|
||||||
expectThrows(SolrException.class, KeeperException.ConnectionLossException.class, () -> implicitSnitch.getTags("", Collections.singleton(ImplicitSnitch.NODEROLE), keeperExceptionSnitch));
|
expectThrows(SolrException.class, KeeperException.ConnectionLossException.class, () -> implicitSnitch.getTags("", Collections.singleton(ImplicitSnitch.NODEROLE), keeperExceptionSnitch));
|
||||||
|
|
||||||
ServerSnitchContext remoteExceptionSnitch = new ServerSnitchContext(null, null, new HashMap<>(), null) {
|
ServerSnitchContext remoteExceptionSnitch = new ServerSnitchContext(null, null, new HashMap<>(), null) {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void invokeRemote(String node, ModifiableSolrParams params, String klas, RemoteCallback callback) {
|
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
|
||||||
throw new RuntimeException();
|
throw new RuntimeException();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -28,12 +29,15 @@ import java.util.Set;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import org.apache.solr.SolrTestCaseJ4;
|
import org.apache.solr.SolrTestCaseJ4;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.DelegatingCloudManager;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.NodeStateProvider;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||||
import org.apache.solr.common.cloud.ReplicaPosition;
|
import org.apache.solr.common.cloud.ReplicaPosition;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.cloud.rule.Snitch;
|
import org.apache.solr.common.cloud.rule.Snitch;
|
||||||
import org.apache.solr.common.cloud.rule.SnitchContext;
|
import org.apache.solr.common.cloud.rule.SnitchContext;
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.core.CoreContainer;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static java.util.Collections.singletonList;
|
import static java.util.Collections.singletonList;
|
||||||
|
@ -94,8 +98,8 @@ public class RuleEngineTest extends SolrTestCaseJ4{
|
||||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null) {
|
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null) {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected SnitchContext getSnitchCtx(String node, SnitchInfoImpl info, CoreContainer cc) {
|
protected SnitchContext getSnitchCtx(String node, SnitchInfoImpl info, SolrCloudManager cloudManager) {
|
||||||
return new ServerSnitchContext(info, node, snitchSession,cc){
|
return new ServerSnitchContext(info, node, snitchSession,cloudManager){
|
||||||
@Override
|
@Override
|
||||||
public Map getZkJson(String path) {
|
public Map getZkJson(String path) {
|
||||||
if(ZkStateReader.ROLES.equals(path)){
|
if(ZkStateReader.ROLES.equals(path)){
|
||||||
|
@ -189,6 +193,29 @@ public class RuleEngineTest extends SolrTestCaseJ4{
|
||||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
|
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
|
||||||
assertNotNull(mapping);
|
assertNotNull(mapping);
|
||||||
|
|
||||||
|
mapping = new ReplicaAssigner(
|
||||||
|
rules,
|
||||||
|
shardVsReplicaCount, Collections.emptyList(),
|
||||||
|
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), new DelegatingCloudManager(null){
|
||||||
|
@Override
|
||||||
|
public NodeStateProvider getNodeStateProvider() {
|
||||||
|
return new NodeStateProvider() {
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
|
||||||
|
return (Map<String, Object>) MockSnitch.nodeVsTags.get(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}, null).getNodeMappings();
|
||||||
|
assertNotNull(mapping);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
rules = parseRules(
|
rules = parseRules(
|
||||||
"[{cores:'<4'}, " +
|
"[{cores:'<4'}, " +
|
||||||
"{replica:'1',shard:'**',host:'*'}]"
|
"{replica:'1',shard:'**',host:'*'}]"
|
||||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.solr.common.cloud.ClusterState;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
||||||
import org.apache.solr.common.cloud.rule.RemoteCallback;
|
|
||||||
import org.apache.solr.common.cloud.rule.SnitchContext;
|
import org.apache.solr.common.cloud.rule.SnitchContext;
|
||||||
import org.apache.solr.common.params.CommonParams;
|
import org.apache.solr.common.params.CommonParams;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
@ -247,9 +246,6 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
|
||||||
return Utils.getJson(zkClientClusterStateProvider.getZkStateReader().getZkClient(), path, true);
|
return Utils.getJson(zkClientClusterStateProvider.getZkStateReader().getZkClient(), path, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void invokeRemote(String node, ModifiableSolrParams params, String klas, RemoteCallback callback) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public SimpleSolrResponse invoke(String solrNode, String path, SolrParams params)
|
public SimpleSolrResponse invoke(String solrNode, String path, SolrParams params)
|
||||||
throws IOException, SolrServerException {
|
throws IOException, SolrServerException {
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.net.InetAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -30,7 +31,6 @@ import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -79,13 +79,20 @@ public class ImplicitSnitch extends Snitch {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void getRemoteInfo(String solrNode, Set<String> requestedTags, SnitchContext ctx) {
|
protected void getRemoteInfo(String solrNode, Set<String> requestedTags, SnitchContext ctx) {
|
||||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
HashMap<String, Object> params = new HashMap<>();
|
||||||
if (requestedTags.contains(CORES)) params.add(CORES, "1");
|
if (requestedTags.contains(CORES)) params.put(CORES, "1");
|
||||||
if (requestedTags.contains(DISK)) params.add(DISK, "1");
|
if (requestedTags.contains(DISK)) params.put(DISK, "1");
|
||||||
for (String tag : requestedTags) {
|
for (String tag : requestedTags) {
|
||||||
if (tag.startsWith(SYSPROP)) params.add(SYSPROP, tag.substring(SYSPROP.length()));
|
if (tag.startsWith(SYSPROP)) params.put(tag, tag.substring(SYSPROP.length()));
|
||||||
}
|
}
|
||||||
if (params.size() > 0) ctx.invokeRemote(solrNode, params, "org.apache.solr.cloud.rule.ImplicitSnitch", null);
|
|
||||||
|
if (params.size() > 0) {
|
||||||
|
Map<String, Object> vals = ctx.getNodeValues(solrNode, params.keySet());
|
||||||
|
for (Map.Entry<String, Object> e : vals.entrySet()) {
|
||||||
|
if(e.getValue() != null) params.put(e.getKey(), e.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ctx.getTags().putAll(params);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fillRole(String solrNode, SnitchContext ctx, String key) throws KeeperException, InterruptedException {
|
private void fillRole(String solrNode, SnitchContext ctx, String key) throws KeeperException, InterruptedException {
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
package org.apache.solr.common.cloud.rule;
|
package org.apache.solr.common.cloud.rule;
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -58,6 +60,9 @@ public abstract class SnitchContext implements RemoteCallback {
|
||||||
return session != null ? session.get(s) : null;
|
return session != null ? session.get(s) : null;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
public Map<String, Object> getNodeValues(String node, Collection<String> tags){
|
||||||
|
return Collections.emptyMap();
|
||||||
|
}
|
||||||
|
|
||||||
public abstract Map getZkJson(String path) throws KeeperException, InterruptedException;
|
public abstract Map getZkJson(String path) throws KeeperException, InterruptedException;
|
||||||
|
|
||||||
|
@ -75,7 +80,8 @@ public abstract class SnitchContext implements RemoteCallback {
|
||||||
* @param callback The callback to be called when the response is obtained from remote node.
|
* @param callback The callback to be called when the response is obtained from remote node.
|
||||||
* If this is passed as null the entire response map will be added as tags
|
* If this is passed as null the entire response map will be added as tags
|
||||||
*/
|
*/
|
||||||
public abstract void invokeRemote(String node, ModifiableSolrParams params, String klas, RemoteCallback callback) ;
|
@Deprecated
|
||||||
|
public void invokeRemote(String node, ModifiableSolrParams params, String klas, RemoteCallback callback) {};
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,6 +40,8 @@ import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.http.HttpEntity;
|
import org.apache.http.HttpEntity;
|
||||||
import org.apache.http.util.EntityUtils;
|
import org.apache.http.util.EntityUtils;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
|
||||||
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
|
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
|
||||||
import org.apache.solr.common.IteratorWriter;
|
import org.apache.solr.common.IteratorWriter;
|
||||||
import org.apache.solr.common.MapWriter;
|
import org.apache.solr.common.MapWriter;
|
||||||
|
@ -366,6 +368,17 @@ public class Utils {
|
||||||
while (is.read() != -1) {}
|
while (is.read() != -1) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Map<String, Object> getJson(DistribStateManager distribStateManager, String path) throws InterruptedException, IOException, KeeperException {
|
||||||
|
VersionedData data = null;
|
||||||
|
try {
|
||||||
|
data = distribStateManager.getData(path);
|
||||||
|
} catch (KeeperException.NoNodeException e) {
|
||||||
|
return Collections.emptyMap();
|
||||||
|
}
|
||||||
|
if (data == null || data.getData() == null || data.getData().length == 0) return Collections.emptyMap();
|
||||||
|
return (Map<String, Object>) Utils.fromJSON(data.getData());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assumes data in ZooKeeper is a JSON string, deserializes it and returns as a Map
|
* Assumes data in ZooKeeper is a JSON string, deserializes it and returns as a Map
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue