SOLR-4221 SOLR-4808 SOLR-5006 SOLR-5017 SOLR-4222

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1508968 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2013-07-31 17:53:02 +00:00
parent 1d603a9f87
commit e5045d5538
17 changed files with 915 additions and 229 deletions

View File

@ -75,6 +75,11 @@ New Features
the "ie" (input encoding) parameter, e.g. "select?q=m%FCller&ie=ISO-8859-1".
The default is UTF-8. To change the encoding of POSTed content, use the
"Content-Type" HTTP header. (Uwe Schindler, David Smiley)
* SOLR-4221: Custom sharding (Noble Paul)
* SOLR-4808: Persist and use router,replicationFactor and maxShardsPerNode at Collection and Shard level (Noble Paul, Shalin Mangar)
* SOLR-5006: CREATESHARD command for 'implicit' shards (Noble Paul)
* SOLR-5017: Allow sharding based on the value of a field (Noble Paul)
* SOLR-4222:create custom sharded collection via collections API (Noble Paul)
Bug Fixes
----------------------

View File

@ -17,22 +17,36 @@ package org.apache.solr.cloud;
* the License.
*/
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
public class Assign {
private static Pattern COUNT = Pattern.compile("core_node(\\d+)");
private static Logger log = LoggerFactory
.getLogger(Assign.class);
public static String assignNode(String collection, ClusterState state) {
Map<String, Slice> sliceMap = state.getSlicesMap(collection);
@ -100,4 +114,91 @@ public class Assign {
returnShardId = shardIdNames.get(0);
return returnShardId;
}
static class Node {
public final String nodeName;
public int thisCollectionNodes=0;
public int totalNodes=0;
Node(String nodeName) {
this.nodeName = nodeName;
}
public int weight(){
return (thisCollectionNodes * 100) + totalNodes;
}
}
public static ArrayList<Node> getNodesForNewShard(ClusterState clusterState, String collectionName, int numSlices, int maxShardsPerNode, int repFactor, String createNodeSetStr) {
List<String> createNodeList = createNodeSetStr == null ? null: StrUtils.splitSmart(createNodeSetStr, ",", true);
Set<String> nodes = clusterState.getLiveNodes();
List<String> nodeList = new ArrayList<String>(nodes.size());
nodeList.addAll(nodes);
if (createNodeList != null) nodeList.retainAll(createNodeList);
HashMap<String,Node> nodeNameVsShardCount = new HashMap<String, Node>();
for (String s : nodeList) nodeNameVsShardCount.put(s,new Node(s));
for (String s : clusterState.getCollections()) {
DocCollection c = clusterState.getCollection(s);
//identify suitable nodes by checking the no:of cores in each of them
for (Slice slice : c.getSlices()) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
Node count = nodeNameVsShardCount.get(replica.getNodeName());
if (count != null) {
count.totalNodes++;
if (s.equals(collectionName)) {
count.thisCollectionNodes++;
if (count.thisCollectionNodes >= maxShardsPerNode) nodeNameVsShardCount.remove(replica.getNodeName());
}
}
}
}
}
if (nodeNameVsShardCount.size() <= 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName
+ ". No live Solr-instances" + ((createNodeList != null)?" among Solr-instances specified in " + CREATE_NODE_SET + ":" + createNodeSetStr:""));
}
if (repFactor > nodeNameVsShardCount.size()) {
log.warn("Specified "
+ REPLICATION_FACTOR
+ " of "
+ repFactor
+ " on collection "
+ collectionName
+ " is higher than or equal to the number of Solr instances currently live or part of your " + CREATE_NODE_SET + "("
+ nodeList.size()
+ "). Its unusual to run two replica of the same slice on the same Solr-instance.");
}
int maxCoresAllowedToCreate = maxShardsPerNode * nodeList.size();
int requestedCoresToCreate = numSlices * repFactor;
int minCoresToCreate = requestedCoresToCreate;
if (maxCoresAllowedToCreate < minCoresToCreate) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create shards " + collectionName + ". Value of "
+ MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+ ", and the number of live nodes is " + nodeList.size()
+ ". This allows a maximum of " + maxCoresAllowedToCreate
+ " to be created. Value of " + NUM_SLICES + " is " + numSlices
+ " and value of " + REPLICATION_FACTOR + " is " + repFactor
+ ". This requires " + requestedCoresToCreate
+ " shards to be created (higher than the allowed number)");
}
ArrayList<Node> sortedNodeList = new ArrayList<>(nodeNameVsShardCount.values());
Collections.sort(sortedNodeList, new Comparator<Node>() {
@Override
public int compare(Node x, Node y) {
return (x.weight() < y.weight()) ? -1 : ((x.weight() == y.weight()) ? 0 : 1);
}
});
return sortedNodeList;
}
}

View File

@ -17,16 +17,6 @@ package org.apache.solr.cloud;
* the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClosableThread;
@ -46,6 +36,16 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* Cluster leader. Responsible node assignments, cluster state file?
*/
@ -203,6 +203,8 @@ public class Overseer {
clusterState = createShard(clusterState, message);
} else if ("updateshardstate".equals(operation)) {
clusterState = updateShardState(clusterState, message);
} else if (OverseerCollectionProcessor.CREATECOLLECTION.equals(operation)) {
clusterState = buildCollection(clusterState, message);
} else {
throw new RuntimeException("unknown operation:" + operation
+ " contents:" + message.getProperties());
@ -210,6 +212,27 @@ public class Overseer {
return clusterState;
}
private ClusterState buildCollection(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr("name");
log.info("building a new collection: " + collection);
if(clusterState.getCollections().contains(collection) ){
log.warn("Collection {} already exists. exit" ,collection);
return clusterState;
}
ArrayList<String> shardNames = new ArrayList<String>();
if(ImplicitDocRouter.NAME.equals( message.getStr("router",DocRouter.DEFAULT_NAME))){
getShardNames(shardNames,message.getStr("shards",DocRouter.DEFAULT_NAME));
} else {
int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1);
if(numShards<1) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"numShards is a required parameter for 'compositeId' router");
getShardNames(numShards, shardNames);
}
return createCollection(clusterState,collection,shardNames,message);
}
private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
log.info("Update shard state invoked for collection: " + collection);
@ -294,12 +317,22 @@ public class Overseer {
}
message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
}
Integer numShards = message.getStr(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.getStr(ZkStateReader.NUM_SHARDS_PROP)):null;
Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
log.info("Update state numShards={} message={}", numShards, message);
String router = message.getStr(OverseerCollectionProcessor.ROUTER,DocRouter.DEFAULT_NAME);
List<String> shardNames = new ArrayList<String>();
//collection does not yet exist, create placeholders if num shards is specified
boolean collectionExists = state.getCollections().contains(collection);
if (!collectionExists && numShards!=null) {
state = createCollection(state, collection, numShards);
if(ImplicitDocRouter.NAME.equals(router)){
getShardNames(shardNames, message.getStr("shards",null));
numShards = shardNames.size();
}else {
getShardNames(numShards, shardNames);
}
state = createCollection(state, collection, shardNames, message);
}
// use the provided non null shardId
@ -391,34 +424,42 @@ public class Overseer {
return newClusterState;
}
private Map<String,Object> defaultCollectionProps() {
HashMap<String,Object> props = new HashMap<String, Object>(2);
props.put(DocCollection.DOC_ROUTER, DocRouter.DEFAULT_NAME);
return props;
}
private ClusterState createCollection(ClusterState state, String collectionName, List<String> shards , ZkNodeProps message) {
log.info("Create collection {} with shards {}", collectionName, shards);;
private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
log.info("Create collection {} with numShards {}", collectionName, numShards);
String routerName = message.getStr(OverseerCollectionProcessor.ROUTER,DocRouter.DEFAULT_NAME);
DocRouter router = DocRouter.getDocRouter(routerName);
DocRouter router = DocRouter.DEFAULT;
List<DocRouter.Range> ranges = router.partitionRange(numShards, router.fullRange());
List<DocRouter.Range> ranges = router.partitionRange(shards.size(), router.fullRange());
Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
newCollections.putAll(state.getCollectionStates());
for (int i = 0; i < shards.size(); i++) {
String sliceName = shards.get(i);
/*}
for (int i = 0; i < numShards; i++) {
final String sliceName = "shard" + (i+1);
final String sliceName = "shard" + (i+1);*/
Map<String, Object> sliceProps = new LinkedHashMap<String, Object>(1);
sliceProps.put(Slice.RANGE, ranges.get(i));
sliceProps.put(Slice.RANGE, ranges == null? null: ranges.get(i));
newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
}
// TODO: fill in with collection properties read from the /collections/<collectionName> node
Map<String,Object> collectionProps = defaultCollectionProps();
Map<String,Object> collectionProps = new HashMap<String,Object>();
for (Entry<String, Object> e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) {
Object val = message.get(e.getKey());
if(val == null){
val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey());
}
if(val != null) collectionProps.put(e.getKey(),val);
}
collectionProps.put(DocCollection.DOC_ROUTER, routerName);
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
@ -466,7 +507,6 @@ public class Overseer {
private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
// System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
// System.out.println("Updating slice:" + slice);
Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(state.getCollectionStates()); // make a shallow copy
DocCollection coll = newCollections.get(collectionName);
Map<String,Slice> slices;
@ -681,6 +721,28 @@ public class Overseer {
}
static void getShardNames(Integer numShards, List<String> shardNames) {
if(numShards == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param");
for (int i = 0; i < numShards; i++) {
final String sliceName = "shard" + (i + 1);
shardNames.add(sliceName);
}
}
static void getShardNames(List<String> shardNames, String shards) {
if(shards ==null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param");
for (String s : shards.split(",")) {
if(s ==null || s.trim().isEmpty()) continue;
shardNames.add(s.trim());
}
if(shardNames.isEmpty())
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param");
}
class OverseerThread extends Thread implements ClosableThread {
private volatile boolean isClosed;

View File

@ -17,14 +17,6 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
@ -40,6 +32,7 @@ import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.PlainIdRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@ -61,6 +54,21 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.solr.cloud.Assign.Node;
import static org.apache.solr.cloud.Assign.getNodesForNewShard;
import static org.apache.solr.common.cloud.DocRouter.ROUTE_FIELD;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
public class OverseerCollectionProcessor implements Runnable, ClosableThread {
public static final String NUM_SLICES = "numShards";
@ -85,6 +93,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
public static final String DELETESHARD = "deleteshard";
public static final String ROUTER = "router";
public static final String SHARDS_PROP = "shards";
public static final String CREATESHARD = "createshard";
public static final String COLL_CONF = "collection.configName";
public static final Map<String,Object> COLL_PROPS = asMap(
ROUTER,DocRouter.DEFAULT_NAME,
REPLICATION_FACTOR, "1",
MAX_SHARDS_PER_NODE,"1",
ROUTE_FIELD,null);
// TODO: use from Overseer?
private static final String QUEUE_OPERATION = "operation";
@ -168,6 +192,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
protected SolrResponse processMessage(ZkNodeProps message, String operation) {
log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString());
NamedList results = new NamedList();
try {
@ -185,6 +210,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
deleteAlias(zkStateReader.getAliases(), message);
} else if (SPLITSHARD.equals(operation)) {
splitShard(zkStateReader.getClusterState(), message, results);
} else if (CREATESHARD.equals(operation)) {
createShard(zkStateReader.getClusterState(), message, results);
} else if (DELETESHARD.equals(operation)) {
deleteShard(zkStateReader.getClusterState(), message, results);
} else {
@ -334,6 +361,83 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
}
private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
log.info("create shard invoked");
String collectionName = message.getStr(COLLECTION_PROP);
String shard = message.getStr(SHARD_ID_PROP);
if(collectionName == null || shard ==null)
throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters" );
int numSlices = 1;
DocCollection collection = clusterState.getCollection(collectionName);
int maxShardsPerNode = collection.getInt(MAX_SHARDS_PER_NODE, 1);
int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(MAX_SHARDS_PER_NODE, 1));
// int minReplicas = message.getInt("minReplicas",repFactor);
String createNodeSetStr =message.getStr(CREATE_NODE_SET);
ArrayList<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr);
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
// wait for a while until we don't see the collection
long waitUntil = System.currentTimeMillis() + 30000;
boolean created = false;
while (System.currentTimeMillis() < waitUntil) {
Thread.sleep(100);
created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(shard) !=null;
if (created) break;
}
if (!created)
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr("name"));
String configName = message.getStr(COLL_CONF);
String sliceName = shard;
for (int j = 1; j <= repFactor; j++) {
String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
String shardName = collectionName + "_" + sliceName + "_replica" + j;
log.info("Creating shard " + shardName + " as part of slice "
+ sliceName + " of collection " + collectionName + " on "
+ nodeName);
// Need to create new params for each request
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
params.set(CoreAdminParams.NAME, shardName);
params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
sreq.purpose = 1;
String replica = zkStateReader.getZkClient()
.getBaseUrlForNodeName(nodeName);
if (replica.startsWith("http://")) replica = replica.substring(7);
sreq.shards = new String[]{replica};
sreq.actualShards = sreq.shards;
sreq.params = params;
shardHandler.submit(sreq, replica, sreq.params);
}
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp);
}
} while (srsp != null);
log.info("Finished create command on all shards for collection: "
+ collectionName);
return true;
}
private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
log.info("Split shard invoked");
String collectionName = message.getStr("collection");
@ -732,7 +836,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
shardHandler.submit(sreq, replica, sreq.params);
}
private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
String collectionName = message.getStr("name");
if (clusterState.getCollections().contains(collectionName)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
@ -742,14 +846,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
// look at the replication factor and see if it matches reality
// if it does not, find best nodes to create more cores
int repFactor = msgStrToInt(message, REPLICATION_FACTOR, 1);
Integer numSlices = msgStrToInt(message, NUM_SLICES, null);
int repFactor = message.getInt( REPLICATION_FACTOR, 1);
Integer numSlices = message.getInt(NUM_SLICES, null);
String router = message.getStr(ROUTER, DocRouter.DEFAULT_NAME);
List<String> shardNames = new ArrayList<>();
if(ImplicitDocRouter.NAME.equals(router)){
Overseer.getShardNames(shardNames, message.getStr("shards",null));
numSlices = shardNames.size();
} else {
Overseer.getShardNames(numSlices,shardNames);
}
if (numSlices == null ) {
throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param");
}
int maxShardsPerNode = msgStrToInt(message, MAX_SHARDS_PER_NODE, 1);
int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
String createNodeSetStr;
List<String> createNodeList = ((createNodeSetStr = message.getStr(CREATE_NODE_SET)) == null)?null:StrUtils.splitSmart(createNodeSetStr, ",", true);
@ -761,8 +873,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
}
String configName = message.getStr("collection.configName");
// we need to look at every node and see how many cores it serves
// add our new cores to existing nodes serving the least number of cores
// but (for now) require that each core goes on a distinct node.
@ -806,10 +916,28 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
+ " shards to be created (higher than the allowed number)");
}
for (int i = 1; i <= numSlices; i++) {
// ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
// Overseer.CREATECOLLECTION, "name", message.getStr("name"));
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
// wait for a while until we don't see the collection
long waitUntil = System.currentTimeMillis() + 30000;
boolean created = false;
while (System.currentTimeMillis() < waitUntil) {
Thread.sleep(100);
created = zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
if(created) break;
}
if (!created)
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name"));
String configName = message.getStr(COLL_CONF);
log.info("going to create cores replicas shardNames {} , repFactor : {}", shardNames, repFactor);
for (int i = 1; i <= shardNames.size(); i++) {
String sliceName = shardNames.get(i-1);
for (int j = 1; j <= repFactor; j++) {
String nodeName = nodeList.get((repFactor * (i - 1) + (j - 1)) % nodeList.size());
String sliceName = "shard" + i;
String shardName = collectionName + "_" + sliceName + "_replica" + j;
log.info("Creating shard " + shardName + " as part of slice "
+ sliceName + " of collection " + collectionName + " on "
@ -820,7 +948,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
params.set(CoreAdminParams.NAME, shardName);
params.set("collection.configName", configName);
params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
@ -947,19 +1075,16 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
}
}
private Integer msgStrToInt(ZkNodeProps message, String key, Integer def)
throws Exception {
String str = message.getStr(key);
try {
return str == null ? def : Integer.valueOf(str);
} catch (Exception ex) {
SolrException.log(log, "Could not parse " + key, ex);
throw ex;
}
}
@Override
public boolean isClosed() {
return isClosed;
}
public static Map<String, Object> asMap(Object... vals) {
HashMap<String, Object> m = new HashMap<String, Object>();
for(int i=0; i<vals.length; i+=2) {
m.put(String.valueOf(vals[i]), vals[i+1]);
}
return m;
}
}

View File

@ -17,10 +17,6 @@ package org.apache.solr.handler.admin;
* limitations under the License.
*/
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
@ -32,6 +28,7 @@ import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -49,6 +46,22 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATESHARD;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.DocRouter.ROUTE_FIELD;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
public class CollectionsHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(CollectionsHandler.class);
@ -139,6 +152,9 @@ public class CollectionsHandler extends RequestHandlerBase {
case DELETESHARD: {
this.handleDeleteShardAction(req, rsp);
break;
}case CREATESHARD: {
this.handleCreateShard(req, rsp);
break;
}
default: {
@ -260,13 +276,7 @@ public class CollectionsHandler extends RequestHandlerBase {
private void handleCreateAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws InterruptedException, KeeperException {
log.info("Creating Collection : " + req.getParamString());
Integer numReplicas = req.getParams().getInt(OverseerCollectionProcessor.REPLICATION_FACTOR, 1);
String name = req.getParams().required().get("name");
String configName = req.getParams().get("collection.configName");
String numShards = req.getParams().get(OverseerCollectionProcessor.NUM_SLICES);
String maxShardsPerNode = req.getParams().get(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE);
String createNodeSetStr = req.getParams().get(OverseerCollectionProcessor.CREATE_NODE_SET);
if (name == null) {
log.error("Collection name is required to create a new collection");
throw new SolrException(ErrorCode.BAD_REQUEST,
@ -276,20 +286,46 @@ public class CollectionsHandler extends RequestHandlerBase {
Map<String,Object> props = new HashMap<String,Object>();
props.put(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATECOLLECTION);
props.put(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas.toString());
props.put("name", name);
if (configName != null) {
props.put("collection.configName", configName);
}
props.put(OverseerCollectionProcessor.NUM_SLICES, numShards);
props.put(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, maxShardsPerNode);
props.put(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSetStr);
copyIfNotNull(req.getParams(),props,
"name",
REPLICATION_FACTOR,
COLL_CONF,
NUM_SLICES,
MAX_SHARDS_PER_NODE,
CREATE_NODE_SET ,
ROUTER,
SHARDS_PROP,
ROUTE_FIELD);
ZkNodeProps m = new ZkNodeProps(props);
handleResponse(OverseerCollectionProcessor.CREATECOLLECTION, m, rsp);
}
private void handleCreateShard(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Create shard: " + req.getParamString());
req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP);
ClusterState clusterState = coreContainer.getZkController().getClusterState();
if(!ImplicitDocRouter.NAME.equals( clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).getStr(ROUTER)))
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
Map<String, Object> map = OverseerCollectionProcessor.asMap(QUEUE_OPERATION, CREATESHARD);
copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR);
ZkNodeProps m = new ZkNodeProps(map);
handleResponse(CREATESHARD, m, rsp);
}
private static void copyIfNotNull(SolrParams params, Map<String, Object> props, String... keys) {
if(keys !=null){
for (String key : keys) {
String v = params.get(key);
if(v != null) props.put(key,v);
}
}
}
private void handleDeleteShardAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws InterruptedException, KeeperException {
log.info("Deleting Shard : " + req.getParamString());

View File

@ -16,18 +16,6 @@ package org.apache.solr.handler.component;
* limitations under the License.
*/
import java.net.ConnectException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
@ -44,7 +32,6 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@ -55,6 +42,18 @@ import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
import java.net.ConnectException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class HttpShardHandler extends ShardHandler {
private HttpShardHandlerFactory httpShardHandlerFactory;
@ -277,7 +276,8 @@ public class HttpShardHandler extends ShardHandler {
// we weren't provided with an explicit list of slices to query via "shards", so use the cluster state
clusterState = zkController.getClusterState();
String shardKeys = params.get(ShardParams.SHARD_KEYS);
String shardKeys = params.get(ShardParams._ROUTE_);
if(shardKeys == null) shardKeys = params.get(ShardParams.SHARD_KEYS);//eprecated
// This will be the complete list of slices we need to query for this request.
slices = new HashMap<String,Slice>();

View File

@ -17,16 +17,6 @@ package org.apache.solr.update.processor;
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
@ -74,6 +64,16 @@ import org.apache.solr.update.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
// NOT mt-safe... create a new processor for each add thread
@ -917,7 +917,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
SolrParams params = req.getParams();
Collection<Slice> slices = coll.getRouter().getSearchSlices(params.get(ShardParams.SHARD_KEYS), params, coll);
String route = params.get(ShardParams._ROUTE_);
if(route == null) route = params.get(ShardParams.SHARD_KEYS);// deprecated . kept for backcompat
Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);
List<Node> leaders = new ArrayList<Node>(slices.size());
for (Slice slice : slices) {

View File

@ -581,6 +581,7 @@
<field name="store" type="location" indexed="true" stored="true" omitNorms="false"/>
<field name="lower" type="lowertok" indexed="false" stored="true" multiValued="true" />
<field name="_route_" type="string" indexed="true" stored="true" multiValued="false" />
<!-- Dynamic field definitions. If a field name is not found, dynamicFields
will be used if the name matches any of the patterns.

View File

@ -17,6 +17,50 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util._TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean.Category;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrCmdDistributor.Request;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
@ -37,46 +81,11 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util._TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean.Category;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrCmdDistributor.Request;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.params.ShardParams._ROUTE_;
/**
* Tests the Cloud Collections API.
@ -147,6 +156,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
testErrorHandling();
deletePartiallyCreatedCollection();
deleteCollectionWithDownNodes();
testCustomCollectionsAPI();
if (DEBUG) {
super.printLayout();
}
@ -260,7 +270,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
collectionName = "collection";
params.set("name", collectionName);
params.set("numShards", 2);
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, 10);
params.set(REPLICATION_FACTOR, 10);
request = new QueryRequest(params);
request.setPath("/admin/collections");
gotExp = false;
@ -277,7 +287,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
params.set("action", CollectionAction.CREATE.toString());
collectionName = "acollection";
params.set("name", collectionName);
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, 10);
params.set(REPLICATION_FACTOR, 10);
request = new QueryRequest(params);
request.setPath("/admin/collections");
gotExp = false;
@ -294,7 +304,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
params.set("action", CollectionAction.CREATE.toString());
collectionName = "acollection";
params.set("name", collectionName);
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, 10);
params.set(REPLICATION_FACTOR, 10);
params.set("numShards", 0);
request = new QueryRequest(params);
request.setPath("/admin/collections");
@ -361,7 +371,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
params.set("action", CollectionAction.CREATE.toString());
params.set("numShards", 2);
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, 2);
params.set(REPLICATION_FACTOR, 2);
String collectionName = "nodes_used_collection";
params.set("name", collectionName);
@ -545,7 +555,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
params.set("action", CollectionAction.CREATE.toString());
params.set("numShards", 1);
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, 2);
params.set(REPLICATION_FACTOR, 2);
collectionName = "acollectionafterbaddelete";
params.set("name", collectionName);
@ -617,6 +627,208 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
checkNoTwoShardsUseTheSameIndexDir();
}
private void testCustomCollectionsAPI() throws Exception {
String COLL_PREFIX = "new_implicit_coll_";
// TODO: fragile - because we dont pass collection.confName, it will only
// find a default if a conf set with a name matching the collection name is found, or
// if there is only one conf set. That and the fact that other tests run first in this
// env make this pretty fragile
// create new collections rapid fire
Map<String,List<Integer>> collectionInfos = new HashMap<String,List<Integer>>();
int cnt = random().nextInt(6) + 1;
for (int i = 0; i < cnt; i++) {
int numShards = 4;
int replicationFactor = _TestUtil.nextInt(random(), 0, 3) + 2;
int maxShardsPerNode = ((((numShards+1) * replicationFactor) / getCommonCloudSolrServer()
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
CloudSolrServer client = null;
try {
if (i == 0) {
// Test if we can create a collection through CloudSolrServer where
// you havnt set default-collection
// This is nice because you want to be able to create you first
// collection using CloudSolrServer, and in such case there is
// nothing reasonable to set as default-collection
client = createCloudClient(null);
} else if (i == 1) {
// Test if we can create a collection through CloudSolrServer where
// you have set default-collection to a non-existing collection
// This is nice because you want to be able to create you first
// collection using CloudSolrServer, and in such case there is
// nothing reasonable to set as default-collection, but you might want
// to use the same CloudSolrServer throughout the entire
// lifetime of your client-application, so it is nice to be able to
// set a default-collection on this CloudSolrServer once and for all
// and use this CloudSolrServer to create the collection
client = createCloudClient(COLL_PREFIX + i);
}
Map<String, Object> props = OverseerCollectionProcessor.asMap(
ROUTER, ImplicitDocRouter.NAME,
REPLICATION_FACTOR, replicationFactor,
MAX_SHARDS_PER_NODE, maxShardsPerNode,
SHARDS_PROP,"a,b,c,d");
createCollection(collectionInfos, COLL_PREFIX + i,props,client);
} finally {
if (client != null) client.shutdown();
}
}
Set<Entry<String,List<Integer>>> collectionInfosEntrySet = collectionInfos.entrySet();
for (Entry<String,List<Integer>> entry : collectionInfosEntrySet) {
String collection = entry.getKey();
List<Integer> list = entry.getValue();
checkForCollection(collection, list, null);
String url = getUrlFromZk(collection);
HttpSolrServer collectionClient = new HttpSolrServer(url);
// poll for a second - it can take a moment before we are ready to serve
waitForNon403or404or503(collectionClient);
}
ZkStateReader zkStateReader = getCommonCloudSolrServer().getZkStateReader();
for (int j = 0; j < cnt; j++) {
waitForRecoveriesToFinish(COLL_PREFIX + j, zkStateReader, false);
}
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection(COLL_PREFIX + 0);
assertEquals("implicit", coll.getStr(ROUTER));
assertNotNull(coll.getStr(REPLICATION_FACTOR));
assertNotNull(coll.getStr(MAX_SHARDS_PER_NODE));
List<String> collectionNameList = new ArrayList<String>();
collectionNameList.addAll(collectionInfos.keySet());
log.info("Collections created : "+collectionNameList );
String collectionName = collectionNameList.get(random().nextInt(collectionNameList.size()));
String url = getUrlFromZk(collectionName);
HttpSolrServer collectionClient = new HttpSolrServer(url);
// lets try and use the solrj client to index a couple documents
collectionClient.add(getDoc(id, 6, i1, -600, tlong, 600, t1,
"humpty dumpy sat on a wall", _ROUTE_,"a"));
collectionClient.add(getDoc(id, 7, i1, -600, tlong, 600, t1,
"humpty dumpy3 sat on a walls", _ROUTE_,"a"));
collectionClient.add(getDoc(id, 8, i1, -600, tlong, 600, t1,
"humpty dumpy2 sat on a walled", _ROUTE_,"a"));
collectionClient.commit();
assertEquals(3, collectionClient.query(new SolrQuery("*:*")).getResults().getNumFound());
assertEquals(0, collectionClient.query(new SolrQuery("*:*").setParam("shard.keys","b")).getResults().getNumFound());
assertEquals(3, collectionClient.query(new SolrQuery("*:*").setParam("shard.keys","a")).getResults().getNumFound());
collectionClient.deleteByQuery("*:*");
collectionClient.commit(true,true);
assertEquals(0, collectionClient.query(new SolrQuery("*:*")).getResults().getNumFound());
UpdateRequest up = new UpdateRequest();
up.setParam(_ROUTE_, "c");
up.setParam("commit","true");
up.add(getDoc(id, 9, i1, -600, tlong, 600, t1,
"humpty dumpy sat on a wall"));
up.add(getDoc(id, 10, i1, -600, tlong, 600, t1,
"humpty dumpy3 sat on a walls"));
up.add(getDoc(id, 11, i1, -600, tlong, 600, t1,
"humpty dumpy2 sat on a walled"));
collectionClient.request(up);
assertEquals(3, collectionClient.query(new SolrQuery("*:*")).getResults().getNumFound());
assertEquals(0, collectionClient.query(new SolrQuery("*:*").setParam("shard.keys","a")).getResults().getNumFound());
assertEquals(3, collectionClient.query(new SolrQuery("*:*").setParam("shard.keys","c")).getResults().getNumFound());
//Testing CREATESHARD
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.CREATESHARD.toString());
params.set("collection", coll.getName());
params.set("shard", "z");
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0))).request(request);
waitForCollection(zkStateReader,coll.getName(),5);
collectionClient.add(getDoc(id, 66, i1, -600, tlong, 600, t1,
"humpty dumpy sat on a wall", _ROUTE_,"x"));
collectionClient.commit();
assertEquals(1, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_,"x")).getResults().getNumFound());
int numShards = 4;
int replicationFactor = _TestUtil.nextInt(random(), 0, 3) + 2;
int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrServer()
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
CloudSolrServer client = null;
String shard_fld = "shard_s";
try {
client = createCloudClient(null);
Map<String, Object> props = OverseerCollectionProcessor.asMap(
ROUTER, ImplicitDocRouter.NAME,
REPLICATION_FACTOR, replicationFactor,
MAX_SHARDS_PER_NODE, maxShardsPerNode,
SHARDS_PROP,"a,b,c,d",
DocRouter.ROUTE_FIELD, shard_fld);
collectionName = COLL_PREFIX + "withShardField";
createCollection(collectionInfos, collectionName,props,client);
} finally {
if (client != null) client.shutdown();
}
List<Integer> list = collectionInfos.get(collectionName);
checkForCollection(collectionName, list, null);
url = getUrlFromZk(collectionName);
collectionClient = new HttpSolrServer(url);
// poll for a second - it can take a moment before we are ready to serve
waitForNon403or404or503(collectionClient);
collectionClient = new HttpSolrServer(url);
// lets try and use the solrj client to index a couple documents
collectionClient.add(getDoc(id, 6, i1, -600, tlong, 600, t1,
"humpty dumpy sat on a wall", shard_fld,"a"));
collectionClient.add(getDoc(id, 7, i1, -600, tlong, 600, t1,
"humpty dumpy3 sat on a walls", shard_fld,"a"));
collectionClient.add(getDoc(id, 8, i1, -600, tlong, 600, t1,
"humpty dumpy2 sat on a walled", shard_fld,"a"));
collectionClient.commit();
assertEquals(3, collectionClient.query(new SolrQuery("*:*")).getResults().getNumFound());
assertEquals(0, collectionClient.query(new SolrQuery("*:*").setParam("shard.keys","b")).getResults().getNumFound());
//TODO debug the following case
assertEquals(3, collectionClient.query(new SolrQuery("*:*").setParam("shard.keys", "a")).getResults().getNumFound());
}
private boolean waitForReloads(String collectionName, Map<String,Long> urlToTimeBefore) throws SolrServerException, IOException {

View File

@ -17,25 +17,6 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
@ -51,8 +32,8 @@ import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.zookeeper.CreateMode;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.junit.After;
@ -61,6 +42,27 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.getCurrentArguments;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
private static final String ADMIN_PATH = "/admin/cores";
@ -72,6 +74,8 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
private static ZkStateReader zkStateReaderMock;
private static ClusterState clusterStateMock;
private static SolrZkClient solrZkClientMock;
private final Map zkMap = new HashMap();
private final Set collectionsSet = new HashSet();
private OverseerCollectionProcessorToBeTested underTest;
@ -92,6 +96,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
@Override
protected SolrResponse processMessage(ZkNodeProps message, String operation) {
lastProcessMessageResult = super.processMessage(message, operation);
log.info("1 : "+System.currentTimeMillis());
return lastProcessMessageResult;
}
@ -123,6 +128,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
@Before
public void setUp() throws Exception {
super.setUp();
queue.clear();
reset(workQueueMock);
reset(workQueueMock);
reset(shardHandlerMock);
@ -131,6 +137,8 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
reset(solrZkClientMock);
underTest = new OverseerCollectionProcessorToBeTested(zkStateReaderMock,
"1234", shardHandlerMock, ADMIN_PATH, workQueueMock);
zkMap.clear();
collectionsSet.clear();
}
@After
@ -156,7 +164,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
queue.remove((QueueEvent)EasyMock.getCurrentArguments()[0]);
queue.remove((QueueEvent) getCurrentArguments()[0]);
return null;
}
}).anyTimes();
@ -190,7 +198,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
return new HashSet<String>();
return collectionsSet;
}
}).anyTimes();
final Set<String> liveNodes = new HashSet<String>();
@ -216,10 +224,59 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
return liveNodes;
}
}).anyTimes();
solrZkClientMock.create(anyObject(String.class), anyObject(byte[].class), anyObject(CreateMode.class), anyBoolean());
expectLastCall().andAnswer(new IAnswer<String>() {
@Override
public String answer() throws Throwable {
String key = (String) getCurrentArguments()[0];
zkMap.put(key, null);
handleCrateCollMessage((byte[]) getCurrentArguments()[1]);
return key;
}
}).anyTimes();
solrZkClientMock.create(anyObject(String.class), anyObject(byte[].class), anyObject(List.class),anyObject(CreateMode.class), anyBoolean());
expectLastCall().andAnswer(new IAnswer<String>() {
@Override
public String answer() throws Throwable {
String key = (String) getCurrentArguments()[0];
zkMap.put(key, null);
handleCrateCollMessage((byte[]) getCurrentArguments()[1]);
return key;
}
}).anyTimes();
solrZkClientMock.makePath(anyObject(String.class), anyObject(byte[].class), anyBoolean());
expectLastCall().andAnswer(new IAnswer<String>() {
@Override
public String answer() throws Throwable {
String key = (String) getCurrentArguments()[0];
return key;
}
}).anyTimes();
solrZkClientMock.exists(anyObject(String.class),anyBoolean());
expectLastCall().andAnswer(new IAnswer<Boolean>() {
@Override
public Boolean answer() throws Throwable {
String key = (String) getCurrentArguments()[0];
return zkMap.containsKey(key);
}
}).anyTimes();
return liveNodes;
}
private void handleCrateCollMessage(byte[] bytes) {
try {
ZkNodeProps props = ZkNodeProps.load(bytes);
if("createcollection".equals(props.getStr("operation"))){
String collName = props.getStr("name") ;
if(collName != null) collectionsSet.add(collName);
}
} catch (Exception e) { }
}
protected void startComponentUnderTest() {
thread = new Thread(underTest);
thread.start();
@ -409,7 +466,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
long start = System.currentTimeMillis();
while (queue.peek() != null) {
if ((System.currentTimeMillis() - start) > maxWait) fail(" Queue not empty within "
+ maxWait + " ms");
+ maxWait + " ms" + System.currentTimeMillis());
Thread.sleep(100);
}
}
@ -445,10 +502,12 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
replay(zkStateReaderMock);
replay(clusterStateMock);
replay(shardHandlerMock);
log.info("clusterstate " +clusterStateMock.hashCode());
startComponentUnderTest();
issueCreateJob(numberOfSlices, replicationFactor, maxShardsPerNode, (createNodeListOption != CreateNodeListOptions.SEND_NULL)?createNodeList:null, (createNodeListOption != CreateNodeListOptions.DONT_SEND));
waitForEmptyQueue(10000);
if (collectionExceptedToBeCreated) {

View File

@ -17,12 +17,11 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
import org.noggit.JSONWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.StrUtils;
import org.noggit.JSONWriter;
import java.util.ArrayList;
import java.util.Collection;
@ -39,6 +38,7 @@ import java.util.Map;
public abstract class DocRouter {
public static final String DEFAULT_NAME = CompositeIdRouter.NAME;
public static final DocRouter DEFAULT = new CompositeIdRouter();
public static final String ROUTE_FIELD = "routeField";
public static DocRouter getDocRouter(Object routerSpec) {
DocRouter router = routerMap.get(routerSpec);

View File

@ -20,33 +20,53 @@ package org.apache.solr.common.cloud;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.apache.solr.common.params.ShardParams._ROUTE_;
/** This document router is for custom sharding
*/
public class ImplicitDocRouter extends DocRouter {
public static final String NAME = "implicit";
// @Deprecated
// public static final String DEFAULT_SHARD_PARAM = "_shard_";
private static Logger log = LoggerFactory
.getLogger(ImplicitDocRouter.class);
@Override
public Slice getTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
String shard = null;
if (sdoc != null) {
Object o = sdoc.getFieldValue("_shard_");
String f = collection.getStr(ROUTE_FIELD);
if(f !=null) {
Object o = sdoc.getFieldValue(f);
if (o != null) shard = o.toString();
else throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No value for field "+f +" in " + sdoc);
}
if(shard == null) {
Object o = sdoc.getFieldValue(_ROUTE_);
if (o == null) o = sdoc.getFieldValue("_shard_");//deprecated . for backcompat remove later
if (o != null) {
shard = o.toString();
}
}
}
if (shard == null) {
shard = params.get("_shard_");
shard = params.get(_ROUTE_);
if(shard == null) shard =params.get("_shard_"); //deperecated for back compat
}
if (shard != null) {
Slice slice = collection.getSlice(shard);
if (slice == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No _shard_=" + shard + " in " + collection);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard called =" + shard + " in " + collection);
}
return slice;
}
@ -56,12 +76,14 @@ public class ImplicitDocRouter extends DocRouter {
@Override
public boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection) {
// todo : how to handle this?
return false;
}
@Override
public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
if (shardKey == null) {
return collection.getActiveSlices();
}
@ -75,4 +97,8 @@ public class ImplicitDocRouter extends DocRouter {
return Collections.singleton(slice);
}
@Override
public List<Range> partitionRange(int partitions, Range range) {
return null;
}
}

View File

@ -20,11 +20,9 @@ package org.apache.solr.common.cloud;
import org.noggit.JSONUtil;
import org.noggit.JSONWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
@ -108,6 +106,22 @@ public class ZkNodeProps implements JSONWriter.Writable {
return o == null ? null : o.toString();
}
/**
* Get a string property value.
*/
public Integer getInt(String key, Integer def) {
Object o = propMap.get(key);
return o == null ? def : Integer.valueOf(o.toString());
}
/**
* Get a string property value.
*/
public String getStr(String key,String def) {
Object o = propMap.get(key);
return o == null ? def : o.toString();
}
public Object get(String key) {
return propMap.get(key);
}

View File

@ -28,7 +28,7 @@ public interface CollectionParams
public enum CollectionAction {
CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD, DELETESHARD;
CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD, DELETESHARD, CREATESHARD;
public static CollectionAction get( String p )
{

View File

@ -148,4 +148,8 @@ public class RequiredSolrParams extends SolrParams {
public String getFieldParam(String field, String param, String def) {
return params.getFieldParam(field, param, def);
}
public void check(String... params){
for (String param : params) get(param);
}
}

View File

@ -47,5 +47,10 @@ public interface ShardParams {
public static final String SHARDS_TOLERANT = "shards.tolerant";
/** Should things fail if there is an error? (true/false) */
@Deprecated
public static final String SHARD_KEYS = "shard.keys";
public static final String _ROUTE_ = "_route_";
}

View File

@ -17,19 +17,6 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
@ -56,6 +43,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.StrUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -63,6 +51,25 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
/**
* TODO: we should still test this works as a custom update chain as well as
* what we test now - the default update chain
@ -304,7 +311,9 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
cnt = 30;
while (reader.getClusterState().getSlices(collection).size() < slices) {
if (cnt == 0) {
throw new RuntimeException("timeout waiting for collection shards to come up: collection="+collection + "nSlices="+slices);
throw new RuntimeException("timeout waiting for collection shards to come up: collection="+collection
+ ", slices.expected="+slices+ " slices.actual= " + reader.getClusterState().getSlices(collection).size()
+ " slices : "+ reader.getClusterState().getSlices(collection) );
}
cnt--;
Thread.sleep(500);
@ -1493,15 +1502,21 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
createCollection(null, collectionName, numShards, numReplicas, maxShardsPerNode, null, null);
}
protected void createCollection(Map<String,List<Integer>> collectionInfos,
String collectionName, int numShards, int numReplicas, int maxShardsPerNode, SolrServer client, String createNodeSetStr) throws SolrServerException, IOException {
protected void createCollection(Map<String,List<Integer>> collectionInfos, String collectionName, Map<String,Object> collectionProps, SolrServer client ) throws SolrServerException, IOException{
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.CREATE.toString());
params.set(OverseerCollectionProcessor.NUM_SLICES, numShards);
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas);
params.set(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, maxShardsPerNode);
if (createNodeSetStr != null) params.set(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSetStr);
for (Map.Entry<String, Object> entry : collectionProps.entrySet()) {
if(entry.getValue() !=null) params.set(entry.getKey(), String.valueOf(entry.getValue()));
}
Integer numShards = (Integer) collectionProps.get(NUM_SLICES);
if(numShards==null){
String shardNames = (String) collectionProps.get(SHARDS_PROP);
numShards = StrUtils.splitSmart(shardNames,',').size();
}
Integer numReplicas = (Integer) collectionProps.get(REPLICATION_FACTOR);
if(numShards==null){
numShards = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(REPLICATION_FACTOR);
}
int clientIndex = random().nextInt(2);
List<Integer> list = new ArrayList<Integer>();
@ -1521,6 +1536,25 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
} else {
client.request(request);
}
}
protected void runCollectionAdminCommand(ModifiableSolrParams params){
;
}
protected void createCollection(Map<String,List<Integer>> collectionInfos,
String collectionName, int numShards, int numReplicas, int maxShardsPerNode, SolrServer client, String createNodeSetStr) throws SolrServerException, IOException {
createCollection(collectionInfos, collectionName,
OverseerCollectionProcessor.asMap(
NUM_SLICES, numShards,
REPLICATION_FACTOR, numReplicas,
CREATE_NODE_SET, createNodeSetStr,
MAX_SHARDS_PER_NODE, maxShardsPerNode),
client);
}
@Override