From 7a771c84a2605b3da7cd0521b3524a4cc6990cf2 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Mon, 4 May 2015 14:40:05 +0000 Subject: [PATCH] SOLR-6220: Rule Based Replica Assignment during collection creation git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1677607 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 2 + .../java/org/apache/solr/cloud/Overseer.java | 6 +- .../cloud/OverseerCollectionProcessor.java | 216 ++++++--- .../solr/cloud/rule/ImplicitSnitch.java | 92 ++++ .../solr/cloud/rule/RemoteCallback.java | 24 + .../solr/cloud/rule/ReplicaAssigner.java | 451 ++++++++++++++++++ .../java/org/apache/solr/cloud/rule/Rule.java | 380 +++++++++++++++ .../org/apache/solr/cloud/rule/Snitch.java | 35 ++ .../apache/solr/cloud/rule/SnitchContext.java | 136 ++++++ .../org/apache/solr/core/RequestParams.java | 4 + .../handler/admin/CollectionsHandler.java | 12 + .../solr/handler/admin/CoreAdminHandler.java | 37 ++ .../solr/servlet/SolrDispatchFilter.java | 1 + .../solr/cloud/rule/RuleEngineTest.java | 251 ++++++++++ .../org/apache/solr/cloud/rule/RulesTest.java | 67 +++ .../solrj/request/CollectionAdminRequest.java | 6 +- .../solrj/request/GenericSolrRequest.java | 58 +++ .../solrj/response/SimpleSolrResponse.java | 49 ++ .../solr/common/cloud/ZkStateReader.java | 5 + .../solr/common/params/CoreAdminParams.java | 5 +- .../common/params/ModifiableSolrParams.java | 4 + 21 files changed, 1759 insertions(+), 82 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/cloud/rule/ImplicitSnitch.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/rule/RemoteCallback.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/rule/Rule.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/rule/Snitch.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java create mode 100644 solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java create mode 100644 solr/core/src/test/org/apache/solr/cloud/rule/RulesTest.java create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/response/SimpleSolrResponse.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 2d9b9b809e0..66036facac4 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -163,6 +163,8 @@ New Features * SOLR-7231: DIH-TikaEntityprocessor, create lat-lon field from Metadata (Tim Allison via Noble Paul) +* SOLR-6220: Rule Based Replica Assignment during collection creation (Noble Paul) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 63c3b1d11ce..e71b90cce17 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -811,7 +811,7 @@ public class Overseer implements Closeable { ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process."); - overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats); + overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this); ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "OverseerCollectionProcessor-" + id); ccThread.setDaemon(true); @@ -829,6 +829,10 @@ public class Overseer implements Closeable { public Stats getStats() { return stats; } + + ZkController getZkController(){ + return zkController; + } /** * For tests. diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 50171b7f3a6..14c21e53f1c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -21,6 +21,7 @@ import static org.apache.solr.cloud.Assign.*; import static org.apache.solr.common.cloud.ZkStateReader.*; import static org.apache.solr.common.params.CollectionParams.CollectionAction.*; import static org.apache.solr.common.params.CommonParams.*; +import static org.apache.solr.common.util.StrUtils.formatString; import java.io.Closeable; import java.io.IOException; @@ -53,8 +54,12 @@ import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.cloud.Assign.Node; import org.apache.solr.cloud.DistributedQueue.QueueEvent; import org.apache.solr.cloud.Overseer.LeaderStatus; +import org.apache.solr.cloud.rule.Rule; +import org.apache.solr.cloud.rule.ReplicaAssigner; +import org.apache.solr.cloud.rule.ReplicaAssigner.Position; import org.apache.solr.cloud.overseer.ClusterStateMutator; import org.apache.solr.cloud.overseer.OverseerAction; +import org.apache.solr.cloud.rule.SnitchContext; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.Aliases; @@ -103,13 +108,13 @@ import com.google.common.collect.ImmutableSet; public class OverseerCollectionProcessor implements Runnable, Closeable { - + public static final String NUM_SLICES = "numShards"; - + static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true; public static final String CREATE_NODE_SET_SHUFFLE = "createNodeSet.shuffle"; public static final String CREATE_NODE_SET = "createNodeSet"; - + public static final String ROUTER = "router"; public static final String SHARDS_PROP = "shards"; @@ -134,7 +139,9 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { ROUTER, DocRouter.DEFAULT_NAME, ZkStateReader.REPLICATION_FACTOR, "1", ZkStateReader.MAX_SHARDS_PER_NODE, "1", - ZkStateReader.AUTO_ADD_REPLICAS, "false"); + ZkStateReader.AUTO_ADD_REPLICAS, "false", + "rule", null, + "snitch",null); static final Random RANDOM; static { @@ -149,10 +156,10 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } public ExecutorService tpe ; - + private static Logger log = LoggerFactory .getLogger(OverseerCollectionProcessor.class); - + private DistributedQueue workQueue; private DistributedMap runningMap; private DistributedMap completedMap; @@ -185,13 +192,15 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { // deleted from the work-queue as that is a batched operation. final private Set runningZKTasks; private final Object waitLock = new Object(); + private Overseer overseer; public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, final ShardHandler shardHandler, - String adminPath, Overseer.Stats stats) { + String adminPath, Overseer.Stats stats, Overseer overseer) { this(zkStateReader, myId, shardHandler.getShardHandlerFactory(), adminPath, stats, Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats), Overseer.getRunningMap(zkStateReader.getZkClient()), Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient())); + this.overseer = overseer; } protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, @@ -216,7 +225,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { this.collectionWip = new HashSet(); this.completedTasks = new HashMap<>(); } - + @Override public void run() { log.info("Process current queue of collection creations"); @@ -1113,12 +1122,12 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { params.set(CoreAdminParams.DELETE_DATA_DIR, true); collectionCmd(zkStateReader.getClusterState(), message, params, results, null); - + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection); Overseer.getInQueue(zkStateReader.getZkClient()).offer( ZkStateReader.toJSON(m)); - + // wait for a while until we don't see the collection long now = System.nanoTime(); long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); @@ -1136,9 +1145,9 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + collection); } - + } finally { - + try { if (zkStateReader.getZkClient().exists( ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) { @@ -1196,7 +1205,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } } - + private void checkForAlias(String name, String value) { long now = System.nanoTime(); @@ -1215,7 +1224,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { log.warn("Timeout waiting to be notified of Alias change..."); } } - + private void checkForAliasAbsence(String name) { long now = System.nanoTime(); @@ -1265,7 +1274,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } finally { MDCUtils.cleanupMDC(previousMDCContext); } - + } private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results) @@ -1773,21 +1782,21 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { return ureq.process(client); } } - + private String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) { int retryCount = 320; while (retryCount-- > 0) { Map slicesMap = zkStateReader.getClusterState() .getSlicesMap(collectionName); if (slicesMap != null) { - + for (Slice slice : slicesMap.values()) { for (Replica replica : slice.getReplicas()) { // TODO: for really large clusters, we could 'index' on this - + String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP); String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); - + if (nodeName.equals(msgNodeName) && core.equals(msgCore)) { return replica.getName(); } @@ -2263,10 +2272,10 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { nodeList = new ArrayList<>(liveNodes); Collections.shuffle(nodeList, random); } - - return nodeList; + + return nodeList; } - + private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException { String collectionName = message.getStr(NAME); if (clusterState.hasCollection(collectionName)) { @@ -2279,11 +2288,11 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } else if (!validateConfig(configName)) { throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find the specified config set: " + configName); } - + try { // look at the replication factor and see if it matches reality // if it does not, find best nodes to create more cores - + int repFactor = message.getInt(ZkStateReader.REPLICATION_FACTOR, 1); ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); @@ -2304,21 +2313,21 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } int maxShardsPerNode = message.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, 1); - + if (repFactor <= 0) { throw new SolrException(ErrorCode.BAD_REQUEST, ZkStateReader.REPLICATION_FACTOR + " must be greater than 0"); } - + if (numSlices <= 0) { throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0"); } - + // we need to look at every node and see how many cores it serves // add our new cores to existing nodes serving the least number of cores // but (for now) require that each core goes on a distinct node. - + final List nodeList = getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM); - + if (repFactor > nodeList.size()) { log.warn("Specified " + ZkStateReader.REPLICATION_FACTOR @@ -2330,7 +2339,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { + nodeList.size() + "). It's unusual to run two replica of the same slice on the same Solr-instance."); } - + int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size(); int requestedShardsToCreate = numSlices * repFactor; if (maxShardsAllowedToCreate < requestedShardsToCreate) { @@ -2343,6 +2352,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { + ". This requires " + requestedShardsToCreate + " shards to be created (higher than the allowed number)"); } + + Map positionVsNodes = identifyNodes(clusterState, nodeList, message, shardNames, repFactor); boolean isLegacyCloud = Overseer.isLegacy(zkStateReader.getClusterProps()); createConfNode(configName, collectionName, isLegacyCloud); @@ -2363,62 +2374,60 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { // For tracking async calls. HashMap requestMap = new HashMap(); - log.info("Creating SolrCores for new collection {}, shardNames {} , replicationFactor : {}", - collectionName, shardNames, repFactor); + + log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}", + collectionName, shardNames, repFactor)); Map coresToCreate = new LinkedHashMap<>(); - for (int i = 1; i <= shardNames.size(); i++) { - String sliceName = shardNames.get(i-1); - for (int j = 1; j <= repFactor; j++) { - String nodeName = nodeList.get((repFactor * (i - 1) + (j - 1)) % nodeList.size()); - String coreName = collectionName + "_" + sliceName + "_replica" + j; - log.info("Creating shard " + coreName + " as part of slice " - + sliceName + " of collection " + collectionName + " on " - + nodeName); + for (Map.Entry e : positionVsNodes.entrySet()) { + Position position = e.getKey(); + String nodeName = e.getValue(); + String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1); + log.info(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}" + , coreName, position.shard, collectionName, nodeName)); - String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName); - //in the new mode, create the replica in clusterstate prior to creating the core. - // Otherwise the core creation fails - if(!isLegacyCloud){ - ZkNodeProps props = new ZkNodeProps( - Overseer.QUEUE_OPERATION, ADDREPLICA.toString(), - ZkStateReader.COLLECTION_PROP, collectionName, - ZkStateReader.SHARD_ID_PROP, sliceName, - ZkStateReader.CORE_NAME_PROP, coreName, - ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), - ZkStateReader.BASE_URL_PROP,baseUrl); - Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props)); - } + String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName); + //in the new mode, create the replica in clusterstate prior to creating the core. + // Otherwise the core creation fails + if (!isLegacyCloud) { + ZkNodeProps props = new ZkNodeProps( + Overseer.QUEUE_OPERATION, ADDREPLICA.toString(), + ZkStateReader.COLLECTION_PROP, collectionName, + ZkStateReader.SHARD_ID_PROP, position.shard, + ZkStateReader.CORE_NAME_PROP, coreName, + ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), + ZkStateReader.BASE_URL_PROP, baseUrl); + Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props)); + } - // Need to create new params for each request - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); + // Need to create new params for each request + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); - params.set(CoreAdminParams.NAME, coreName); - params.set(COLL_CONF, configName); - params.set(CoreAdminParams.COLLECTION, collectionName); - params.set(CoreAdminParams.SHARD, sliceName); - params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); + params.set(CoreAdminParams.NAME, coreName); + params.set(COLL_CONF, configName); + params.set(CoreAdminParams.COLLECTION, collectionName); + params.set(CoreAdminParams.SHARD, position.shard); + params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); - if (async != null) { - String coreAdminAsyncId = async + Math.abs(System.nanoTime()); - params.add(ASYNC, coreAdminAsyncId); - requestMap.put(nodeName, coreAdminAsyncId); - } - addPropertyParams(message, params); + if (async != null) { + String coreAdminAsyncId = async + Math.abs(System.nanoTime()); + params.add(ASYNC, coreAdminAsyncId); + requestMap.put(nodeName, coreAdminAsyncId); + } + addPropertyParams(message, params); - ShardRequest sreq = new ShardRequest(); - params.set("qt", adminPath); - sreq.purpose = 1; - sreq.shards = new String[] {baseUrl}; - sreq.actualShards = sreq.shards; - sreq.params = params; + ShardRequest sreq = new ShardRequest(); + params.set("qt", adminPath); + sreq.purpose = 1; + sreq.shards = new String[]{baseUrl}; + sreq.actualShards = sreq.shards; + sreq.params = params; - if(isLegacyCloud) { - shardHandler.submit(sreq, sreq.shards[0], sreq.params); - } else { - coresToCreate.put(coreName, sreq); - } + if (isLegacyCloud) { + shardHandler.submit(sreq, sreq.shards[0], sreq.params); + } else { + coresToCreate.put(coreName, sreq); } } @@ -2446,6 +2455,57 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } } + private Map identifyNodes(ClusterState clusterState, + List nodeList, + ZkNodeProps message, + List shardNames, + int repFactor) throws IOException { + List maps = (List) message.get("rule"); + if (maps == null) { + int i = 0; + Map result = new HashMap<>(); + for (String aShard : shardNames) { + for (int j = 0; j < repFactor; j++){ + result.put(new Position(aShard, j), nodeList.get(i % nodeList.size())); + i++; + } + } + return result; + } + + List rules = new ArrayList<>(); + for (Object map : maps) rules.add(new Rule((Map) map)); + + Map sharVsReplicaCount = new HashMap<>(); + + for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor); + maps = (List) message.get("snitch"); + List snitchList = maps == null? Collections.emptyList(): maps; + ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules, + sharVsReplicaCount, + snitchList, + new HashMap<>(),//this is a new collection. So, there are no nodes in any shard + nodeList, + overseer.getZkController().getCoreContainer(), + clusterState); + + Map nodeMappings = replicaAssigner.getNodeMappings(); + if(nodeMappings == null){ + String msg = "Could not identify nodes matching the rules " + rules ; + if(!replicaAssigner.failedNodes.isEmpty()){ + Map failedNodes = new HashMap<>(); + for (Map.Entry e : replicaAssigner.failedNodes.entrySet()) { + failedNodes.put(e.getKey(), e.getValue().getErrMsg()); + } + msg+=" Some nodes where excluded from assigning replicas because tags could not be obtained from them "+ failedNodes; + } + msg+= ZkStateReader.toJSONString(replicaAssigner.getNodeVsTags()); + + throw new SolrException(ErrorCode.BAD_REQUEST, msg); + } + return nodeMappings; + } + private Map waitToSeeReplicasInState(String collectionName, Collection coreNames) throws InterruptedException { Map result = new HashMap<>(); long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/ImplicitSnitch.java b/solr/core/src/java/org/apache/solr/cloud/rule/ImplicitSnitch.java new file mode 100644 index 00000000000..2d6c07cafa9 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/rule/ImplicitSnitch.java @@ -0,0 +1,92 @@ +package org.apache.solr.cloud.rule; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.handler.admin.CoreAdminHandler; +import org.apache.solr.request.SolrQueryRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ImplicitSnitch extends Snitch implements CoreAdminHandler.Invocable { + static final Logger log = LoggerFactory.getLogger(ImplicitSnitch.class); + + //well known tags + public static final String NODE = "node"; + public static final String PORT = "port"; + public static final String HOST = "host"; + public static final String CORES = "cores"; + public static final String DISK = "disk"; + public static final String SYSPROP = "D."; + + public static final Set tags = ImmutableSet.of(NODE, PORT, HOST, CORES, DISK); + + + @Override + public void getTags(String solrNode, Set requestedTags, SnitchContext ctx) { + if (requestedTags.contains(NODE)) ctx.getTags().put(NODE, solrNode); + if (requestedTags.contains(HOST)) ctx.getTags().put(HOST, solrNode.substring(0, solrNode.indexOf(':'))); + ModifiableSolrParams params = new ModifiableSolrParams(); + if (requestedTags.contains(CORES)) params.add(CORES, "1"); + if (requestedTags.contains(DISK)) params.add(DISK, "1"); + for (String tag : requestedTags) { + if (tag.startsWith(SYSPROP)) params.add(SYSPROP, tag.substring(SYSPROP.length())); + } + if (params.size() > 0) ctx.invokeRemote(solrNode, params, ImplicitSnitch.class.getName(), null); + } + + public Map invoke(SolrQueryRequest req) { + Map result = new HashMap<>(); + if (req.getParams().getInt(CORES, -1) == 1) { + CoreContainer cc = (CoreContainer) req.getContext().get(CoreContainer.class.getName()); + result.put(CORES, cc.getCoreNames().size()); + } + if (req.getParams().getInt(DISK, -1) == 1) { + try { + long space = Files.getFileStore(Paths.get("/")).getUsableSpace(); + long spaceInGB = space / 1024 / 1024 / 1024; + result.put(DISK, spaceInGB); + } catch (IOException e) { + + } + } + String[] sysProps = req.getParams().getParams(SYSPROP); + if (sysProps != null && sysProps.length > 0) { + for (String prop : sysProps) result.put(prop, System.getProperty(prop)); + } + return result; + } + + + @Override + public boolean isKnownTag(String tag) { + return tags.contains(tag) || + tag.startsWith(SYSPROP);//a system property + } + +} diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/RemoteCallback.java b/solr/core/src/java/org/apache/solr/cloud/rule/RemoteCallback.java new file mode 100644 index 00000000000..124519b1174 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/rule/RemoteCallback.java @@ -0,0 +1,24 @@ +package org.apache.solr.cloud.rule; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Map; + +public interface RemoteCallback { + public void remoteCallback(SnitchContext ctx, Map returnedVal) ; +} diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java new file mode 100644 index 00000000000..128b9a18ffb --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java @@ -0,0 +1,451 @@ +package org.apache.solr.cloud.rule; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.CoreContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.singletonList; +import static org.apache.solr.cloud.rule.Rule.MatchStatus.NODE_CAN_BE_ASSIGNED; +import static org.apache.solr.cloud.rule.Rule.Phase.ASSIGN; +import static org.apache.solr.cloud.rule.Rule.Phase.FUZZY_ASSIGN; +import static org.apache.solr.cloud.rule.Rule.Phase.FUZZY_VERIFY; +import static org.apache.solr.cloud.rule.Rule.Phase.VERIFY; +import static org.apache.solr.common.util.StrUtils.formatString; +import static org.apache.solr.core.RequestParams.getDeepCopy; + +public class ReplicaAssigner { + public static final Logger log = LoggerFactory.getLogger(ReplicaAssigner.class); + List rules; + Map shardVsReplicaCount; + Map> nodeVsTags; + Map> shardVsNodes; + List liveNodes; + Set tagNames = new HashSet<>(); + private Map nodeVsCores = new HashMap<>(); + + + public static class Position implements Comparable { + public final String shard; + public final int index; + + public Position(String shard, int replicaIdx) { + this.shard = shard; + this.index = replicaIdx; + } + + @Override + public int compareTo(Position that) { + //this is to ensure that we try one replica from each shard first instead of + // all replicas from same shard + return that.index > index ? -1 : that.index == index ? 0 : 1; + } + + @Override + public String toString() { + return shard + ":" + index; + } + } + + + /** + * @param shardVsReplicaCount shard names vs no:of replicas required for each of those shards + * @param snitches snitches details + * @param shardVsNodes The current state of the system. can be an empty map if no nodes + * are created in this collection till now + */ + public ReplicaAssigner(List rules, + Map shardVsReplicaCount, + List snitches, + Map> shardVsNodes, + List liveNodes, + CoreContainer cc, ClusterState clusterState) { + this.rules = rules; + for (Rule rule : rules) tagNames.add(rule.tag.name); + this.shardVsReplicaCount = shardVsReplicaCount; + this.liveNodes = new ArrayList<>(liveNodes); + this.nodeVsTags = getTagsForNodes(cc, snitches); + this.shardVsNodes = getDeepCopy(shardVsNodes, 2); + validateTags(nodeVsTags); + + if (clusterState != null) { + for (String s : clusterState.getCollections()) { + DocCollection coll = clusterState.getCollection(s); + for (Slice slice : coll.getSlices()) { + for (Replica replica : slice.getReplicas()) { + AtomicInteger count = nodeVsCores.get(replica.getNodeName()); + if (count == null) nodeVsCores.put(replica.getNodeName(), count = new AtomicInteger()); + count.incrementAndGet(); + } + } + } + } + } + + public Map> getNodeVsTags() { + return nodeVsTags; + + } + + + /** + * For each shard return a new set of nodes where the replicas need to be created satisfying + * the specified rule + */ + public Map getNodeMappings() { + List shardNames = new ArrayList<>(shardVsReplicaCount.keySet()); + int[] shardOrder = new int[shardNames.size()]; + for (int i = 0; i < shardNames.size(); i++) shardOrder[i] = i; + + boolean hasFuzzyRules = false; + int nonWildCardShardRules = 0; + for (Rule r : rules) { + if (r.isFuzzy()) hasFuzzyRules = true; + if (!r.shard.isWildCard()) { + nonWildCardShardRules++; + //we will have to try all combinations + if (shardNames.size() > 10) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Max 10 shards allowed if there is a non wild card shard specified in rule"); + } + } + } + + Map result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, false); + if (result == null && hasFuzzyRules) { + result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, true); + } + return result; + + } + + private Map tryAllPermutations(List shardNames, + int[] shardOrder, + int nonWildCardShardRules, + boolean fuzzyPhase) { + + + Iterator shardPermutations = nonWildCardShardRules > 0 ? + permutations(shardNames.size()) : + singletonList(shardOrder).iterator(); + + for (; shardPermutations.hasNext(); ) { + int[] p = shardPermutations.next(); + for (int i = 0; i < p.length; i++) { + List positions = new ArrayList<>(); + for (int pos : p) { + for (int j = 0; j < shardVsReplicaCount.get(shardNames.get(pos)); j++) { + positions.add(new Position(shardNames.get(pos), j)); + } + } + Collections.sort(positions); + for (Iterator it = permutations(rules.size()); it.hasNext(); ) { + int[] permutation = it.next(); + Map result = tryAPermutationOfRules(permutation, positions, fuzzyPhase); + if (result != null) return result; + } + } + } + + return null; + } + + + private Map tryAPermutationOfRules(int[] rulePermutation, List positions, boolean fuzzyPhase) { + Map> nodeVsTagsCopy = getDeepCopy(nodeVsTags, 2); + Map result = new LinkedHashMap<>(); + int startPosition = 0; + Map> copyOfCurrentState = getDeepCopy(shardVsNodes, 2); + List sortedLiveNodes = new ArrayList<>(this.liveNodes); + Collections.sort(sortedLiveNodes, new Comparator() { + @Override + public int compare(String n1, String n2) { + int result = 0; + for (int i = 0; i < rulePermutation.length; i++) { + Rule rule = rules.get(rulePermutation[i]); + int val = rule.compare(n1, n2, nodeVsTagsCopy, copyOfCurrentState); + if (val != 0) {//atleast one non-zero compare break now + result = val; + break; + } + if (result == 0) {//if all else is equal, prefer nodes with fewer cores + AtomicInteger n1Count = nodeVsCores.get(n1); + AtomicInteger n2Count = nodeVsCores.get(n2); + int a = n1Count == null ? 0 : n1Count.get(); + int b = n2Count == null ? 0 : n2Count.get(); + result = a > b ? 1 : a == b ? 0 : -1; + } + + } + return result; + } + }); + forEachPosition: + for (Position position : positions) { + //trying to assign a node by verifying each rule in this rulePermutation + forEachNode: + for (int j = 0; j < sortedLiveNodes.size(); j++) { + String liveNode = sortedLiveNodes.get(startPosition % sortedLiveNodes.size()); + startPosition++; + for (int i = 0; i < rulePermutation.length; i++) { + Rule rule = rules.get(rulePermutation[i]); + //trying to assign a replica into this node in this shard + Rule.MatchStatus status = rule.tryAssignNodeToShard(liveNode, + copyOfCurrentState, nodeVsTagsCopy, position.shard, fuzzyPhase ? FUZZY_ASSIGN : ASSIGN); + if (status == Rule.MatchStatus.CANNOT_ASSIGN_FAIL) { + continue forEachNode;//try another node for this position + } + } + //We have reached this far means this node can be applied to this position + //and all rules are fine. So let us change the currentState + result.put(position, liveNode); + Set nodeNames = copyOfCurrentState.get(position.shard); + if (nodeNames == null) copyOfCurrentState.put(position.shard, nodeNames = new HashSet<>()); + nodeNames.add(liveNode); + Number coreCount = (Number) nodeVsTagsCopy.get(liveNode).get(ImplicitSnitch.CORES); + if (coreCount != null) { + nodeVsTagsCopy.get(liveNode).put(ImplicitSnitch.CORES, coreCount.intValue() + 1); + } + + continue forEachPosition; + } + //if it reached here, we could not find a node for this position + return null; + } + + if (positions.size() > result.size()) { + return null; + } + + for (Map.Entry e : result.entrySet()) { + for (int i = 0; i < rulePermutation.length; i++) { + Rule rule = rules.get(rulePermutation[i]); + Rule.MatchStatus matchStatus = rule.tryAssignNodeToShard(e.getValue(), + copyOfCurrentState, nodeVsTagsCopy, e.getKey().shard, fuzzyPhase ? FUZZY_VERIFY : VERIFY); + if (matchStatus != NODE_CAN_BE_ASSIGNED) return null; + } + } + return result; + } + + private void validateTags(Map> nodeVsTags) { + List errors = new ArrayList<>(); + for (Rule rule : rules) { + for (Map.Entry> e : nodeVsTags.entrySet()) { + if (e.getValue().get(rule.tag.name) == null) { + errors.add(formatString("The value for tag {0} is not available for node {}")); + } + } + } + if (!errors.isEmpty()) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, StrUtils.join(errors, ',')); + } + } + + + /** + * get all permutations for the int[] whose items are 0..level + */ + public static Iterator permutations(final int level) { + return new Iterator() { + int i = 0; + int[] next; + + @Override + public boolean hasNext() { + AtomicReference nthval = new AtomicReference<>(); + permute(0, new int[level], new BitSet(level), nthval, i, new AtomicInteger()); + i++; + next = nthval.get(); + return next != null; + } + + @Override + public int[] next() { + return next; + } + }; + + } + + + private static void permute(int level, int[] permuted, BitSet used, AtomicReference nthval, + int requestedIdx, AtomicInteger seenSoFar) { + if (level == permuted.length) { + if (seenSoFar.get() == requestedIdx) nthval.set(permuted); + else seenSoFar.incrementAndGet(); + } else { + for (int i = 0; i < permuted.length; i++) { + if (!used.get(i)) { + used.set(i); + permuted[level] = i; + permute(level + 1, permuted, used, nthval, requestedIdx, seenSoFar); + if (nthval.get() != null) break; + used.set(i, false); + } + } + } + } + + + public Map failedNodes = new HashMap<>(); + + /** + * This method uses the snitches and get the tags for all the nodes + */ + private Map> getTagsForNodes(final CoreContainer cc, List snitchConf) { + + class Info extends SnitchContext.SnitchInfo { + final Snitch snitch; + final Set myTags = new HashSet<>(); + final Map nodeVsContext = new HashMap<>(); + + Info(Map conf, Snitch snitch) { + super(conf); + this.snitch = snitch; + } + + @Override + public Set getTagNames() { + return myTags; + } + + @Override + public CoreContainer getCoreContainer() { + return cc; + } + } + + Map snitches = new LinkedHashMap<>(); + for (Object o : snitchConf) { + //instantiating explicitly specified snitches + String klas = null; + Map map = Collections.emptyMap(); + if (o instanceof Map) {//it can be a Map + map = (Map) o; + klas = (String) map.get("class"); + if (klas == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "snitch must have a class attribute"); + } + } else { //or just the snitch name + klas = o.toString(); + } + try { + if (klas.indexOf('.') == -1) klas = Snitch.class.getPackage().getName() + "." + klas; + Snitch inst = cc == null ? + (Snitch) Snitch.class.getClassLoader().loadClass(klas).newInstance() : + cc.getResourceLoader().newInstance(klas, Snitch.class); + snitches.put(inst.getClass(), new Info(map, inst)); + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + + } + + } + for (Class c : Snitch.WELL_KNOWN_SNITCHES) { + if (snitches.containsKey(c)) continue;// it is already specified explicitly , ignore + try { + snitches.put(c, new Info(Collections.EMPTY_MAP, (Snitch) c.newInstance())); + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error instantiating Snitch " + c.getName()); + } + } + for (String tagName : tagNames) { + //identify which snitch is going to provide values for a given tag + boolean foundProvider = false; + for (Info info : snitches.values()) { + if (info.snitch.isKnownTag(tagName)) { + foundProvider = true; + info.myTags.add(tagName); + break; + } + } + if (!foundProvider) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown tag in rules " + tagName); + } + + + for (String node : liveNodes) { + //now use the Snitch to get the tags + for (Info info : snitches.values()) { + if (!info.myTags.isEmpty()) { + SnitchContext context = new SnitchContext(info, node); + info.nodeVsContext.put(node, context); + try { + info.snitch.getTags(node, info.myTags, context); + } catch (Exception e) { + context.exception = e; + } + } + } + } + + Map> result = new HashMap<>(); + for (Info info : snitches.values()) { + for (Map.Entry e : info.nodeVsContext.entrySet()) { + SnitchContext context = e.getValue(); + String node = e.getKey(); + if (context.exception != null) { + failedNodes.put(node, context); + liveNodes.remove(node); + log.warn("Not all tags were obtained from node " + node); + context.exception = new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Not all tags were obtained from node " + node); + } else { + if (context.getTags().keySet().containsAll(context.snitchInfo.getTagNames())) { + Map tags = result.get(node); + if (tags == null) { + tags = new HashMap<>(); + result.put(node, tags); + } + tags.putAll(context.getTags()); + } + } + } + } + + if (liveNodes.isEmpty()) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not get all tags for any nodes"); + + } + return result; + + } + +} diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java b/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java new file mode 100644 index 00000000000..4092457bdfa --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java @@ -0,0 +1,380 @@ +package org.apache.solr.cloud.rule; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.StrUtils; + +import static org.apache.solr.cloud.rule.ImplicitSnitch.CORES; +import static org.apache.solr.cloud.rule.Rule.MatchStatus.CANNOT_ASSIGN_FAIL; +import static org.apache.solr.cloud.rule.Rule.MatchStatus.NODE_CAN_BE_ASSIGNED; +import static org.apache.solr.cloud.rule.Rule.MatchStatus.NOT_APPLICABLE; +import static org.apache.solr.cloud.rule.Rule.Operand.EQUAL; +import static org.apache.solr.cloud.rule.Rule.Operand.GREATER_THAN; +import static org.apache.solr.cloud.rule.Rule.Operand.LESS_THAN; +import static org.apache.solr.cloud.rule.Rule.Operand.NOT_EQUAL; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; + + +public class Rule { + public static final String WILD_CARD = "*"; + public static final String WILD_WILD_CARD = "**"; + static final Condition SHARD_DEFAULT = new Rule.Condition(SHARD_ID_PROP, WILD_WILD_CARD); + static final Condition REPLICA_DEFAULT = new Rule.Condition(REPLICA_PROP, WILD_CARD); + Condition shard; + Condition replica; + Condition tag; + + public Rule(Map m) { + for (Object o : m.entrySet()) { + Map.Entry e = (Map.Entry) o; + Condition condition = new Condition(String.valueOf(e.getKey()), String.valueOf(e.getValue())); + if (condition.name.equals(SHARD_ID_PROP)) shard = condition; + else if (condition.name.equals(REPLICA_PROP)) replica = condition; + else { + if (tag != null) { + throw new RuntimeException("There can be only one and only one tag other than 'shard' and 'replica' in rule " + m); + } + tag = condition; + } + + } + if (shard == null) shard = SHARD_DEFAULT; + if (replica == null) replica = REPLICA_DEFAULT; + if (tag == null) throw new RuntimeException("There should be a tag other than 'shard' and 'replica'"); + if (replica.isWildCard() && tag.isWildCard()) { + throw new RuntimeException("Both replica and tag cannot be wild cards"); + } + + } + + static Object parseObj(Object o, Class typ) { + if (o == null) return o; + if (typ == String.class) return String.valueOf(o); + if (typ == Integer.class) { + return Integer.parseInt(String.valueOf(o)); + } + return o; + } + + public static Map parseRule(String s) { + Map result = new LinkedHashMap<>(); + s = s.trim(); + List keyVals = StrUtils.splitSmart(s, ','); + for (String kv : keyVals) { + List keyVal = StrUtils.splitSmart(kv, ':'); + if (keyVal.size() != 2) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid rule. should have only key and val in : " + kv); + } + if (keyVal.get(0).trim().length() == 0 || keyVal.get(1).trim().length() == 0) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid rule. should have key and val in : " + kv); + } + result.put(keyVal.get(0).trim(), keyVal.get(1).trim()); + } + return result; + } + + + @Override + public String toString() { + Map map = new LinkedHashMap(); + if (shard != SHARD_DEFAULT) map.put(shard.name, shard.operand.toStr(shard.val)); + if (replica != REPLICA_DEFAULT) map.put(replica.name, replica.operand.toStr(replica.val)); + map.put(tag.name, tag.operand.toStr(tag.val)); + return ZkStateReader.toJSONString(map); + } + + /** + * Check if it is possible to assign this node as a replica of the given shard + * without violating this rule + * + * @param testNode The node in question + * @param shardVsNodeSet + * @param nodeVsTags The pre-fetched tags for all the nodes + * @param shardName The shard to which this node should be attempted + * @return + */ + MatchStatus tryAssignNodeToShard(String testNode, + Map> shardVsNodeSet, + Map> nodeVsTags, + String shardName, Phase phase) { + + if (tag.isWildCard()) { + //this is ensuring uniqueness across a certain tag + //eg: rack:r168 + if (!shard.isWildCard() && shardName.equals(shard.val)) return NOT_APPLICABLE; + Object tagValueForThisNode = nodeVsTags.get(testNode).get(tag.name); + int v = getNumberOfNodesWithSameTagVal(shard, nodeVsTags, shardVsNodeSet, + shardName, new Condition(tag.name, tagValueForThisNode, EQUAL), phase); + if (phase == Phase.ASSIGN || phase == Phase.FUZZY_ASSIGN) + v++;//v++ because including this node , it becomes v+1 during ASSIGN + return replica.canMatch(v, phase) ? + NODE_CAN_BE_ASSIGNED : + CANNOT_ASSIGN_FAIL; + } else { + if (!shard.isWildCard() && !shardName.equals(shard.val)) return NOT_APPLICABLE; + if (replica.isWildCard()) { + //this means for each replica, the value must match + //shard match is already tested + if (tag.canMatch(nodeVsTags.get(testNode).get(tag.name), phase)) return NODE_CAN_BE_ASSIGNED; + else return CANNOT_ASSIGN_FAIL; + } else { + int v = getNumberOfNodesWithSameTagVal(shard, nodeVsTags, shardVsNodeSet, shardName, tag, phase); + return replica.canMatch(v, phase) ? NODE_CAN_BE_ASSIGNED : CANNOT_ASSIGN_FAIL; + + } + + } + } + + private int getNumberOfNodesWithSameTagVal(Condition shardCondition, + Map> nodeVsTags, + Map> shardVsNodeSet, + String shardName, + Condition tagCondition, + Phase phase) { + + int countMatchingThisTagValue = 0; + for (Map.Entry> entry : shardVsNodeSet.entrySet()) { + //check if this shard is relevant. either it is a ANY Wild card (**) + // or this shard is same as the shard in question + if (shardCondition.val.equals(WILD_WILD_CARD) || entry.getKey().equals(shardName)) { + Set nodesInThisShard = shardVsNodeSet.get(shardCondition.val.equals(WILD_WILD_CARD) ? entry.getKey() : shardName); + if (nodesInThisShard != null) { + for (String aNode : nodesInThisShard) { + Object obj = nodeVsTags.get(aNode).get(tag.name); + if (tagCondition.canMatch(obj, phase)) countMatchingThisTagValue++; + } + } + } + } + return countMatchingThisTagValue; + } + + public int compare(String n1, String n2, + Map> nodeVsTags, + Map> currentState) { + return tag.compare(n1, n2, nodeVsTags); + } + + public boolean isFuzzy() { + return shard.fuzzy || replica.fuzzy || tag.fuzzy; + } + + public enum Operand { + EQUAL(""), + NOT_EQUAL("!") { + @Override + public boolean canMatch(Object ruleVal, Object testVal) { + return !super.canMatch(ruleVal, testVal); + } + }, + GREATER_THAN(">") { + @Override + public Object match(String val) { + return checkNumeric(super.match(val)); + } + + + @Override + public boolean canMatch(Object ruleVal, Object testVal) { + return compareNum(ruleVal, testVal) == 1; + } + + }, + LESS_THAN("<") { + @Override + public int compare(Object n1Val, Object n2Val) { + return GREATER_THAN.compare(n1Val, n2Val) * -1; + } + + @Override + public boolean canMatch(Object ruleVal, Object testVal) { + return compareNum(ruleVal, testVal) == -1; + } + + @Override + public Object match(String val) { + return checkNumeric(super.match(val)); + } + }; + public final String operand; + + Operand(String val) { + this.operand = val; + } + + public String toStr(Object expectedVal) { + return operand + expectedVal.toString(); + } + + Object checkNumeric(Object val) { + if (val == null) return null; + try { + return Integer.parseInt(val.toString()); + } catch (NumberFormatException e) { + throw new RuntimeException("for operand " + operand + " the value must be numeric"); + } + } + + public Object match(String val) { + if (operand.isEmpty()) return val; + return val.startsWith(operand) ? val.substring(1) : null; + } + + public boolean canMatch(Object ruleVal, Object testVal) { + return Objects.equals(String.valueOf(ruleVal), String.valueOf(testVal)); + } + + + public int compare(Object n1Val, Object n2Val) { + return 0; + } + + public int compareNum(Object n1Val, Object n2Val) { + Integer n1 = (Integer) parseObj(n1Val, Integer.class); + Integer n2 = (Integer) parseObj(n2Val, Integer.class); + return n1 > n2 ? -1 : Objects.equals(n1, n2) ? 0 : 1; + } + } + + enum MatchStatus { + NODE_CAN_BE_ASSIGNED, + CANNOT_ASSIGN_GO_AHEAD, + NOT_APPLICABLE, + CANNOT_ASSIGN_FAIL + } + + enum Phase { + ASSIGN, VERIFY, FUZZY_ASSIGN, FUZZY_VERIFY + } + + public static class Condition { + public final String name; + final Object val; + public final Operand operand; + final boolean fuzzy; + + Condition(String name, Object val, Operand op) { + this.name = name; + this.val = val; + this.operand = op; + fuzzy = false; + } + + Condition(String key, Object val) { + Object expectedVal; + boolean fuzzy = false; + if (val == null) throw new RuntimeException("value of a tag cannot be null for key " + key); + try { + this.name = key.trim(); + String value = val.toString().trim(); + if (value.endsWith("~")) { + fuzzy = true; + value = value.substring(0, value.length() - 1); + } + if ((expectedVal = NOT_EQUAL.match(value)) != null) { + operand = NOT_EQUAL; + } else if ((expectedVal = GREATER_THAN.match(value)) != null) { + operand = GREATER_THAN; + } else if ((expectedVal = LESS_THAN.match(value)) != null) { + operand = LESS_THAN; + } else { + operand = EQUAL; + expectedVal = value; + } + + if (name.equals(REPLICA_PROP)) { + if (!WILD_CARD.equals(expectedVal)) { + try { + expectedVal = Integer.parseInt(expectedVal.toString()); + } catch (NumberFormatException e) { + throw new RuntimeException("The replica tag value can only be '*' or an integer"); + } + } + } + + } catch (Exception e) { + throw new IllegalArgumentException("Invalid condition : " + key + ":" + val, e); + } + this.val = expectedVal; + this.fuzzy = fuzzy; + + } + + public boolean isWildCard() { + return val.equals(WILD_CARD) || val.equals(WILD_WILD_CARD); + } + + boolean canMatch(Object testVal, Phase phase) { + if (phase == Phase.FUZZY_ASSIGN || phase == Phase.FUZZY_VERIFY) return true; + if (phase == Phase.ASSIGN) { + if ((name.equals(REPLICA_PROP) || name.equals(CORES)) && + (operand == GREATER_THAN || operand == NOT_EQUAL)) { + //the no:of replicas or cores will increase towards the end + //so this should only be checked in the Phase. + //process + return true; + } + } + + return operand.canMatch(val, testVal); + } + + + @Override + public boolean equals(Object obj) { + if (obj instanceof Condition) { + Condition that = (Condition) obj; + return Objects.equals(name, that.name) && + Objects.equals(operand, that.operand) && + Objects.equals(val, that.val); + + } + return false; + } + + @Override + public String toString() { + return name + ":" + operand.toStr(val) + (fuzzy ? "~" : ""); + } + + public Integer getInt() { + return (Integer) val; + } + + public int compare(String n1, String n2, Map> nodeVsTags) { + return isWildCard() ? 0 : operand.compare(nodeVsTags.get(n1).get(name), nodeVsTags.get(n2).get(name)); + } + + } + + +} + + + diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/Snitch.java b/solr/core/src/java/org/apache/solr/cloud/rule/Snitch.java new file mode 100644 index 00000000000..85a3596c54c --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/rule/Snitch.java @@ -0,0 +1,35 @@ +package org.apache.solr.cloud.rule; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +/** + * + */ +public abstract class Snitch { + static Set WELL_KNOWN_SNITCHES = ImmutableSet.of(ImplicitSnitch.class); + + + public abstract void getTags(String solrNode, Set requestedTags, SnitchContext ctx); + + public abstract boolean isKnownTag(String tag); + +} diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java b/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java new file mode 100644 index 00000000000..fab0d65190b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java @@ -0,0 +1,136 @@ +package org.apache.solr.cloud.rule; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +import org.apache.http.client.methods.HttpGet; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.BinaryResponseParser; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.GenericSolrRequest; +import org.apache.solr.client.solrj.response.SimpleSolrResponse; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.update.UpdateShardHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.common.params.CoreAdminParams.ACTION; +import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.INVOKE; + +/** + * This is the context provided to the snitches to interact with the system. This is a per-node-per-snitch + * instance. + */ +public class SnitchContext implements RemoteCallback { + static final Logger log = LoggerFactory.getLogger(SnitchContext.class); + private final Map tags = new HashMap<>(); + private String node; + final SnitchInfo snitchInfo; + Exception exception; + + + SnitchContext(SnitchInfo perSnitch, String node) { + this.snitchInfo = perSnitch; + this.node = node; + } + + public SnitchInfo getSnitchInfo() { + return snitchInfo; + } + + public Map getTags() { + return tags; + } + + public String getNode() { + return node; + } + + /** + * make a call to solrnode/admin/cores with the given params and give a callback. This is designed to be + * asynchronous because the system would want to batch the calls made to any given node + * + * @param node The node for which this call is made + * @param params The params to be passed to the Snitch counterpart + * @param klas The name of the class to be invoked in the remote node + * @param callback The callback to be called when the response is obtained from remote node. + * If this is passed as null the entire response map will be added as tags + */ + public void invokeRemote(String node, ModifiableSolrParams params, String klas, RemoteCallback callback) { + if (callback == null) callback = this; + String url = snitchInfo.getCoreContainer().getZkController().getZkStateReader().getBaseUrlForNodeName(node); + params.add("class", klas); + params.add(ACTION, INVOKE.toString()); + //todo batch all requests to the same server + + try { + SimpleSolrResponse rsp = invoke(snitchInfo.getCoreContainer().getUpdateShardHandler(), url, CoreContainer.CORES_HANDLER_PATH, params); + Map returnedVal = (Map) rsp.getResponse().get(klas); + if(exception == null){ +// log this + } else { + callback.remoteCallback(SnitchContext.this,returnedVal); + } + callback.remoteCallback(this, returnedVal); + } catch (Exception e) { + log.error("Unable to invoke snitch counterpart", e); + exception = e; + } + } + + + public SimpleSolrResponse invoke(UpdateShardHandler shardHandler, final String url, String path, SolrParams params) + throws IOException, SolrServerException { + GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, path, params); + NamedList rsp = new HttpSolrClient(url, shardHandler.getHttpClient(), new BinaryResponseParser()).request(request); + request.response.nl = rsp; + return request.response; + } + + + @Override + public void remoteCallback(SnitchContext ctx, Map returnedVal) { + tags.putAll(returnedVal); + } + + public String getErrMsg() { + return exception == null ? null : exception.getMessage(); + } + + public static abstract class SnitchInfo { + private final Map conf; + + SnitchInfo(Map conf) { + this.conf = conf; + } + + public abstract Set getTagNames(); + + public abstract CoreContainer getCoreContainer(); + } +} diff --git a/solr/core/src/java/org/apache/solr/core/RequestParams.java b/solr/core/src/java/org/apache/solr/core/RequestParams.java index 32f8d804703..8390883e5e8 100644 --- a/solr/core/src/java/org/apache/solr/core/RequestParams.java +++ b/solr/core/src/java/org/apache/solr/core/RequestParams.java @@ -204,6 +204,10 @@ public class RequestParams implements MapSerializable { Object v = e.getValue(); if (v instanceof Map && maxDepth > 0) { v = getDeepCopy((Map) v, maxDepth - 1); + } else if (v instanceof Set) { + v = new HashSet((Set) v); + } else if (v instanceof List) { + v = new ArrayList((List) v); } copy.put(e.getKey(), v); } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 16c3369b5b6..e5df46ab500 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -48,6 +48,7 @@ import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.OverseerCollectionProcessor; import org.apache.solr.cloud.OverseerSolrResponse; import org.apache.solr.cloud.overseer.SliceMutator; +import org.apache.solr.cloud.rule.Rule; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; @@ -803,6 +804,8 @@ public class CollectionsHandler extends RequestHandlerBase { if(props.get(DocCollection.STATE_FORMAT) == null){ props.put(DocCollection.STATE_FORMAT,"2"); } + addRuleMap(req.getParams(), props, "rule"); + addRuleMap(req.getParams(), props, "snitch"); if(SYSTEM_COLL.equals(name)){ //We must always create asystem collection with only a single shard @@ -817,6 +820,15 @@ public class CollectionsHandler extends RequestHandlerBase { handleResponse(CREATE.toLower(), m, rsp); } + private void addRuleMap(SolrParams params, Map props, String key) { + String[] rules = params.getParams(key); + if(rules!= null && rules.length >0){ + ArrayList l = new ArrayList<>(); + for (String rule : rules) l.add(Rule.parseRule(rule)); + props.put(key, l); + } + } + private void createSysConfigSet() throws KeeperException, InterruptedException { SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient(); createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE, null); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index f00e654191a..ce0e29a088d 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -67,6 +67,7 @@ import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.DirectoryFactory; import org.apache.solr.core.DirectoryFactory.DirContext; import org.apache.solr.core.SolrCore; +import org.apache.solr.core.SolrResourceLoader; import org.apache.solr.handler.RequestHandlerBase; import org.apache.solr.request.LocalSolrQueryRequest; import org.apache.solr.request.SolrQueryRequest; @@ -311,11 +312,40 @@ public class CoreAdminHandler extends RequestHandlerBase { log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELCTIONS. No action taken."); } break; + case INVOKE: + handleInvoke(req, rsp); + break; } } rsp.setHttpCaching(false); } + public void handleInvoke(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { + String[] klas = req.getParams().getParams("class"); + if (klas == null || klas.length == 0) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "class is a required param"); + } + for (String c : klas) { + Map result = invokeAClass(req, c); + rsp.add(c, result); + } + + } + + private Map invokeAClass(SolrQueryRequest req, String c) { + SolrResourceLoader loader = null; + if (req.getCore() != null) loader = req.getCore().getResourceLoader(); + else if (req.getContext().get(CoreContainer.class.getName()) != null) { + CoreContainer cc = (CoreContainer) req.getContext().get(CoreContainer.class.getName()); + loader = cc.getResourceLoader(); + } + + Invocable invokable = loader.newInstance(c, Invocable.class); + Map result = invokable.invoke(req); + log.info("Invocable_invoked {}", result); + return result; + } + /** * Handle the core admin SPLIT action. @@ -1315,4 +1345,11 @@ public class CoreAdminHandler extends RequestHandlerBase { if (parallelExecutor != null && !parallelExecutor.isShutdown()) ExecutorUtil.shutdownAndAwaitTermination(parallelExecutor); } + + /** + * used by the INVOKE action of core admin handler + */ + public static interface Invocable { + public Map invoke(SolrQueryRequest req); + } } diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index 0005f5837d0..6fc5692a9cf 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -286,6 +286,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { handler = cores.getRequestHandler(path); if (handler != null) { solrReq = SolrRequestParsers.DEFAULT.parse(null, path, req); + solrReq.getContext().put(CoreContainer.class.getName(), cores); handleAdminRequest(req, response, handler, solrReq); return; } diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java new file mode 100644 index 00000000000..ba929dc6681 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java @@ -0,0 +1,251 @@ +package org.apache.solr.cloud.rule; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.rule.ReplicaAssigner.Position; +import org.apache.solr.common.cloud.ZkStateReader; +import org.junit.Test; + +import static org.apache.solr.cloud.rule.Rule.parseRule; +import static org.apache.solr.common.cloud.ZkNodeProps.makeMap; + +public class RuleEngineTest extends SolrTestCaseJ4{ + @Test + public void testPlacement2(){ + + + String s = "{" + + " '127.0.0.1:49961_':{" + + " 'node':'127.0.0.1:49961_'," + + " 'disk':992," + + " 'cores':1}," + + " '127.0.0.1:49955_':{" + + " 'node':'127.0.0.1:49955_'," + + " 'disk':992," + + " 'cores':1}," + + " '127.0.0.1:49952_':{" + + " 'node':'127.0.0.1:49952_'," + + " 'disk':992," + + " 'cores':1}," + + " '127.0.0.1:49947_':{" + + " 'node':'127.0.0.1:49947_'," + + " 'disk':992," + + " 'cores':1}," + + " '127.0.0.1:49958_':{" + + " 'node':'127.0.0.1:49958_'," + + " 'disk':992," + + " 'cores':1}}"; + MockSnitch.nodeVsTags = (Map) ZkStateReader.fromJSON(s.getBytes()); + Map shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2); + + List rules = parseRules("[{'cores':'<4'}, {" + + "'replica':'1',shard:'*','node':'*'}," + + " {'disk':'>1'}]"); + + Map mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null ).getNodeMappings(); + assertNotNull(mapping); + + mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null ).getNodeMappings(); + assertNotNull(mapping); + + + + } + + public void testPlacement3(){ + String s = "{" + + " '127.0.0.1:49961_':{" + + " 'node':'127.0.0.1:49961_'," + + " 'disk':992," + + " 'cores':1}," + + " '127.0.0.2:49955_':{" + + " 'node':'127.0.0.1:49955_'," + + " 'disk':995," + + " 'cores':1}," + + " '127.0.0.3:49952_':{" + + " 'node':'127.0.0.1:49952_'," + + " 'disk':990," + + " 'cores':1}," + + " '127.0.0.1:49947_':{" + + " 'node':'127.0.0.1:49947_'," + + " 'disk':980," + + " 'cores':1}," + + " '127.0.0.2:49958_':{" + + " 'node':'127.0.0.1:49958_'," + + " 'disk':970," + + " 'cores':1}}"; + MockSnitch.nodeVsTags = (Map) ZkStateReader.fromJSON(s.getBytes()); + //test not + List rules = parseRules( + "[{cores:'<4'}, " + + "{replica:'1',shard:'*',node:'*'}," + + "{node:'!127.0.0.1:49947_'}," + + "{disk:'>1'}]"); + Map shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2); + Map mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); + assertNotNull(mapping); + assertFalse(mapping.containsValue("127.0.0.1:49947_")); + + rules = parseRules( + "[{cores:'<4'}, " + + "{replica:'1',node:'*'}," + + "{disk:'>980'}]"); + shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2); + mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); + assertNull(mapping); + + + rules = parseRules( + "[{cores:'<4'}, " + + "{replica:'1',node:'*'}," + + "{disk:'>980~'}]"); + shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2); + mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); + assertNotNull(mapping); + assertFalse(mapping.containsValue("127.0.0.2:49958_")); + + rules = parseRules( + "[{cores:'<4'}, " + + "{replica:'1',shard:'*',host:'*'}]" + ); + shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2); + mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); + assertNotNull(mapping); + + rules = parseRules( + "[{cores:'<4'}, " + + "{replica:'1',shard:'**',host:'*'}]" + ); + shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2); + mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); + assertNull(mapping); + + rules = parseRules( + "[{cores:'<4'}, " + + "{replica:'1~',shard:'**',host:'*'}]" + ); + shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2); + mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings(); + assertNotNull(mapping); + + + } + + private List parseRules(String s) { + + List maps = (List) ZkStateReader.fromJSON(s.getBytes()); + + List rules = new ArrayList<>(); + for (Object map : maps) rules.add(new Rule((Map) map)); + return rules; + } + + @Test + public void testPlacement() throws Exception { + String rulesStr = "rack:*,replica:<2"; + List rules = parse(Arrays.asList(rulesStr)); + Map shardVsReplicaCount = makeMap("shard1", 3, "shard2", 3); + Map nodeVsTags = makeMap( + "node1:80", makeMap("rack", "178"), + "node2:80", makeMap("rack", "179"), + "node3:80", makeMap("rack", "180"), + "node4:80", makeMap("rack", "181"), + "node5:80", makeMap("rack", "182") + ); + MockSnitch.nodeVsTags = nodeVsTags; + Map mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null ,null).getNodeMappings(); + assertNull(mapping); + rulesStr = "rack:*,replica:<2~"; + rules = parse(Arrays.asList(rulesStr)); + mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null ,null).getNodeMappings(); + assertNotNull(mapping); + + rulesStr = "rack:*,shard:*,replica:<2";//for each shard there can be a max of 1 replica + rules = parse(Arrays.asList(rulesStr)); + mapping = new ReplicaAssigner( + rules, + shardVsReplicaCount, Collections.singletonList(MockSnitch.class.getName()), + new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null,null ).getNodeMappings(); + assertNotNull(mapping); + } + + public static class MockSnitch extends Snitch { + static Map nodeVsTags = Collections.emptyMap(); + + @Override + public void getTags(String solrNode, Set requestedTags, SnitchContext ctx) { + ctx.getTags().putAll((Map) nodeVsTags.get(solrNode)); + } + + @Override + public boolean isKnownTag(String tag) { + Map next = (Map) nodeVsTags.values().iterator().next(); + return next.containsKey(tag); + } + } + + public static List parse(List rules) throws IOException { + assert rules != null && !rules.isEmpty(); + ArrayList result = new ArrayList<>(); + for (String s : rules) { + if (s == null || s.trim().isEmpty()) continue; + result.add(new Rule(parseRule(s))); + } + return result; + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RulesTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RulesTest.java new file mode 100644 index 00000000000..20972972fe2 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/rule/RulesTest.java @@ -0,0 +1,67 @@ +package org.apache.solr.cloud.rule; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.List; +import java.util.Map; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.cloud.AbstractFullDistribZkTestBase; +import org.apache.solr.common.cloud.DocCollection; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RulesTest extends AbstractFullDistribZkTestBase { + static final Logger log = LoggerFactory.getLogger(RulesTest.class); + + @Test + public void doIntegrationTest() throws Exception { + String rulesColl = "rulesColl"; + try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) { + CollectionAdminResponse rsp; + CollectionAdminRequest.Create create = new CollectionAdminRequest.Create(); + create.setCollectionName(rulesColl); + create.setNumShards(1); + create.setReplicationFactor(2); + create.setRule("cores:<4", "node:*,replica:1", "disk:>1"); + create.setSnitch("class:ImplicitSnitch"); + rsp = create.process(client); + assertEquals(0, rsp.getStatus()); + assertTrue(rsp.isSuccess()); + + } + + DocCollection rulesCollection = cloudClient.getZkStateReader().getClusterState().getCollection(rulesColl); + List list = (List) rulesCollection.get("rule"); + assertEquals(3, list.size()); + assertEquals ( "<4", ((Map)list.get(0)).get("cores")); + assertEquals("1", ((Map) list.get(1)).get("replica")); + assertEquals(">1", ((Map) list.get(2)).get("disk")); + list = (List) rulesCollection.get("snitch"); + assertEquals(1, list.size()); + assertEquals ( "ImplicitSnitch", ((Map)list.get(0)).get("class")); + + } + + + +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index 18d9538c847..9609b165e0a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -190,7 +190,7 @@ public class CollectionAdminRequest extends SolrRequest protected Boolean autoAddReplicas; protected Integer stateFormat; protected String asyncId; - + private String[] rule , snitch; public Create() { action = CollectionAction.CREATE; } @@ -208,6 +208,8 @@ public class CollectionAdminRequest extends SolrRequest public void setAsyncId(String asyncId) { this.asyncId = asyncId; } + public void setRule(String... s){ this.rule = s; } + public void setSnitch(String... s){ this.snitch = s; } public String getConfigName() { return configName; } public String getCreateNodeSet() { return createNodeSet; } @@ -260,6 +262,8 @@ public class CollectionAdminRequest extends SolrRequest if (stateFormat != null) { params.set(DocCollection.STATE_FORMAT, stateFormat); } + if(rule != null) params.set("rule", rule); + if(snitch != null) params.set("snitch", snitch); return params; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java new file mode 100644 index 00000000000..e447ad5334a --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java @@ -0,0 +1,58 @@ +package org.apache.solr.client.solrj.request; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.Collection; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.response.SimpleSolrResponse; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ContentStream; + +public class GenericSolrRequest extends SolrRequest { + public SolrParams params; + public SimpleSolrResponse response = new SimpleSolrResponse(); + private Collection contentStreams; + + public GenericSolrRequest(METHOD m, String path, SolrParams params) { + super(m, path); + this.params = params; + } + + public void setContentStreams(Collection streams) { + contentStreams = streams; + } + + + @Override + public SolrParams getParams() { + return params; + } + + @Override + public Collection getContentStreams() throws IOException { + return null; + } + + @Override + protected SimpleSolrResponse createResponse(SolrClient client) { + return response; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/response/SimpleSolrResponse.java b/solr/solrj/src/java/org/apache/solr/client/solrj/response/SimpleSolrResponse.java new file mode 100644 index 00000000000..e7e8c9e80d6 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/response/SimpleSolrResponse.java @@ -0,0 +1,49 @@ +package org.apache.solr.client.solrj.response; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.common.util.NamedList; + +public class SimpleSolrResponse extends SolrResponse { + + public long elapsedTime; + + public NamedList nl; + + @Override + public long getElapsedTime() { + return elapsedTime; + } + + @Override + public NamedList getResponse() { + return nl; + } + + @Override + public void setResponse(NamedList rsp) { + nl = rsp; + } + + @Override + public void setElapsedTime(long elapsedTime) { + this.elapsedTime = elapsedTime; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 907de220807..57dc6529240 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -37,6 +37,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -131,6 +132,10 @@ public class ZkStateReader implements Closeable { return toUTF8(out); } + public static String toJSONString(Object o) { + return new String(toJSON(o), StandardCharsets.UTF_8); + } + public static byte[] toUTF8(CharArr out) { byte[] arr = new byte[out.size() << 2]; // is 4x the real worst-case upper-bound? int nBytes = ByteUtils.UTF16toUTF8(out, 0, out.size(), arr, 0); diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java index fcafd92a6dd..bf6cabb2eb0 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java @@ -138,8 +138,9 @@ public abstract class CoreAdminParams TRANSIENT, OVERSEEROP, REQUESTSTATUS, - REJOINLEADERELECTION; - + REJOINLEADERELECTION, + INVOKE; + public static CoreAdminAction get( String p ) { if( p != null ) { diff --git a/solr/solrj/src/java/org/apache/solr/common/params/ModifiableSolrParams.java b/solr/solrj/src/java/org/apache/solr/common/params/ModifiableSolrParams.java index a1f176a226f..c8287fd02bf 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/ModifiableSolrParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/ModifiableSolrParams.java @@ -57,6 +57,10 @@ public class ModifiableSolrParams extends SolrParams } } + public int size() { + return vals == null ? 0 : vals.size(); + } + public Map getMap() { return vals; }