From e5045d5538d0877e9905d3f527e1f6bf75f69179 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Wed, 31 Jul 2013 17:53:02 +0000 Subject: [PATCH] SOLR-4221 SOLR-4808 SOLR-5006 SOLR-5017 SOLR-4222 git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1508968 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 5 + .../java/org/apache/solr/cloud/Assign.java | 107 +++++- .../java/org/apache/solr/cloud/Overseer.java | 118 +++++-- .../cloud/OverseerCollectionProcessor.java | 209 +++++++++--- .../handler/admin/CollectionsHandler.java | 76 +++-- .../handler/component/HttpShardHandler.java | 28 +- .../processor/DistributedUpdateProcessor.java | 24 +- .../solr/collection1/conf/schema.xml | 1 + .../CollectionsAPIDistributedZkTest.java | 308 +++++++++++++++--- .../OverseerCollectionProcessorTest.java | 115 +++++-- .../apache/solr/common/cloud/DocRouter.java | 4 +- .../solr/common/cloud/ImplicitDocRouter.java | 36 +- .../apache/solr/common/cloud/ZkNodeProps.java | 18 +- .../solr/common/params/CollectionParams.java | 2 +- .../common/params/RequiredSolrParams.java | 4 + .../solr/common/params/ShardParams.java | 5 + .../cloud/AbstractFullDistribZkTestBase.java | 84 +++-- 17 files changed, 915 insertions(+), 229 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 1466018e90d..bfbf025d6ec 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -75,6 +75,11 @@ New Features the "ie" (input encoding) parameter, e.g. "select?q=m%FCller&ie=ISO-8859-1". The default is UTF-8. To change the encoding of POSTed content, use the "Content-Type" HTTP header. (Uwe Schindler, David Smiley) +* SOLR-4221: Custom sharding (Noble Paul) +* SOLR-4808: Persist and use router,replicationFactor and maxShardsPerNode at Collection and Shard level (Noble Paul, Shalin Mangar) +* SOLR-5006: CREATESHARD command for 'implicit' shards (Noble Paul) +* SOLR-5017: Allow sharding based on the value of a field (Noble Paul) +* SOLR-4222:create custom sharded collection via collections API (Noble Paul) Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java index 8f0120b81d8..62f62fc3ab1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Assign.java +++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java @@ -17,22 +17,36 @@ package org.apache.solr.cloud; * the License. */ +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.util.StrUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.Slice; +import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET; +import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE; +import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES; +import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR; public class Assign { private static Pattern COUNT = Pattern.compile("core_node(\\d+)"); + private static Logger log = LoggerFactory + .getLogger(Assign.class); public static String assignNode(String collection, ClusterState state) { Map sliceMap = state.getSlicesMap(collection); @@ -100,4 +114,91 @@ public class Assign { returnShardId = shardIdNames.get(0); return returnShardId; } + + static class Node { + public final String nodeName; + public int thisCollectionNodes=0; + public int totalNodes=0; + + Node(String nodeName) { + this.nodeName = nodeName; + } + + public int weight(){ + return (thisCollectionNodes * 100) + totalNodes; + } + } + + public static ArrayList getNodesForNewShard(ClusterState clusterState, String collectionName, int numSlices, int maxShardsPerNode, int repFactor, String createNodeSetStr) { + List createNodeList = createNodeSetStr == null ? null: StrUtils.splitSmart(createNodeSetStr, ",", true); + + + Set nodes = clusterState.getLiveNodes(); + + List nodeList = new ArrayList(nodes.size()); + nodeList.addAll(nodes); + if (createNodeList != null) nodeList.retainAll(createNodeList); + + + HashMap nodeNameVsShardCount = new HashMap(); + for (String s : nodeList) nodeNameVsShardCount.put(s,new Node(s)); + for (String s : clusterState.getCollections()) { + DocCollection c = clusterState.getCollection(s); + //identify suitable nodes by checking the no:of cores in each of them + for (Slice slice : c.getSlices()) { + Collection replicas = slice.getReplicas(); + for (Replica replica : replicas) { + Node count = nodeNameVsShardCount.get(replica.getNodeName()); + if (count != null) { + count.totalNodes++; + if (s.equals(collectionName)) { + count.thisCollectionNodes++; + if (count.thisCollectionNodes >= maxShardsPerNode) nodeNameVsShardCount.remove(replica.getNodeName()); + } + } + } + } + } + + if (nodeNameVsShardCount.size() <= 0) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + + ". No live Solr-instances" + ((createNodeList != null)?" among Solr-instances specified in " + CREATE_NODE_SET + ":" + createNodeSetStr:"")); + } + + if (repFactor > nodeNameVsShardCount.size()) { + log.warn("Specified " + + REPLICATION_FACTOR + + " of " + + repFactor + + " on collection " + + collectionName + + " is higher than or equal to the number of Solr instances currently live or part of your " + CREATE_NODE_SET + "(" + + nodeList.size() + + "). Its unusual to run two replica of the same slice on the same Solr-instance."); + } + + int maxCoresAllowedToCreate = maxShardsPerNode * nodeList.size(); + int requestedCoresToCreate = numSlices * repFactor; + int minCoresToCreate = requestedCoresToCreate; + if (maxCoresAllowedToCreate < minCoresToCreate) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create shards " + collectionName + ". Value of " + + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode + + ", and the number of live nodes is " + nodeList.size() + + ". This allows a maximum of " + maxCoresAllowedToCreate + + " to be created. Value of " + NUM_SLICES + " is " + numSlices + + " and value of " + REPLICATION_FACTOR + " is " + repFactor + + ". This requires " + requestedCoresToCreate + + " shards to be created (higher than the allowed number)"); + } + + ArrayList sortedNodeList = new ArrayList<>(nodeNameVsShardCount.values()); + Collections.sort(sortedNodeList, new Comparator() { + @Override + public int compare(Node x, Node y) { + return (x.weight() < y.weight()) ? -1 : ((x.weight() == y.weight()) ? 0 : 1); + } + }); + return sortedNodeList; + } + } diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 1aabf41bcf0..c6a9b2db763 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -17,16 +17,6 @@ package org.apache.solr.cloud; * the License. */ -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClosableThread; @@ -46,6 +36,16 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + /** * Cluster leader. Responsible node assignments, cluster state file? */ @@ -54,7 +54,7 @@ public class Overseer { public static final String DELETECORE = "deletecore"; public static final String REMOVECOLLECTION = "removecollection"; public static final String REMOVESHARD = "removeshard"; - + private static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates @@ -203,13 +203,36 @@ public class Overseer { clusterState = createShard(clusterState, message); } else if ("updateshardstate".equals(operation)) { clusterState = updateShardState(clusterState, message); + } else if (OverseerCollectionProcessor.CREATECOLLECTION.equals(operation)) { + clusterState = buildCollection(clusterState, message); } else { throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties()); } return clusterState; } - + + private ClusterState buildCollection(ClusterState clusterState, ZkNodeProps message) { + String collection = message.getStr("name"); + log.info("building a new collection: " + collection); + if(clusterState.getCollections().contains(collection) ){ + log.warn("Collection {} already exists. exit" ,collection); + return clusterState; + } + + ArrayList shardNames = new ArrayList(); + + if(ImplicitDocRouter.NAME.equals( message.getStr("router",DocRouter.DEFAULT_NAME))){ + getShardNames(shardNames,message.getStr("shards",DocRouter.DEFAULT_NAME)); + } else { + int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1); + if(numShards<1) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"numShards is a required parameter for 'compositeId' router"); + getShardNames(numShards, shardNames); + } + + return createCollection(clusterState,collection,shardNames,message); + } + private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) { String collection = message.getStr(ZkStateReader.COLLECTION_PROP); log.info("Update shard state invoked for collection: " + collection); @@ -294,12 +317,22 @@ public class Overseer { } message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); } - Integer numShards = message.getStr(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.getStr(ZkStateReader.NUM_SHARDS_PROP)):null; + Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null); log.info("Update state numShards={} message={}", numShards, message); + + String router = message.getStr(OverseerCollectionProcessor.ROUTER,DocRouter.DEFAULT_NAME); + List shardNames = new ArrayList(); + //collection does not yet exist, create placeholders if num shards is specified boolean collectionExists = state.getCollections().contains(collection); if (!collectionExists && numShards!=null) { - state = createCollection(state, collection, numShards); + if(ImplicitDocRouter.NAME.equals(router)){ + getShardNames(shardNames, message.getStr("shards",null)); + numShards = shardNames.size(); + }else { + getShardNames(numShards, shardNames); + } + state = createCollection(state, collection, shardNames, message); } // use the provided non null shardId @@ -391,34 +424,42 @@ public class Overseer { return newClusterState; } - private Map defaultCollectionProps() { - HashMap props = new HashMap(2); - props.put(DocCollection.DOC_ROUTER, DocRouter.DEFAULT_NAME); - return props; - } + private ClusterState createCollection(ClusterState state, String collectionName, List shards , ZkNodeProps message) { + log.info("Create collection {} with shards {}", collectionName, shards);; - private ClusterState createCollection(ClusterState state, String collectionName, int numShards) { - log.info("Create collection {} with numShards {}", collectionName, numShards); + String routerName = message.getStr(OverseerCollectionProcessor.ROUTER,DocRouter.DEFAULT_NAME); + DocRouter router = DocRouter.getDocRouter(routerName); - DocRouter router = DocRouter.DEFAULT; - List ranges = router.partitionRange(numShards, router.fullRange()); + List ranges = router.partitionRange(shards.size(), router.fullRange()); Map newCollections = new LinkedHashMap(); Map newSlices = new LinkedHashMap(); newCollections.putAll(state.getCollectionStates()); + for (int i = 0; i < shards.size(); i++) { + String sliceName = shards.get(i); + /*} for (int i = 0; i < numShards; i++) { - final String sliceName = "shard" + (i+1); + final String sliceName = "shard" + (i+1);*/ - Map sliceProps = new LinkedHashMap(1); - sliceProps.put(Slice.RANGE, ranges.get(i)); + Map sliceProps = new LinkedHashMap(1); + sliceProps.put(Slice.RANGE, ranges == null? null: ranges.get(i)); newSlices.put(sliceName, new Slice(sliceName, null, sliceProps)); } // TODO: fill in with collection properties read from the /collections/ node - Map collectionProps = defaultCollectionProps(); + Map collectionProps = new HashMap(); + + for (Entry e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) { + Object val = message.get(e.getKey()); + if(val == null){ + val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey()); + } + if(val != null) collectionProps.put(e.getKey(),val); + } + collectionProps.put(DocCollection.DOC_ROUTER, routerName); DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router); @@ -466,7 +507,6 @@ public class Overseer { private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) { // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates())); // System.out.println("Updating slice:" + slice); - Map newCollections = new LinkedHashMap(state.getCollectionStates()); // make a shallow copy DocCollection coll = newCollections.get(collectionName); Map slices; @@ -681,6 +721,28 @@ public class Overseer { } + static void getShardNames(Integer numShards, List shardNames) { + if(numShards == null) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param"); + for (int i = 0; i < numShards; i++) { + final String sliceName = "shard" + (i + 1); + shardNames.add(sliceName); + } + + } + + static void getShardNames(List shardNames, String shards) { + if(shards ==null) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param"); + for (String s : shards.split(",")) { + if(s ==null || s.trim().isEmpty()) continue; + shardNames.add(s.trim()); + } + if(shardNames.isEmpty()) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param"); + + } + class OverseerThread extends Thread implements ClosableThread { private volatile boolean isClosed; diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index b98c4603324..f2d336b055c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -17,14 +17,6 @@ package org.apache.solr.cloud; * limitations under the License. */ -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpSolrServer; @@ -40,6 +32,7 @@ import org.apache.solr.common.cloud.ClosableThread; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.PlainIdRouter; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; @@ -61,6 +54,21 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.solr.cloud.Assign.Node; +import static org.apache.solr.cloud.Assign.getNodesForNewShard; +import static org.apache.solr.common.cloud.DocRouter.ROUTE_FIELD; +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; + + public class OverseerCollectionProcessor implements Runnable, ClosableThread { public static final String NUM_SLICES = "numShards"; @@ -85,6 +93,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { public static final String DELETESHARD = "deleteshard"; + public static final String ROUTER = "router"; + + public static final String SHARDS_PROP = "shards"; + + public static final String CREATESHARD = "createshard"; + + public static final String COLL_CONF = "collection.configName"; + + + public static final Map COLL_PROPS = asMap( + ROUTER,DocRouter.DEFAULT_NAME, + REPLICATION_FACTOR, "1", + MAX_SHARDS_PER_NODE,"1", + ROUTE_FIELD,null); + + // TODO: use from Overseer? private static final String QUEUE_OPERATION = "operation"; @@ -168,7 +192,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { protected SolrResponse processMessage(ZkNodeProps message, String operation) { - + log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString()); + NamedList results = new NamedList(); try { if (CREATECOLLECTION.equals(operation)) { @@ -185,6 +210,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { deleteAlias(zkStateReader.getAliases(), message); } else if (SPLITSHARD.equals(operation)) { splitShard(zkStateReader.getClusterState(), message, results); + } else if (CREATESHARD.equals(operation)) { + createShard(zkStateReader.getClusterState(), message, results); } else if (DELETESHARD.equals(operation)) { deleteShard(zkStateReader.getClusterState(), message, results); } else { @@ -333,7 +360,84 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { } } - + + private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException { + log.info("create shard invoked"); + String collectionName = message.getStr(COLLECTION_PROP); + String shard = message.getStr(SHARD_ID_PROP); + if(collectionName == null || shard ==null) + throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters" ); + int numSlices = 1; + + DocCollection collection = clusterState.getCollection(collectionName); + int maxShardsPerNode = collection.getInt(MAX_SHARDS_PER_NODE, 1); + int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(MAX_SHARDS_PER_NODE, 1)); +// int minReplicas = message.getInt("minReplicas",repFactor); + String createNodeSetStr =message.getStr(CREATE_NODE_SET); + + ArrayList sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr); + + Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message)); + // wait for a while until we don't see the collection + long waitUntil = System.currentTimeMillis() + 30000; + boolean created = false; + while (System.currentTimeMillis() < waitUntil) { + Thread.sleep(100); + created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(shard) !=null; + if (created) break; + } + if (!created) + throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr("name")); + + + String configName = message.getStr(COLL_CONF); + String sliceName = shard; + for (int j = 1; j <= repFactor; j++) { + String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName; + String shardName = collectionName + "_" + sliceName + "_replica" + j; + log.info("Creating shard " + shardName + " as part of slice " + + sliceName + " of collection " + collectionName + " on " + + nodeName); + + // Need to create new params for each request + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); + + params.set(CoreAdminParams.NAME, shardName); + params.set(COLL_CONF, configName); + params.set(CoreAdminParams.COLLECTION, collectionName); + params.set(CoreAdminParams.SHARD, sliceName); + params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); + + ShardRequest sreq = new ShardRequest(); + params.set("qt", adminPath); + sreq.purpose = 1; + String replica = zkStateReader.getZkClient() + .getBaseUrlForNodeName(nodeName); + if (replica.startsWith("http://")) replica = replica.substring(7); + sreq.shards = new String[]{replica}; + sreq.actualShards = sreq.shards; + sreq.params = params; + + shardHandler.submit(sreq, replica, sreq.params); + + } + + ShardResponse srsp; + do { + srsp = shardHandler.takeCompletedOrError(); + if (srsp != null) { + processResponse(results, srsp); + } + } while (srsp != null); + + log.info("Finished create command on all shards for collection: " + + collectionName); + + return true; + } + + private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) { log.info("Split shard invoked"); String collectionName = message.getStr("collection"); @@ -674,7 +778,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { "The slice: " + slice.getName() + " is currently " + slice.getState() + ". Only INACTIVE (or custom-hashed) slices can be deleted."); } - + try { ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString()); @@ -732,7 +836,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { shardHandler.submit(sreq, replica, sreq.params); } - private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) { + private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException { String collectionName = message.getStr("name"); if (clusterState.getCollections().contains(collectionName)) { throw new SolrException(ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName); @@ -742,14 +846,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { // look at the replication factor and see if it matches reality // if it does not, find best nodes to create more cores - int repFactor = msgStrToInt(message, REPLICATION_FACTOR, 1); - Integer numSlices = msgStrToInt(message, NUM_SLICES, null); - - if (numSlices == null) { + int repFactor = message.getInt( REPLICATION_FACTOR, 1); + Integer numSlices = message.getInt(NUM_SLICES, null); + String router = message.getStr(ROUTER, DocRouter.DEFAULT_NAME); + List shardNames = new ArrayList<>(); + if(ImplicitDocRouter.NAME.equals(router)){ + Overseer.getShardNames(shardNames, message.getStr("shards",null)); + numSlices = shardNames.size(); + } else { + Overseer.getShardNames(numSlices,shardNames); + } + + if (numSlices == null ) { throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param"); } - - int maxShardsPerNode = msgStrToInt(message, MAX_SHARDS_PER_NODE, 1); + + int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1); String createNodeSetStr; List createNodeList = ((createNodeSetStr = message.getStr(CREATE_NODE_SET)) == null)?null:StrUtils.splitSmart(createNodeSetStr, ",", true); @@ -761,8 +873,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0"); } - String configName = message.getStr("collection.configName"); - // we need to look at every node and see how many cores it serves // add our new cores to existing nodes serving the least number of cores // but (for now) require that each core goes on a distinct node. @@ -805,26 +915,44 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { + ". This requires " + requestedShardsToCreate + " shards to be created (higher than the allowed number)"); } - - for (int i = 1; i <= numSlices; i++) { + +// ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, +// Overseer.CREATECOLLECTION, "name", message.getStr("name")); + Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message)); + + // wait for a while until we don't see the collection + long waitUntil = System.currentTimeMillis() + 30000; + boolean created = false; + while (System.currentTimeMillis() < waitUntil) { + Thread.sleep(100); + created = zkStateReader.getClusterState().getCollections().contains(message.getStr("name")); + if(created) break; + } + if (!created) + throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name")); + + + String configName = message.getStr(COLL_CONF); + log.info("going to create cores replicas shardNames {} , repFactor : {}", shardNames, repFactor); + for (int i = 1; i <= shardNames.size(); i++) { + String sliceName = shardNames.get(i-1); for (int j = 1; j <= repFactor; j++) { String nodeName = nodeList.get((repFactor * (i - 1) + (j - 1)) % nodeList.size()); - String sliceName = "shard" + i; String shardName = collectionName + "_" + sliceName + "_replica" + j; log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName + " on " + nodeName); - + // Need to create new params for each request ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); - + params.set(CoreAdminParams.NAME, shardName); - params.set("collection.configName", configName); + params.set(COLL_CONF, configName); params.set(CoreAdminParams.COLLECTION, collectionName); params.set(CoreAdminParams.SHARD, sliceName); params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); - + ShardRequest sreq = new ShardRequest(); params.set("qt", adminPath); sreq.purpose = 1; @@ -834,12 +962,12 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { sreq.shards = new String[] {replica}; sreq.actualShards = sreq.shards; sreq.params = params; - + shardHandler.submit(sreq, replica, sreq.params); - + } } - + ShardResponse srsp; do { srsp = shardHandler.takeCompletedOrError(); @@ -857,7 +985,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { throw new SolrException(ErrorCode.SERVER_ERROR, null, ex); } } - + private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, String stateMatcher) { log.info("Executing Collection Cmd : " + params); String collectionName = message.getStr("name"); @@ -947,19 +1075,16 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { } } - private Integer msgStrToInt(ZkNodeProps message, String key, Integer def) - throws Exception { - String str = message.getStr(key); - try { - return str == null ? def : Integer.valueOf(str); - } catch (Exception ex) { - SolrException.log(log, "Could not parse " + key, ex); - throw ex; - } - } - @Override public boolean isClosed() { return isClosed; } + + public static Map asMap(Object... vals) { + HashMap m = new HashMap(); + for(int i=0; i props = new HashMap(); props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.CREATECOLLECTION); - props.put(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas.toString()); - props.put("name", name); - if (configName != null) { - props.put("collection.configName", configName); - } - props.put(OverseerCollectionProcessor.NUM_SLICES, numShards); - props.put(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, maxShardsPerNode); - props.put(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSetStr); - - ZkNodeProps m = new ZkNodeProps(props); + copyIfNotNull(req.getParams(),props, + "name", + REPLICATION_FACTOR, + COLL_CONF, + NUM_SLICES, + MAX_SHARDS_PER_NODE, + CREATE_NODE_SET , + ROUTER, + SHARDS_PROP, + ROUTE_FIELD); + + + ZkNodeProps m = new ZkNodeProps(props); handleResponse(OverseerCollectionProcessor.CREATECOLLECTION, m, rsp); } + + private void handleCreateShard(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { + log.info("Create shard: " + req.getParamString()); + req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP); + ClusterState clusterState = coreContainer.getZkController().getClusterState(); + if(!ImplicitDocRouter.NAME.equals( clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).getStr(ROUTER))) + throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" ); + + Map map = OverseerCollectionProcessor.asMap(QUEUE_OPERATION, CREATESHARD); + copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR); + ZkNodeProps m = new ZkNodeProps(map); + handleResponse(CREATESHARD, m, rsp); + } + + private static void copyIfNotNull(SolrParams params, Map props, String... keys) { + if(keys !=null){ + for (String key : keys) { + String v = params.get(key); + if(v != null) props.put(key,v); + } + } + + } private void handleDeleteShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws InterruptedException, KeeperException { diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java index 206fc091c40..90e8aad9376 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java @@ -16,18 +16,6 @@ package org.apache.solr.handler.component; * limitations under the License. */ -import java.net.ConnectException; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - import org.apache.http.client.HttpClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrResponse; @@ -44,7 +32,6 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; -import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; @@ -55,6 +42,18 @@ import org.apache.solr.common.util.StrUtils; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.request.SolrQueryRequest; +import java.net.ConnectException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + public class HttpShardHandler extends ShardHandler { private HttpShardHandlerFactory httpShardHandlerFactory; @@ -277,7 +276,8 @@ public class HttpShardHandler extends ShardHandler { // we weren't provided with an explicit list of slices to query via "shards", so use the cluster state clusterState = zkController.getClusterState(); - String shardKeys = params.get(ShardParams.SHARD_KEYS); + String shardKeys = params.get(ShardParams._ROUTE_); + if(shardKeys == null) shardKeys = params.get(ShardParams.SHARD_KEYS);//eprecated // This will be the complete list of slices we need to query for this request. slices = new HashMap(); diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 83ee34f5fd4..796d101e63d 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -17,16 +17,6 @@ package org.apache.solr.update.processor; * limitations under the License. */ -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.CharsRef; import org.apache.solr.client.solrj.impl.HttpSolrServer; @@ -74,6 +64,16 @@ import org.apache.solr.update.VersionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; // NOT mt-safe... create a new processor for each add thread @@ -917,7 +917,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); SolrParams params = req.getParams(); - Collection slices = coll.getRouter().getSearchSlices(params.get(ShardParams.SHARD_KEYS), params, coll); + String route = params.get(ShardParams._ROUTE_); + if(route == null) route = params.get(ShardParams.SHARD_KEYS);// deprecated . kept for backcompat + Collection slices = coll.getRouter().getSearchSlices(route, params, coll); List leaders = new ArrayList(slices.size()); for (Slice slice : slices) { diff --git a/solr/core/src/test-files/solr/collection1/conf/schema.xml b/solr/core/src/test-files/solr/collection1/conf/schema.xml index 6ab77435f00..3d0068412ff 100644 --- a/solr/core/src/test-files/solr/collection1/conf/schema.xml +++ b/solr/core/src/test-files/solr/collection1/conf/schema.xml @@ -581,6 +581,7 @@ +