mirror of https://github.com/apache/lucene.git
SOLR-6220: Rule Based Replica Assignment during collection creation
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1677607 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bf1355346c
commit
7a771c84a2
|
@ -163,6 +163,8 @@ New Features
|
|||
* SOLR-7231: DIH-TikaEntityprocessor, create lat-lon field from Metadata
|
||||
(Tim Allison via Noble Paul)
|
||||
|
||||
* SOLR-6220: Rule Based Replica Assignment during collection creation (Noble Paul)
|
||||
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
|
|
@ -811,7 +811,7 @@ public class Overseer implements Closeable {
|
|||
|
||||
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
|
||||
|
||||
overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats);
|
||||
overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this);
|
||||
ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "OverseerCollectionProcessor-" + id);
|
||||
ccThread.setDaemon(true);
|
||||
|
||||
|
@ -830,6 +830,10 @@ public class Overseer implements Closeable {
|
|||
return stats;
|
||||
}
|
||||
|
||||
ZkController getZkController(){
|
||||
return zkController;
|
||||
}
|
||||
|
||||
/**
|
||||
* For tests.
|
||||
*
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.solr.cloud.Assign.*;
|
|||
import static org.apache.solr.common.cloud.ZkStateReader.*;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
|
||||
import static org.apache.solr.common.params.CommonParams.*;
|
||||
import static org.apache.solr.common.util.StrUtils.formatString;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -53,8 +54,12 @@ import org.apache.solr.client.solrj.response.UpdateResponse;
|
|||
import org.apache.solr.cloud.Assign.Node;
|
||||
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
|
||||
import org.apache.solr.cloud.Overseer.LeaderStatus;
|
||||
import org.apache.solr.cloud.rule.Rule;
|
||||
import org.apache.solr.cloud.rule.ReplicaAssigner;
|
||||
import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
|
||||
import org.apache.solr.cloud.overseer.ClusterStateMutator;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.cloud.rule.SnitchContext;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
|
@ -134,7 +139,9 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
|||
ROUTER, DocRouter.DEFAULT_NAME,
|
||||
ZkStateReader.REPLICATION_FACTOR, "1",
|
||||
ZkStateReader.MAX_SHARDS_PER_NODE, "1",
|
||||
ZkStateReader.AUTO_ADD_REPLICAS, "false");
|
||||
ZkStateReader.AUTO_ADD_REPLICAS, "false",
|
||||
"rule", null,
|
||||
"snitch",null);
|
||||
|
||||
static final Random RANDOM;
|
||||
static {
|
||||
|
@ -185,13 +192,15 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
|||
// deleted from the work-queue as that is a batched operation.
|
||||
final private Set<String> runningZKTasks;
|
||||
private final Object waitLock = new Object();
|
||||
private Overseer overseer;
|
||||
|
||||
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId,
|
||||
final ShardHandler shardHandler,
|
||||
String adminPath, Overseer.Stats stats) {
|
||||
String adminPath, Overseer.Stats stats, Overseer overseer) {
|
||||
this(zkStateReader, myId, shardHandler.getShardHandlerFactory(), adminPath, stats, Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
|
||||
Overseer.getRunningMap(zkStateReader.getZkClient()),
|
||||
Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
|
||||
this.overseer = overseer;
|
||||
}
|
||||
|
||||
protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId,
|
||||
|
@ -2343,6 +2352,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
|||
+ ". This requires " + requestedShardsToCreate
|
||||
+ " shards to be created (higher than the allowed number)");
|
||||
}
|
||||
|
||||
Map<Position, String> positionVsNodes = identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
|
||||
boolean isLegacyCloud = Overseer.isLegacy(zkStateReader.getClusterProps());
|
||||
|
||||
createConfNode(configName, collectionName, isLegacyCloud);
|
||||
|
@ -2363,62 +2374,60 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
|||
// For tracking async calls.
|
||||
HashMap<String, String> requestMap = new HashMap<String, String>();
|
||||
|
||||
log.info("Creating SolrCores for new collection {}, shardNames {} , replicationFactor : {}",
|
||||
collectionName, shardNames, repFactor);
|
||||
|
||||
log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
|
||||
collectionName, shardNames, repFactor));
|
||||
Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<>();
|
||||
for (int i = 1; i <= shardNames.size(); i++) {
|
||||
String sliceName = shardNames.get(i-1);
|
||||
for (int j = 1; j <= repFactor; j++) {
|
||||
String nodeName = nodeList.get((repFactor * (i - 1) + (j - 1)) % nodeList.size());
|
||||
String coreName = collectionName + "_" + sliceName + "_replica" + j;
|
||||
log.info("Creating shard " + coreName + " as part of slice "
|
||||
+ sliceName + " of collection " + collectionName + " on "
|
||||
+ nodeName);
|
||||
for (Map.Entry<Position, String> e : positionVsNodes.entrySet()) {
|
||||
Position position = e.getKey();
|
||||
String nodeName = e.getValue();
|
||||
String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1);
|
||||
log.info(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
|
||||
, coreName, position.shard, collectionName, nodeName));
|
||||
|
||||
|
||||
String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
|
||||
//in the new mode, create the replica in clusterstate prior to creating the core.
|
||||
// Otherwise the core creation fails
|
||||
if(!isLegacyCloud){
|
||||
ZkNodeProps props = new ZkNodeProps(
|
||||
Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
|
||||
ZkStateReader.COLLECTION_PROP, collectionName,
|
||||
ZkStateReader.SHARD_ID_PROP, sliceName,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP,baseUrl);
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
|
||||
}
|
||||
String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
|
||||
//in the new mode, create the replica in clusterstate prior to creating the core.
|
||||
// Otherwise the core creation fails
|
||||
if (!isLegacyCloud) {
|
||||
ZkNodeProps props = new ZkNodeProps(
|
||||
Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
|
||||
ZkStateReader.COLLECTION_PROP, collectionName,
|
||||
ZkStateReader.SHARD_ID_PROP, position.shard,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, baseUrl);
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
|
||||
}
|
||||
|
||||
// Need to create new params for each request
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
|
||||
// Need to create new params for each request
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
|
||||
|
||||
params.set(CoreAdminParams.NAME, coreName);
|
||||
params.set(COLL_CONF, configName);
|
||||
params.set(CoreAdminParams.COLLECTION, collectionName);
|
||||
params.set(CoreAdminParams.SHARD, sliceName);
|
||||
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
|
||||
params.set(CoreAdminParams.NAME, coreName);
|
||||
params.set(COLL_CONF, configName);
|
||||
params.set(CoreAdminParams.COLLECTION, collectionName);
|
||||
params.set(CoreAdminParams.SHARD, position.shard);
|
||||
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
|
||||
|
||||
if (async != null) {
|
||||
String coreAdminAsyncId = async + Math.abs(System.nanoTime());
|
||||
params.add(ASYNC, coreAdminAsyncId);
|
||||
requestMap.put(nodeName, coreAdminAsyncId);
|
||||
}
|
||||
addPropertyParams(message, params);
|
||||
if (async != null) {
|
||||
String coreAdminAsyncId = async + Math.abs(System.nanoTime());
|
||||
params.add(ASYNC, coreAdminAsyncId);
|
||||
requestMap.put(nodeName, coreAdminAsyncId);
|
||||
}
|
||||
addPropertyParams(message, params);
|
||||
|
||||
ShardRequest sreq = new ShardRequest();
|
||||
params.set("qt", adminPath);
|
||||
sreq.purpose = 1;
|
||||
sreq.shards = new String[] {baseUrl};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = params;
|
||||
ShardRequest sreq = new ShardRequest();
|
||||
params.set("qt", adminPath);
|
||||
sreq.purpose = 1;
|
||||
sreq.shards = new String[]{baseUrl};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = params;
|
||||
|
||||
if(isLegacyCloud) {
|
||||
shardHandler.submit(sreq, sreq.shards[0], sreq.params);
|
||||
} else {
|
||||
coresToCreate.put(coreName, sreq);
|
||||
}
|
||||
if (isLegacyCloud) {
|
||||
shardHandler.submit(sreq, sreq.shards[0], sreq.params);
|
||||
} else {
|
||||
coresToCreate.put(coreName, sreq);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2446,6 +2455,57 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private Map<Position, String> identifyNodes(ClusterState clusterState,
|
||||
List<String> nodeList,
|
||||
ZkNodeProps message,
|
||||
List<String> shardNames,
|
||||
int repFactor) throws IOException {
|
||||
List<Map> maps = (List) message.get("rule");
|
||||
if (maps == null) {
|
||||
int i = 0;
|
||||
Map<Position, String> result = new HashMap<>();
|
||||
for (String aShard : shardNames) {
|
||||
for (int j = 0; j < repFactor; j++){
|
||||
result.put(new Position(aShard, j), nodeList.get(i % nodeList.size()));
|
||||
i++;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
List<Rule> rules = new ArrayList<>();
|
||||
for (Object map : maps) rules.add(new Rule((Map) map));
|
||||
|
||||
Map<String, Integer> sharVsReplicaCount = new HashMap<>();
|
||||
|
||||
for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor);
|
||||
maps = (List<Map>) message.get("snitch");
|
||||
List snitchList = maps == null? Collections.emptyList(): maps;
|
||||
ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
|
||||
sharVsReplicaCount,
|
||||
snitchList,
|
||||
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
|
||||
nodeList,
|
||||
overseer.getZkController().getCoreContainer(),
|
||||
clusterState);
|
||||
|
||||
Map<Position, String> nodeMappings = replicaAssigner.getNodeMappings();
|
||||
if(nodeMappings == null){
|
||||
String msg = "Could not identify nodes matching the rules " + rules ;
|
||||
if(!replicaAssigner.failedNodes.isEmpty()){
|
||||
Map<String, String> failedNodes = new HashMap<>();
|
||||
for (Map.Entry<String, SnitchContext> e : replicaAssigner.failedNodes.entrySet()) {
|
||||
failedNodes.put(e.getKey(), e.getValue().getErrMsg());
|
||||
}
|
||||
msg+=" Some nodes where excluded from assigning replicas because tags could not be obtained from them "+ failedNodes;
|
||||
}
|
||||
msg+= ZkStateReader.toJSONString(replicaAssigner.getNodeVsTags());
|
||||
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, msg);
|
||||
}
|
||||
return nodeMappings;
|
||||
}
|
||||
|
||||
private Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
|
||||
Map<String, Replica> result = new HashMap<>();
|
||||
long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
package org.apache.solr.cloud.rule;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.handler.admin.CoreAdminHandler;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ImplicitSnitch extends Snitch implements CoreAdminHandler.Invocable {
|
||||
static final Logger log = LoggerFactory.getLogger(ImplicitSnitch.class);
|
||||
|
||||
//well known tags
|
||||
public static final String NODE = "node";
|
||||
public static final String PORT = "port";
|
||||
public static final String HOST = "host";
|
||||
public static final String CORES = "cores";
|
||||
public static final String DISK = "disk";
|
||||
public static final String SYSPROP = "D.";
|
||||
|
||||
public static final Set<String> tags = ImmutableSet.of(NODE, PORT, HOST, CORES, DISK);
|
||||
|
||||
|
||||
@Override
|
||||
public void getTags(String solrNode, Set<String> requestedTags, SnitchContext ctx) {
|
||||
if (requestedTags.contains(NODE)) ctx.getTags().put(NODE, solrNode);
|
||||
if (requestedTags.contains(HOST)) ctx.getTags().put(HOST, solrNode.substring(0, solrNode.indexOf(':')));
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
if (requestedTags.contains(CORES)) params.add(CORES, "1");
|
||||
if (requestedTags.contains(DISK)) params.add(DISK, "1");
|
||||
for (String tag : requestedTags) {
|
||||
if (tag.startsWith(SYSPROP)) params.add(SYSPROP, tag.substring(SYSPROP.length()));
|
||||
}
|
||||
if (params.size() > 0) ctx.invokeRemote(solrNode, params, ImplicitSnitch.class.getName(), null);
|
||||
}
|
||||
|
||||
public Map<String, Object> invoke(SolrQueryRequest req) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
if (req.getParams().getInt(CORES, -1) == 1) {
|
||||
CoreContainer cc = (CoreContainer) req.getContext().get(CoreContainer.class.getName());
|
||||
result.put(CORES, cc.getCoreNames().size());
|
||||
}
|
||||
if (req.getParams().getInt(DISK, -1) == 1) {
|
||||
try {
|
||||
long space = Files.getFileStore(Paths.get("/")).getUsableSpace();
|
||||
long spaceInGB = space / 1024 / 1024 / 1024;
|
||||
result.put(DISK, spaceInGB);
|
||||
} catch (IOException e) {
|
||||
|
||||
}
|
||||
}
|
||||
String[] sysProps = req.getParams().getParams(SYSPROP);
|
||||
if (sysProps != null && sysProps.length > 0) {
|
||||
for (String prop : sysProps) result.put(prop, System.getProperty(prop));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isKnownTag(String tag) {
|
||||
return tags.contains(tag) ||
|
||||
tag.startsWith(SYSPROP);//a system property
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package org.apache.solr.cloud.rule;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface RemoteCallback {
|
||||
public void remoteCallback(SnitchContext ctx, Map<String, Object> returnedVal) ;
|
||||
}
|
|
@ -0,0 +1,451 @@
|
|||
package org.apache.solr.cloud.rule;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.apache.solr.cloud.rule.Rule.MatchStatus.NODE_CAN_BE_ASSIGNED;
|
||||
import static org.apache.solr.cloud.rule.Rule.Phase.ASSIGN;
|
||||
import static org.apache.solr.cloud.rule.Rule.Phase.FUZZY_ASSIGN;
|
||||
import static org.apache.solr.cloud.rule.Rule.Phase.FUZZY_VERIFY;
|
||||
import static org.apache.solr.cloud.rule.Rule.Phase.VERIFY;
|
||||
import static org.apache.solr.common.util.StrUtils.formatString;
|
||||
import static org.apache.solr.core.RequestParams.getDeepCopy;
|
||||
|
||||
public class ReplicaAssigner {
|
||||
public static final Logger log = LoggerFactory.getLogger(ReplicaAssigner.class);
|
||||
List<Rule> rules;
|
||||
Map<String, Integer> shardVsReplicaCount;
|
||||
Map<String, Map<String, Object>> nodeVsTags;
|
||||
Map<String, Set<String>> shardVsNodes;
|
||||
List<String> liveNodes;
|
||||
Set<String> tagNames = new HashSet<>();
|
||||
private Map<String, AtomicInteger> nodeVsCores = new HashMap<>();
|
||||
|
||||
|
||||
public static class Position implements Comparable<Position> {
|
||||
public final String shard;
|
||||
public final int index;
|
||||
|
||||
public Position(String shard, int replicaIdx) {
|
||||
this.shard = shard;
|
||||
this.index = replicaIdx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Position that) {
|
||||
//this is to ensure that we try one replica from each shard first instead of
|
||||
// all replicas from same shard
|
||||
return that.index > index ? -1 : that.index == index ? 0 : 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return shard + ":" + index;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param shardVsReplicaCount shard names vs no:of replicas required for each of those shards
|
||||
* @param snitches snitches details
|
||||
* @param shardVsNodes The current state of the system. can be an empty map if no nodes
|
||||
* are created in this collection till now
|
||||
*/
|
||||
public ReplicaAssigner(List<Rule> rules,
|
||||
Map<String, Integer> shardVsReplicaCount,
|
||||
List snitches,
|
||||
Map<String, Set<String>> shardVsNodes,
|
||||
List<String> liveNodes,
|
||||
CoreContainer cc, ClusterState clusterState) {
|
||||
this.rules = rules;
|
||||
for (Rule rule : rules) tagNames.add(rule.tag.name);
|
||||
this.shardVsReplicaCount = shardVsReplicaCount;
|
||||
this.liveNodes = new ArrayList<>(liveNodes);
|
||||
this.nodeVsTags = getTagsForNodes(cc, snitches);
|
||||
this.shardVsNodes = getDeepCopy(shardVsNodes, 2);
|
||||
validateTags(nodeVsTags);
|
||||
|
||||
if (clusterState != null) {
|
||||
for (String s : clusterState.getCollections()) {
|
||||
DocCollection coll = clusterState.getCollection(s);
|
||||
for (Slice slice : coll.getSlices()) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
AtomicInteger count = nodeVsCores.get(replica.getNodeName());
|
||||
if (count == null) nodeVsCores.put(replica.getNodeName(), count = new AtomicInteger());
|
||||
count.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, Map<String, Object>> getNodeVsTags() {
|
||||
return nodeVsTags;
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* For each shard return a new set of nodes where the replicas need to be created satisfying
|
||||
* the specified rule
|
||||
*/
|
||||
public Map<Position, String> getNodeMappings() {
|
||||
List<String> shardNames = new ArrayList<>(shardVsReplicaCount.keySet());
|
||||
int[] shardOrder = new int[shardNames.size()];
|
||||
for (int i = 0; i < shardNames.size(); i++) shardOrder[i] = i;
|
||||
|
||||
boolean hasFuzzyRules = false;
|
||||
int nonWildCardShardRules = 0;
|
||||
for (Rule r : rules) {
|
||||
if (r.isFuzzy()) hasFuzzyRules = true;
|
||||
if (!r.shard.isWildCard()) {
|
||||
nonWildCardShardRules++;
|
||||
//we will have to try all combinations
|
||||
if (shardNames.size() > 10) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Max 10 shards allowed if there is a non wild card shard specified in rule");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<Position, String> result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, false);
|
||||
if (result == null && hasFuzzyRules) {
|
||||
result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, true);
|
||||
}
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
private Map<Position, String> tryAllPermutations(List<String> shardNames,
|
||||
int[] shardOrder,
|
||||
int nonWildCardShardRules,
|
||||
boolean fuzzyPhase) {
|
||||
|
||||
|
||||
Iterator<int[]> shardPermutations = nonWildCardShardRules > 0 ?
|
||||
permutations(shardNames.size()) :
|
||||
singletonList(shardOrder).iterator();
|
||||
|
||||
for (; shardPermutations.hasNext(); ) {
|
||||
int[] p = shardPermutations.next();
|
||||
for (int i = 0; i < p.length; i++) {
|
||||
List<Position> positions = new ArrayList<>();
|
||||
for (int pos : p) {
|
||||
for (int j = 0; j < shardVsReplicaCount.get(shardNames.get(pos)); j++) {
|
||||
positions.add(new Position(shardNames.get(pos), j));
|
||||
}
|
||||
}
|
||||
Collections.sort(positions);
|
||||
for (Iterator<int[]> it = permutations(rules.size()); it.hasNext(); ) {
|
||||
int[] permutation = it.next();
|
||||
Map<Position, String> result = tryAPermutationOfRules(permutation, positions, fuzzyPhase);
|
||||
if (result != null) return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
private Map<Position, String> tryAPermutationOfRules(int[] rulePermutation, List<Position> positions, boolean fuzzyPhase) {
|
||||
Map<String, Map<String, Object>> nodeVsTagsCopy = getDeepCopy(nodeVsTags, 2);
|
||||
Map<Position, String> result = new LinkedHashMap<>();
|
||||
int startPosition = 0;
|
||||
Map<String, Set<String>> copyOfCurrentState = getDeepCopy(shardVsNodes, 2);
|
||||
List<String> sortedLiveNodes = new ArrayList<>(this.liveNodes);
|
||||
Collections.sort(sortedLiveNodes, new Comparator<String>() {
|
||||
@Override
|
||||
public int compare(String n1, String n2) {
|
||||
int result = 0;
|
||||
for (int i = 0; i < rulePermutation.length; i++) {
|
||||
Rule rule = rules.get(rulePermutation[i]);
|
||||
int val = rule.compare(n1, n2, nodeVsTagsCopy, copyOfCurrentState);
|
||||
if (val != 0) {//atleast one non-zero compare break now
|
||||
result = val;
|
||||
break;
|
||||
}
|
||||
if (result == 0) {//if all else is equal, prefer nodes with fewer cores
|
||||
AtomicInteger n1Count = nodeVsCores.get(n1);
|
||||
AtomicInteger n2Count = nodeVsCores.get(n2);
|
||||
int a = n1Count == null ? 0 : n1Count.get();
|
||||
int b = n2Count == null ? 0 : n2Count.get();
|
||||
result = a > b ? 1 : a == b ? 0 : -1;
|
||||
}
|
||||
|
||||
}
|
||||
return result;
|
||||
}
|
||||
});
|
||||
forEachPosition:
|
||||
for (Position position : positions) {
|
||||
//trying to assign a node by verifying each rule in this rulePermutation
|
||||
forEachNode:
|
||||
for (int j = 0; j < sortedLiveNodes.size(); j++) {
|
||||
String liveNode = sortedLiveNodes.get(startPosition % sortedLiveNodes.size());
|
||||
startPosition++;
|
||||
for (int i = 0; i < rulePermutation.length; i++) {
|
||||
Rule rule = rules.get(rulePermutation[i]);
|
||||
//trying to assign a replica into this node in this shard
|
||||
Rule.MatchStatus status = rule.tryAssignNodeToShard(liveNode,
|
||||
copyOfCurrentState, nodeVsTagsCopy, position.shard, fuzzyPhase ? FUZZY_ASSIGN : ASSIGN);
|
||||
if (status == Rule.MatchStatus.CANNOT_ASSIGN_FAIL) {
|
||||
continue forEachNode;//try another node for this position
|
||||
}
|
||||
}
|
||||
//We have reached this far means this node can be applied to this position
|
||||
//and all rules are fine. So let us change the currentState
|
||||
result.put(position, liveNode);
|
||||
Set<String> nodeNames = copyOfCurrentState.get(position.shard);
|
||||
if (nodeNames == null) copyOfCurrentState.put(position.shard, nodeNames = new HashSet<>());
|
||||
nodeNames.add(liveNode);
|
||||
Number coreCount = (Number) nodeVsTagsCopy.get(liveNode).get(ImplicitSnitch.CORES);
|
||||
if (coreCount != null) {
|
||||
nodeVsTagsCopy.get(liveNode).put(ImplicitSnitch.CORES, coreCount.intValue() + 1);
|
||||
}
|
||||
|
||||
continue forEachPosition;
|
||||
}
|
||||
//if it reached here, we could not find a node for this position
|
||||
return null;
|
||||
}
|
||||
|
||||
if (positions.size() > result.size()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
for (Map.Entry<Position, String> e : result.entrySet()) {
|
||||
for (int i = 0; i < rulePermutation.length; i++) {
|
||||
Rule rule = rules.get(rulePermutation[i]);
|
||||
Rule.MatchStatus matchStatus = rule.tryAssignNodeToShard(e.getValue(),
|
||||
copyOfCurrentState, nodeVsTagsCopy, e.getKey().shard, fuzzyPhase ? FUZZY_VERIFY : VERIFY);
|
||||
if (matchStatus != NODE_CAN_BE_ASSIGNED) return null;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void validateTags(Map<String, Map<String, Object>> nodeVsTags) {
|
||||
List<String> errors = new ArrayList<>();
|
||||
for (Rule rule : rules) {
|
||||
for (Map.Entry<String, Map<String, Object>> e : nodeVsTags.entrySet()) {
|
||||
if (e.getValue().get(rule.tag.name) == null) {
|
||||
errors.add(formatString("The value for tag {0} is not available for node {}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!errors.isEmpty()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, StrUtils.join(errors, ','));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get all permutations for the int[] whose items are 0..level
|
||||
*/
|
||||
public static Iterator<int[]> permutations(final int level) {
|
||||
return new Iterator<int[]>() {
|
||||
int i = 0;
|
||||
int[] next;
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
AtomicReference<int[]> nthval = new AtomicReference<>();
|
||||
permute(0, new int[level], new BitSet(level), nthval, i, new AtomicInteger());
|
||||
i++;
|
||||
next = nthval.get();
|
||||
return next != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int[] next() {
|
||||
return next;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static void permute(int level, int[] permuted, BitSet used, AtomicReference<int[]> nthval,
|
||||
int requestedIdx, AtomicInteger seenSoFar) {
|
||||
if (level == permuted.length) {
|
||||
if (seenSoFar.get() == requestedIdx) nthval.set(permuted);
|
||||
else seenSoFar.incrementAndGet();
|
||||
} else {
|
||||
for (int i = 0; i < permuted.length; i++) {
|
||||
if (!used.get(i)) {
|
||||
used.set(i);
|
||||
permuted[level] = i;
|
||||
permute(level + 1, permuted, used, nthval, requestedIdx, seenSoFar);
|
||||
if (nthval.get() != null) break;
|
||||
used.set(i, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Map<String, SnitchContext> failedNodes = new HashMap<>();
|
||||
|
||||
/**
|
||||
* This method uses the snitches and get the tags for all the nodes
|
||||
*/
|
||||
private Map<String, Map<String, Object>> getTagsForNodes(final CoreContainer cc, List snitchConf) {
|
||||
|
||||
class Info extends SnitchContext.SnitchInfo {
|
||||
final Snitch snitch;
|
||||
final Set<String> myTags = new HashSet<>();
|
||||
final Map<String, SnitchContext> nodeVsContext = new HashMap<>();
|
||||
|
||||
Info(Map<String, Object> conf, Snitch snitch) {
|
||||
super(conf);
|
||||
this.snitch = snitch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getTagNames() {
|
||||
return myTags;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CoreContainer getCoreContainer() {
|
||||
return cc;
|
||||
}
|
||||
}
|
||||
|
||||
Map<Class, Info> snitches = new LinkedHashMap<>();
|
||||
for (Object o : snitchConf) {
|
||||
//instantiating explicitly specified snitches
|
||||
String klas = null;
|
||||
Map map = Collections.emptyMap();
|
||||
if (o instanceof Map) {//it can be a Map
|
||||
map = (Map) o;
|
||||
klas = (String) map.get("class");
|
||||
if (klas == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "snitch must have a class attribute");
|
||||
}
|
||||
} else { //or just the snitch name
|
||||
klas = o.toString();
|
||||
}
|
||||
try {
|
||||
if (klas.indexOf('.') == -1) klas = Snitch.class.getPackage().getName() + "." + klas;
|
||||
Snitch inst = cc == null ?
|
||||
(Snitch) Snitch.class.getClassLoader().loadClass(klas).newInstance() :
|
||||
cc.getResourceLoader().newInstance(klas, Snitch.class);
|
||||
snitches.put(inst.getClass(), new Info(map, inst));
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
for (Class c : Snitch.WELL_KNOWN_SNITCHES) {
|
||||
if (snitches.containsKey(c)) continue;// it is already specified explicitly , ignore
|
||||
try {
|
||||
snitches.put(c, new Info(Collections.EMPTY_MAP, (Snitch) c.newInstance()));
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error instantiating Snitch " + c.getName());
|
||||
}
|
||||
}
|
||||
for (String tagName : tagNames) {
|
||||
//identify which snitch is going to provide values for a given tag
|
||||
boolean foundProvider = false;
|
||||
for (Info info : snitches.values()) {
|
||||
if (info.snitch.isKnownTag(tagName)) {
|
||||
foundProvider = true;
|
||||
info.myTags.add(tagName);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!foundProvider)
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown tag in rules " + tagName);
|
||||
}
|
||||
|
||||
|
||||
for (String node : liveNodes) {
|
||||
//now use the Snitch to get the tags
|
||||
for (Info info : snitches.values()) {
|
||||
if (!info.myTags.isEmpty()) {
|
||||
SnitchContext context = new SnitchContext(info, node);
|
||||
info.nodeVsContext.put(node, context);
|
||||
try {
|
||||
info.snitch.getTags(node, info.myTags, context);
|
||||
} catch (Exception e) {
|
||||
context.exception = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Map<String, Object>> result = new HashMap<>();
|
||||
for (Info info : snitches.values()) {
|
||||
for (Map.Entry<String, SnitchContext> e : info.nodeVsContext.entrySet()) {
|
||||
SnitchContext context = e.getValue();
|
||||
String node = e.getKey();
|
||||
if (context.exception != null) {
|
||||
failedNodes.put(node, context);
|
||||
liveNodes.remove(node);
|
||||
log.warn("Not all tags were obtained from node " + node);
|
||||
context.exception = new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Not all tags were obtained from node " + node);
|
||||
} else {
|
||||
if (context.getTags().keySet().containsAll(context.snitchInfo.getTagNames())) {
|
||||
Map<String, Object> tags = result.get(node);
|
||||
if (tags == null) {
|
||||
tags = new HashMap<>();
|
||||
result.put(node, tags);
|
||||
}
|
||||
tags.putAll(context.getTags());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (liveNodes.isEmpty()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not get all tags for any nodes");
|
||||
|
||||
}
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,380 @@
|
|||
package org.apache.solr.cloud.rule;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
|
||||
import static org.apache.solr.cloud.rule.ImplicitSnitch.CORES;
|
||||
import static org.apache.solr.cloud.rule.Rule.MatchStatus.CANNOT_ASSIGN_FAIL;
|
||||
import static org.apache.solr.cloud.rule.Rule.MatchStatus.NODE_CAN_BE_ASSIGNED;
|
||||
import static org.apache.solr.cloud.rule.Rule.MatchStatus.NOT_APPLICABLE;
|
||||
import static org.apache.solr.cloud.rule.Rule.Operand.EQUAL;
|
||||
import static org.apache.solr.cloud.rule.Rule.Operand.GREATER_THAN;
|
||||
import static org.apache.solr.cloud.rule.Rule.Operand.LESS_THAN;
|
||||
import static org.apache.solr.cloud.rule.Rule.Operand.NOT_EQUAL;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
|
||||
|
||||
public class Rule {
|
||||
public static final String WILD_CARD = "*";
|
||||
public static final String WILD_WILD_CARD = "**";
|
||||
static final Condition SHARD_DEFAULT = new Rule.Condition(SHARD_ID_PROP, WILD_WILD_CARD);
|
||||
static final Condition REPLICA_DEFAULT = new Rule.Condition(REPLICA_PROP, WILD_CARD);
|
||||
Condition shard;
|
||||
Condition replica;
|
||||
Condition tag;
|
||||
|
||||
public Rule(Map m) {
|
||||
for (Object o : m.entrySet()) {
|
||||
Map.Entry e = (Map.Entry) o;
|
||||
Condition condition = new Condition(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
|
||||
if (condition.name.equals(SHARD_ID_PROP)) shard = condition;
|
||||
else if (condition.name.equals(REPLICA_PROP)) replica = condition;
|
||||
else {
|
||||
if (tag != null) {
|
||||
throw new RuntimeException("There can be only one and only one tag other than 'shard' and 'replica' in rule " + m);
|
||||
}
|
||||
tag = condition;
|
||||
}
|
||||
|
||||
}
|
||||
if (shard == null) shard = SHARD_DEFAULT;
|
||||
if (replica == null) replica = REPLICA_DEFAULT;
|
||||
if (tag == null) throw new RuntimeException("There should be a tag other than 'shard' and 'replica'");
|
||||
if (replica.isWildCard() && tag.isWildCard()) {
|
||||
throw new RuntimeException("Both replica and tag cannot be wild cards");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static Object parseObj(Object o, Class typ) {
|
||||
if (o == null) return o;
|
||||
if (typ == String.class) return String.valueOf(o);
|
||||
if (typ == Integer.class) {
|
||||
return Integer.parseInt(String.valueOf(o));
|
||||
}
|
||||
return o;
|
||||
}
|
||||
|
||||
public static Map parseRule(String s) {
|
||||
Map<String, String> result = new LinkedHashMap<>();
|
||||
s = s.trim();
|
||||
List<String> keyVals = StrUtils.splitSmart(s, ',');
|
||||
for (String kv : keyVals) {
|
||||
List<String> keyVal = StrUtils.splitSmart(kv, ':');
|
||||
if (keyVal.size() != 2) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid rule. should have only key and val in : " + kv);
|
||||
}
|
||||
if (keyVal.get(0).trim().length() == 0 || keyVal.get(1).trim().length() == 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid rule. should have key and val in : " + kv);
|
||||
}
|
||||
result.put(keyVal.get(0).trim(), keyVal.get(1).trim());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
Map map = new LinkedHashMap();
|
||||
if (shard != SHARD_DEFAULT) map.put(shard.name, shard.operand.toStr(shard.val));
|
||||
if (replica != REPLICA_DEFAULT) map.put(replica.name, replica.operand.toStr(replica.val));
|
||||
map.put(tag.name, tag.operand.toStr(tag.val));
|
||||
return ZkStateReader.toJSONString(map);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if it is possible to assign this node as a replica of the given shard
|
||||
* without violating this rule
|
||||
*
|
||||
* @param testNode The node in question
|
||||
* @param shardVsNodeSet
|
||||
* @param nodeVsTags The pre-fetched tags for all the nodes
|
||||
* @param shardName The shard to which this node should be attempted
|
||||
* @return
|
||||
*/
|
||||
MatchStatus tryAssignNodeToShard(String testNode,
|
||||
Map<String, Set<String>> shardVsNodeSet,
|
||||
Map<String, Map<String, Object>> nodeVsTags,
|
||||
String shardName, Phase phase) {
|
||||
|
||||
if (tag.isWildCard()) {
|
||||
//this is ensuring uniqueness across a certain tag
|
||||
//eg: rack:r168
|
||||
if (!shard.isWildCard() && shardName.equals(shard.val)) return NOT_APPLICABLE;
|
||||
Object tagValueForThisNode = nodeVsTags.get(testNode).get(tag.name);
|
||||
int v = getNumberOfNodesWithSameTagVal(shard, nodeVsTags, shardVsNodeSet,
|
||||
shardName, new Condition(tag.name, tagValueForThisNode, EQUAL), phase);
|
||||
if (phase == Phase.ASSIGN || phase == Phase.FUZZY_ASSIGN)
|
||||
v++;//v++ because including this node , it becomes v+1 during ASSIGN
|
||||
return replica.canMatch(v, phase) ?
|
||||
NODE_CAN_BE_ASSIGNED :
|
||||
CANNOT_ASSIGN_FAIL;
|
||||
} else {
|
||||
if (!shard.isWildCard() && !shardName.equals(shard.val)) return NOT_APPLICABLE;
|
||||
if (replica.isWildCard()) {
|
||||
//this means for each replica, the value must match
|
||||
//shard match is already tested
|
||||
if (tag.canMatch(nodeVsTags.get(testNode).get(tag.name), phase)) return NODE_CAN_BE_ASSIGNED;
|
||||
else return CANNOT_ASSIGN_FAIL;
|
||||
} else {
|
||||
int v = getNumberOfNodesWithSameTagVal(shard, nodeVsTags, shardVsNodeSet, shardName, tag, phase);
|
||||
return replica.canMatch(v, phase) ? NODE_CAN_BE_ASSIGNED : CANNOT_ASSIGN_FAIL;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private int getNumberOfNodesWithSameTagVal(Condition shardCondition,
|
||||
Map<String, Map<String, Object>> nodeVsTags,
|
||||
Map<String, Set<String>> shardVsNodeSet,
|
||||
String shardName,
|
||||
Condition tagCondition,
|
||||
Phase phase) {
|
||||
|
||||
int countMatchingThisTagValue = 0;
|
||||
for (Map.Entry<String, Set<String>> entry : shardVsNodeSet.entrySet()) {
|
||||
//check if this shard is relevant. either it is a ANY Wild card (**)
|
||||
// or this shard is same as the shard in question
|
||||
if (shardCondition.val.equals(WILD_WILD_CARD) || entry.getKey().equals(shardName)) {
|
||||
Set<String> nodesInThisShard = shardVsNodeSet.get(shardCondition.val.equals(WILD_WILD_CARD) ? entry.getKey() : shardName);
|
||||
if (nodesInThisShard != null) {
|
||||
for (String aNode : nodesInThisShard) {
|
||||
Object obj = nodeVsTags.get(aNode).get(tag.name);
|
||||
if (tagCondition.canMatch(obj, phase)) countMatchingThisTagValue++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return countMatchingThisTagValue;
|
||||
}
|
||||
|
||||
public int compare(String n1, String n2,
|
||||
Map<String, Map<String, Object>> nodeVsTags,
|
||||
Map<String, Set<String>> currentState) {
|
||||
return tag.compare(n1, n2, nodeVsTags);
|
||||
}
|
||||
|
||||
public boolean isFuzzy() {
|
||||
return shard.fuzzy || replica.fuzzy || tag.fuzzy;
|
||||
}
|
||||
|
||||
public enum Operand {
|
||||
EQUAL(""),
|
||||
NOT_EQUAL("!") {
|
||||
@Override
|
||||
public boolean canMatch(Object ruleVal, Object testVal) {
|
||||
return !super.canMatch(ruleVal, testVal);
|
||||
}
|
||||
},
|
||||
GREATER_THAN(">") {
|
||||
@Override
|
||||
public Object match(String val) {
|
||||
return checkNumeric(super.match(val));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean canMatch(Object ruleVal, Object testVal) {
|
||||
return compareNum(ruleVal, testVal) == 1;
|
||||
}
|
||||
|
||||
},
|
||||
LESS_THAN("<") {
|
||||
@Override
|
||||
public int compare(Object n1Val, Object n2Val) {
|
||||
return GREATER_THAN.compare(n1Val, n2Val) * -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canMatch(Object ruleVal, Object testVal) {
|
||||
return compareNum(ruleVal, testVal) == -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object match(String val) {
|
||||
return checkNumeric(super.match(val));
|
||||
}
|
||||
};
|
||||
public final String operand;
|
||||
|
||||
Operand(String val) {
|
||||
this.operand = val;
|
||||
}
|
||||
|
||||
public String toStr(Object expectedVal) {
|
||||
return operand + expectedVal.toString();
|
||||
}
|
||||
|
||||
Object checkNumeric(Object val) {
|
||||
if (val == null) return null;
|
||||
try {
|
||||
return Integer.parseInt(val.toString());
|
||||
} catch (NumberFormatException e) {
|
||||
throw new RuntimeException("for operand " + operand + " the value must be numeric");
|
||||
}
|
||||
}
|
||||
|
||||
public Object match(String val) {
|
||||
if (operand.isEmpty()) return val;
|
||||
return val.startsWith(operand) ? val.substring(1) : null;
|
||||
}
|
||||
|
||||
public boolean canMatch(Object ruleVal, Object testVal) {
|
||||
return Objects.equals(String.valueOf(ruleVal), String.valueOf(testVal));
|
||||
}
|
||||
|
||||
|
||||
public int compare(Object n1Val, Object n2Val) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public int compareNum(Object n1Val, Object n2Val) {
|
||||
Integer n1 = (Integer) parseObj(n1Val, Integer.class);
|
||||
Integer n2 = (Integer) parseObj(n2Val, Integer.class);
|
||||
return n1 > n2 ? -1 : Objects.equals(n1, n2) ? 0 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
enum MatchStatus {
|
||||
NODE_CAN_BE_ASSIGNED,
|
||||
CANNOT_ASSIGN_GO_AHEAD,
|
||||
NOT_APPLICABLE,
|
||||
CANNOT_ASSIGN_FAIL
|
||||
}
|
||||
|
||||
enum Phase {
|
||||
ASSIGN, VERIFY, FUZZY_ASSIGN, FUZZY_VERIFY
|
||||
}
|
||||
|
||||
public static class Condition {
|
||||
public final String name;
|
||||
final Object val;
|
||||
public final Operand operand;
|
||||
final boolean fuzzy;
|
||||
|
||||
Condition(String name, Object val, Operand op) {
|
||||
this.name = name;
|
||||
this.val = val;
|
||||
this.operand = op;
|
||||
fuzzy = false;
|
||||
}
|
||||
|
||||
Condition(String key, Object val) {
|
||||
Object expectedVal;
|
||||
boolean fuzzy = false;
|
||||
if (val == null) throw new RuntimeException("value of a tag cannot be null for key " + key);
|
||||
try {
|
||||
this.name = key.trim();
|
||||
String value = val.toString().trim();
|
||||
if (value.endsWith("~")) {
|
||||
fuzzy = true;
|
||||
value = value.substring(0, value.length() - 1);
|
||||
}
|
||||
if ((expectedVal = NOT_EQUAL.match(value)) != null) {
|
||||
operand = NOT_EQUAL;
|
||||
} else if ((expectedVal = GREATER_THAN.match(value)) != null) {
|
||||
operand = GREATER_THAN;
|
||||
} else if ((expectedVal = LESS_THAN.match(value)) != null) {
|
||||
operand = LESS_THAN;
|
||||
} else {
|
||||
operand = EQUAL;
|
||||
expectedVal = value;
|
||||
}
|
||||
|
||||
if (name.equals(REPLICA_PROP)) {
|
||||
if (!WILD_CARD.equals(expectedVal)) {
|
||||
try {
|
||||
expectedVal = Integer.parseInt(expectedVal.toString());
|
||||
} catch (NumberFormatException e) {
|
||||
throw new RuntimeException("The replica tag value can only be '*' or an integer");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("Invalid condition : " + key + ":" + val, e);
|
||||
}
|
||||
this.val = expectedVal;
|
||||
this.fuzzy = fuzzy;
|
||||
|
||||
}
|
||||
|
||||
public boolean isWildCard() {
|
||||
return val.equals(WILD_CARD) || val.equals(WILD_WILD_CARD);
|
||||
}
|
||||
|
||||
boolean canMatch(Object testVal, Phase phase) {
|
||||
if (phase == Phase.FUZZY_ASSIGN || phase == Phase.FUZZY_VERIFY) return true;
|
||||
if (phase == Phase.ASSIGN) {
|
||||
if ((name.equals(REPLICA_PROP) || name.equals(CORES)) &&
|
||||
(operand == GREATER_THAN || operand == NOT_EQUAL)) {
|
||||
//the no:of replicas or cores will increase towards the end
|
||||
//so this should only be checked in the Phase.
|
||||
//process
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return operand.canMatch(val, testVal);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof Condition) {
|
||||
Condition that = (Condition) obj;
|
||||
return Objects.equals(name, that.name) &&
|
||||
Objects.equals(operand, that.operand) &&
|
||||
Objects.equals(val, that.val);
|
||||
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name + ":" + operand.toStr(val) + (fuzzy ? "~" : "");
|
||||
}
|
||||
|
||||
public Integer getInt() {
|
||||
return (Integer) val;
|
||||
}
|
||||
|
||||
public int compare(String n1, String n2, Map<String, Map<String, Object>> nodeVsTags) {
|
||||
return isWildCard() ? 0 : operand.compare(nodeVsTags.get(n1).get(name), nodeVsTags.get(n2).get(name));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
package org.apache.solr.cloud.rule;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class Snitch {
|
||||
static Set<Class> WELL_KNOWN_SNITCHES = ImmutableSet.of(ImplicitSnitch.class);
|
||||
|
||||
|
||||
public abstract void getTags(String solrNode, Set<String> requestedTags, SnitchContext ctx);
|
||||
|
||||
public abstract boolean isKnownTag(String tag);
|
||||
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
package org.apache.solr.cloud.rule;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.GenericSolrRequest;
|
||||
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.update.UpdateShardHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.params.CoreAdminParams.ACTION;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.INVOKE;
|
||||
|
||||
/**
|
||||
* This is the context provided to the snitches to interact with the system. This is a per-node-per-snitch
|
||||
* instance.
|
||||
*/
|
||||
public class SnitchContext implements RemoteCallback {
|
||||
static final Logger log = LoggerFactory.getLogger(SnitchContext.class);
|
||||
private final Map<String, Object> tags = new HashMap<>();
|
||||
private String node;
|
||||
final SnitchInfo snitchInfo;
|
||||
Exception exception;
|
||||
|
||||
|
||||
SnitchContext(SnitchInfo perSnitch, String node) {
|
||||
this.snitchInfo = perSnitch;
|
||||
this.node = node;
|
||||
}
|
||||
|
||||
public SnitchInfo getSnitchInfo() {
|
||||
return snitchInfo;
|
||||
}
|
||||
|
||||
public Map<String, Object> getTags() {
|
||||
return tags;
|
||||
}
|
||||
|
||||
public String getNode() {
|
||||
return node;
|
||||
}
|
||||
|
||||
/**
|
||||
* make a call to solrnode/admin/cores with the given params and give a callback. This is designed to be
|
||||
* asynchronous because the system would want to batch the calls made to any given node
|
||||
*
|
||||
* @param node The node for which this call is made
|
||||
* @param params The params to be passed to the Snitch counterpart
|
||||
* @param klas The name of the class to be invoked in the remote node
|
||||
* @param callback The callback to be called when the response is obtained from remote node.
|
||||
* If this is passed as null the entire response map will be added as tags
|
||||
*/
|
||||
public void invokeRemote(String node, ModifiableSolrParams params, String klas, RemoteCallback callback) {
|
||||
if (callback == null) callback = this;
|
||||
String url = snitchInfo.getCoreContainer().getZkController().getZkStateReader().getBaseUrlForNodeName(node);
|
||||
params.add("class", klas);
|
||||
params.add(ACTION, INVOKE.toString());
|
||||
//todo batch all requests to the same server
|
||||
|
||||
try {
|
||||
SimpleSolrResponse rsp = invoke(snitchInfo.getCoreContainer().getUpdateShardHandler(), url, CoreContainer.CORES_HANDLER_PATH, params);
|
||||
Map<String, Object> returnedVal = (Map<String, Object>) rsp.getResponse().get(klas);
|
||||
if(exception == null){
|
||||
// log this
|
||||
} else {
|
||||
callback.remoteCallback(SnitchContext.this,returnedVal);
|
||||
}
|
||||
callback.remoteCallback(this, returnedVal);
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to invoke snitch counterpart", e);
|
||||
exception = e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public SimpleSolrResponse invoke(UpdateShardHandler shardHandler, final String url, String path, SolrParams params)
|
||||
throws IOException, SolrServerException {
|
||||
GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, path, params);
|
||||
NamedList<Object> rsp = new HttpSolrClient(url, shardHandler.getHttpClient(), new BinaryResponseParser()).request(request);
|
||||
request.response.nl = rsp;
|
||||
return request.response;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void remoteCallback(SnitchContext ctx, Map<String, Object> returnedVal) {
|
||||
tags.putAll(returnedVal);
|
||||
}
|
||||
|
||||
public String getErrMsg() {
|
||||
return exception == null ? null : exception.getMessage();
|
||||
}
|
||||
|
||||
public static abstract class SnitchInfo {
|
||||
private final Map<String, Object> conf;
|
||||
|
||||
SnitchInfo(Map<String, Object> conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
public abstract Set<String> getTagNames();
|
||||
|
||||
public abstract CoreContainer getCoreContainer();
|
||||
}
|
||||
}
|
|
@ -204,6 +204,10 @@ public class RequestParams implements MapSerializable {
|
|||
Object v = e.getValue();
|
||||
if (v instanceof Map && maxDepth > 0) {
|
||||
v = getDeepCopy((Map) v, maxDepth - 1);
|
||||
} else if (v instanceof Set) {
|
||||
v = new HashSet((Set) v);
|
||||
} else if (v instanceof List) {
|
||||
v = new ArrayList((List) v);
|
||||
}
|
||||
copy.put(e.getKey(), v);
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.solr.cloud.Overseer;
|
|||
import org.apache.solr.cloud.OverseerCollectionProcessor;
|
||||
import org.apache.solr.cloud.OverseerSolrResponse;
|
||||
import org.apache.solr.cloud.overseer.SliceMutator;
|
||||
import org.apache.solr.cloud.rule.Rule;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
|
@ -803,6 +804,8 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
if(props.get(DocCollection.STATE_FORMAT) == null){
|
||||
props.put(DocCollection.STATE_FORMAT,"2");
|
||||
}
|
||||
addRuleMap(req.getParams(), props, "rule");
|
||||
addRuleMap(req.getParams(), props, "snitch");
|
||||
|
||||
if(SYSTEM_COLL.equals(name)){
|
||||
//We must always create asystem collection with only a single shard
|
||||
|
@ -817,6 +820,15 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
handleResponse(CREATE.toLower(), m, rsp);
|
||||
}
|
||||
|
||||
private void addRuleMap(SolrParams params, Map<String, Object> props, String key) {
|
||||
String[] rules = params.getParams(key);
|
||||
if(rules!= null && rules.length >0){
|
||||
ArrayList<Map> l = new ArrayList<>();
|
||||
for (String rule : rules) l.add(Rule.parseRule(rule));
|
||||
props.put(key, l);
|
||||
}
|
||||
}
|
||||
|
||||
private void createSysConfigSet() throws KeeperException, InterruptedException {
|
||||
SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient();
|
||||
createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE, null);
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.solr.core.CoreDescriptor;
|
|||
import org.apache.solr.core.DirectoryFactory;
|
||||
import org.apache.solr.core.DirectoryFactory.DirContext;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.handler.RequestHandlerBase;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
|
@ -311,11 +312,40 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELCTIONS. No action taken.");
|
||||
}
|
||||
break;
|
||||
case INVOKE:
|
||||
handleInvoke(req, rsp);
|
||||
break;
|
||||
}
|
||||
}
|
||||
rsp.setHttpCaching(false);
|
||||
}
|
||||
|
||||
public void handleInvoke(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
|
||||
String[] klas = req.getParams().getParams("class");
|
||||
if (klas == null || klas.length == 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "class is a required param");
|
||||
}
|
||||
for (String c : klas) {
|
||||
Map<String, Object> result = invokeAClass(req, c);
|
||||
rsp.add(c, result);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Map<String, Object> invokeAClass(SolrQueryRequest req, String c) {
|
||||
SolrResourceLoader loader = null;
|
||||
if (req.getCore() != null) loader = req.getCore().getResourceLoader();
|
||||
else if (req.getContext().get(CoreContainer.class.getName()) != null) {
|
||||
CoreContainer cc = (CoreContainer) req.getContext().get(CoreContainer.class.getName());
|
||||
loader = cc.getResourceLoader();
|
||||
}
|
||||
|
||||
Invocable invokable = loader.newInstance(c, Invocable.class);
|
||||
Map<String, Object> result = invokable.invoke(req);
|
||||
log.info("Invocable_invoked {}", result);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Handle the core admin SPLIT action.
|
||||
|
@ -1315,4 +1345,11 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
if (parallelExecutor != null && !parallelExecutor.isShutdown())
|
||||
ExecutorUtil.shutdownAndAwaitTermination(parallelExecutor);
|
||||
}
|
||||
|
||||
/**
|
||||
* used by the INVOKE action of core admin handler
|
||||
*/
|
||||
public static interface Invocable {
|
||||
public Map<String, Object> invoke(SolrQueryRequest req);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -286,6 +286,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
|
|||
handler = cores.getRequestHandler(path);
|
||||
if (handler != null) {
|
||||
solrReq = SolrRequestParsers.DEFAULT.parse(null, path, req);
|
||||
solrReq.getContext().put(CoreContainer.class.getName(), cores);
|
||||
handleAdminRequest(req, response, handler, solrReq);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,251 @@
|
|||
package org.apache.solr.cloud.rule;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.solr.cloud.rule.Rule.parseRule;
|
||||
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
|
||||
|
||||
public class RuleEngineTest extends SolrTestCaseJ4{
|
||||
@Test
|
||||
public void testPlacement2(){
|
||||
|
||||
|
||||
String s = "{" +
|
||||
" '127.0.0.1:49961_':{" +
|
||||
" 'node':'127.0.0.1:49961_'," +
|
||||
" 'disk':992," +
|
||||
" 'cores':1}," +
|
||||
" '127.0.0.1:49955_':{" +
|
||||
" 'node':'127.0.0.1:49955_'," +
|
||||
" 'disk':992," +
|
||||
" 'cores':1}," +
|
||||
" '127.0.0.1:49952_':{" +
|
||||
" 'node':'127.0.0.1:49952_'," +
|
||||
" 'disk':992," +
|
||||
" 'cores':1}," +
|
||||
" '127.0.0.1:49947_':{" +
|
||||
" 'node':'127.0.0.1:49947_'," +
|
||||
" 'disk':992," +
|
||||
" 'cores':1}," +
|
||||
" '127.0.0.1:49958_':{" +
|
||||
" 'node':'127.0.0.1:49958_'," +
|
||||
" 'disk':992," +
|
||||
" 'cores':1}}";
|
||||
MockSnitch.nodeVsTags = (Map) ZkStateReader.fromJSON(s.getBytes());
|
||||
Map shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
|
||||
|
||||
List<Rule> rules = parseRules("[{'cores':'<4'}, {" +
|
||||
"'replica':'1',shard:'*','node':'*'}," +
|
||||
" {'disk':'>1'}]");
|
||||
|
||||
Map<Position, String> mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null ).getNodeMappings();
|
||||
assertNotNull(mapping);
|
||||
|
||||
mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null ).getNodeMappings();
|
||||
assertNotNull(mapping);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
public void testPlacement3(){
|
||||
String s = "{" +
|
||||
" '127.0.0.1:49961_':{" +
|
||||
" 'node':'127.0.0.1:49961_'," +
|
||||
" 'disk':992," +
|
||||
" 'cores':1}," +
|
||||
" '127.0.0.2:49955_':{" +
|
||||
" 'node':'127.0.0.1:49955_'," +
|
||||
" 'disk':995," +
|
||||
" 'cores':1}," +
|
||||
" '127.0.0.3:49952_':{" +
|
||||
" 'node':'127.0.0.1:49952_'," +
|
||||
" 'disk':990," +
|
||||
" 'cores':1}," +
|
||||
" '127.0.0.1:49947_':{" +
|
||||
" 'node':'127.0.0.1:49947_'," +
|
||||
" 'disk':980," +
|
||||
" 'cores':1}," +
|
||||
" '127.0.0.2:49958_':{" +
|
||||
" 'node':'127.0.0.1:49958_'," +
|
||||
" 'disk':970," +
|
||||
" 'cores':1}}";
|
||||
MockSnitch.nodeVsTags = (Map) ZkStateReader.fromJSON(s.getBytes());
|
||||
//test not
|
||||
List<Rule> rules = parseRules(
|
||||
"[{cores:'<4'}, " +
|
||||
"{replica:'1',shard:'*',node:'*'}," +
|
||||
"{node:'!127.0.0.1:49947_'}," +
|
||||
"{disk:'>1'}]");
|
||||
Map shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
|
||||
Map<Position, String> mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
|
||||
assertNotNull(mapping);
|
||||
assertFalse(mapping.containsValue("127.0.0.1:49947_"));
|
||||
|
||||
rules = parseRules(
|
||||
"[{cores:'<4'}, " +
|
||||
"{replica:'1',node:'*'}," +
|
||||
"{disk:'>980'}]");
|
||||
shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
|
||||
mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
|
||||
assertNull(mapping);
|
||||
|
||||
|
||||
rules = parseRules(
|
||||
"[{cores:'<4'}, " +
|
||||
"{replica:'1',node:'*'}," +
|
||||
"{disk:'>980~'}]");
|
||||
shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
|
||||
mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
|
||||
assertNotNull(mapping);
|
||||
assertFalse(mapping.containsValue("127.0.0.2:49958_"));
|
||||
|
||||
rules = parseRules(
|
||||
"[{cores:'<4'}, " +
|
||||
"{replica:'1',shard:'*',host:'*'}]"
|
||||
);
|
||||
shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
|
||||
mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
|
||||
assertNotNull(mapping);
|
||||
|
||||
rules = parseRules(
|
||||
"[{cores:'<4'}, " +
|
||||
"{replica:'1',shard:'**',host:'*'}]"
|
||||
);
|
||||
shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
|
||||
mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
|
||||
assertNull(mapping);
|
||||
|
||||
rules = parseRules(
|
||||
"[{cores:'<4'}, " +
|
||||
"{replica:'1~',shard:'**',host:'*'}]"
|
||||
);
|
||||
shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
|
||||
mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
|
||||
assertNotNull(mapping);
|
||||
|
||||
|
||||
}
|
||||
|
||||
private List<Rule> parseRules(String s) {
|
||||
|
||||
List maps = (List) ZkStateReader.fromJSON(s.getBytes());
|
||||
|
||||
List<Rule> rules = new ArrayList<>();
|
||||
for (Object map : maps) rules.add(new Rule((Map) map));
|
||||
return rules;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPlacement() throws Exception {
|
||||
String rulesStr = "rack:*,replica:<2";
|
||||
List<Rule> rules = parse(Arrays.asList(rulesStr));
|
||||
Map shardVsReplicaCount = makeMap("shard1", 3, "shard2", 3);
|
||||
Map nodeVsTags = makeMap(
|
||||
"node1:80", makeMap("rack", "178"),
|
||||
"node2:80", makeMap("rack", "179"),
|
||||
"node3:80", makeMap("rack", "180"),
|
||||
"node4:80", makeMap("rack", "181"),
|
||||
"node5:80", makeMap("rack", "182")
|
||||
);
|
||||
MockSnitch.nodeVsTags = nodeVsTags;
|
||||
Map<Position, String> mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null ,null).getNodeMappings();
|
||||
assertNull(mapping);
|
||||
rulesStr = "rack:*,replica:<2~";
|
||||
rules = parse(Arrays.asList(rulesStr));
|
||||
mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null ,null).getNodeMappings();
|
||||
assertNotNull(mapping);
|
||||
|
||||
rulesStr = "rack:*,shard:*,replica:<2";//for each shard there can be a max of 1 replica
|
||||
rules = parse(Arrays.asList(rulesStr));
|
||||
mapping = new ReplicaAssigner(
|
||||
rules,
|
||||
shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()),
|
||||
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null,null ).getNodeMappings();
|
||||
assertNotNull(mapping);
|
||||
}
|
||||
|
||||
public static class MockSnitch extends Snitch {
|
||||
static Map nodeVsTags = Collections.emptyMap();
|
||||
|
||||
@Override
|
||||
public void getTags(String solrNode, Set<String> requestedTags, SnitchContext ctx) {
|
||||
ctx.getTags().putAll((Map<? extends String, ?>) nodeVsTags.get(solrNode));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isKnownTag(String tag) {
|
||||
Map next = (Map) nodeVsTags.values().iterator().next();
|
||||
return next.containsKey(tag);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<Rule> parse(List<String> rules) throws IOException {
|
||||
assert rules != null && !rules.isEmpty();
|
||||
ArrayList<Rule> result = new ArrayList<>();
|
||||
for (String s : rules) {
|
||||
if (s == null || s.trim().isEmpty()) continue;
|
||||
result.add(new Rule(parseRule(s)));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
package org.apache.solr.cloud.rule;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class RulesTest extends AbstractFullDistribZkTestBase {
|
||||
static final Logger log = LoggerFactory.getLogger(RulesTest.class);
|
||||
|
||||
@Test
|
||||
public void doIntegrationTest() throws Exception {
|
||||
String rulesColl = "rulesColl";
|
||||
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
|
||||
CollectionAdminResponse rsp;
|
||||
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create();
|
||||
create.setCollectionName(rulesColl);
|
||||
create.setNumShards(1);
|
||||
create.setReplicationFactor(2);
|
||||
create.setRule("cores:<4", "node:*,replica:1", "disk:>1");
|
||||
create.setSnitch("class:ImplicitSnitch");
|
||||
rsp = create.process(client);
|
||||
assertEquals(0, rsp.getStatus());
|
||||
assertTrue(rsp.isSuccess());
|
||||
|
||||
}
|
||||
|
||||
DocCollection rulesCollection = cloudClient.getZkStateReader().getClusterState().getCollection(rulesColl);
|
||||
List list = (List) rulesCollection.get("rule");
|
||||
assertEquals(3, list.size());
|
||||
assertEquals ( "<4", ((Map)list.get(0)).get("cores"));
|
||||
assertEquals("1", ((Map) list.get(1)).get("replica"));
|
||||
assertEquals(">1", ((Map) list.get(2)).get("disk"));
|
||||
list = (List) rulesCollection.get("snitch");
|
||||
assertEquals(1, list.size());
|
||||
assertEquals ( "ImplicitSnitch", ((Map)list.get(0)).get("class"));
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -190,7 +190,7 @@ public class CollectionAdminRequest extends SolrRequest<CollectionAdminResponse>
|
|||
protected Boolean autoAddReplicas;
|
||||
protected Integer stateFormat;
|
||||
protected String asyncId;
|
||||
|
||||
private String[] rule , snitch;
|
||||
public Create() {
|
||||
action = CollectionAction.CREATE;
|
||||
}
|
||||
|
@ -208,6 +208,8 @@ public class CollectionAdminRequest extends SolrRequest<CollectionAdminResponse>
|
|||
public void setAsyncId(String asyncId) {
|
||||
this.asyncId = asyncId;
|
||||
}
|
||||
public void setRule(String... s){ this.rule = s; }
|
||||
public void setSnitch(String... s){ this.snitch = s; }
|
||||
|
||||
public String getConfigName() { return configName; }
|
||||
public String getCreateNodeSet() { return createNodeSet; }
|
||||
|
@ -260,6 +262,8 @@ public class CollectionAdminRequest extends SolrRequest<CollectionAdminResponse>
|
|||
if (stateFormat != null) {
|
||||
params.set(DocCollection.STATE_FORMAT, stateFormat);
|
||||
}
|
||||
if(rule != null) params.set("rule", rule);
|
||||
if(snitch != null) params.set("snitch", snitch);
|
||||
return params;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
package org.apache.solr.client.solrj.request;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.ContentStream;
|
||||
|
||||
public class GenericSolrRequest extends SolrRequest<SimpleSolrResponse> {
|
||||
public SolrParams params;
|
||||
public SimpleSolrResponse response = new SimpleSolrResponse();
|
||||
private Collection<ContentStream> contentStreams;
|
||||
|
||||
public GenericSolrRequest(METHOD m, String path, SolrParams params) {
|
||||
super(m, path);
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
public void setContentStreams(Collection<ContentStream> streams) {
|
||||
contentStreams = streams;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<ContentStream> getContentStreams() throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SimpleSolrResponse createResponse(SolrClient client) {
|
||||
return response;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package org.apache.solr.client.solrj.response;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
|
||||
public class SimpleSolrResponse extends SolrResponse {
|
||||
|
||||
public long elapsedTime;
|
||||
|
||||
public NamedList<Object> nl;
|
||||
|
||||
@Override
|
||||
public long getElapsedTime() {
|
||||
return elapsedTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamedList<Object> getResponse() {
|
||||
return nl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResponse(NamedList<Object> rsp) {
|
||||
nl = rsp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setElapsedTime(long elapsedTime) {
|
||||
this.elapsedTime = elapsedTime;
|
||||
}
|
||||
}
|
|
@ -37,6 +37,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLDecoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -131,6 +132,10 @@ public class ZkStateReader implements Closeable {
|
|||
return toUTF8(out);
|
||||
}
|
||||
|
||||
public static String toJSONString(Object o) {
|
||||
return new String(toJSON(o), StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
public static byte[] toUTF8(CharArr out) {
|
||||
byte[] arr = new byte[out.size() << 2]; // is 4x the real worst-case upper-bound?
|
||||
int nBytes = ByteUtils.UTF16toUTF8(out, 0, out.size(), arr, 0);
|
||||
|
|
|
@ -138,7 +138,8 @@ public abstract class CoreAdminParams
|
|||
TRANSIENT,
|
||||
OVERSEEROP,
|
||||
REQUESTSTATUS,
|
||||
REJOINLEADERELECTION;
|
||||
REJOINLEADERELECTION,
|
||||
INVOKE;
|
||||
|
||||
public static CoreAdminAction get( String p )
|
||||
{
|
||||
|
|
|
@ -57,6 +57,10 @@ public class ModifiableSolrParams extends SolrParams
|
|||
}
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return vals == null ? 0 : vals.size();
|
||||
}
|
||||
|
||||
public Map<String,String[]> getMap() {
|
||||
return vals;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue