mirror of https://github.com/apache/lucene.git
SOLR-4221 changed format router is stored
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1524442 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ea210933f5
commit
60e7475519
|
@ -221,7 +221,7 @@ public class Overseer {
|
|||
|
||||
ArrayList<String> shardNames = new ArrayList<String>();
|
||||
|
||||
if(ImplicitDocRouter.NAME.equals( message.getStr("router",DocRouter.DEFAULT_NAME))){
|
||||
if(ImplicitDocRouter.NAME.equals( message.getStr("router.name",DocRouter.DEFAULT_NAME))){
|
||||
getShardNames(shardNames,message.getStr("shards",DocRouter.DEFAULT_NAME));
|
||||
} else {
|
||||
int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1);
|
||||
|
@ -415,8 +415,8 @@ public class Overseer {
|
|||
private ClusterState createCollection(ClusterState state, String collectionName, List<String> shards , ZkNodeProps message) {
|
||||
log.info("Create collection {} with shards {}", collectionName, shards);;
|
||||
|
||||
String routerName = message.getStr(OverseerCollectionProcessor.ROUTER,DocRouter.DEFAULT_NAME);
|
||||
DocRouter router = DocRouter.getDocRouter(routerName);
|
||||
// String routerName = message.getStr(OverseerCollectionProcessor.ROUTER,DocRouter.DEFAULT_NAME);
|
||||
DocRouter router = DocRouter.getDocRouter(message.getStr(OverseerCollectionProcessor.ROUTER,DocRouter.DEFAULT_NAME));
|
||||
|
||||
List<DocRouter.Range> ranges = router.partitionRange(shards.size(), router.fullRange());
|
||||
|
||||
|
@ -447,7 +447,7 @@ public class Overseer {
|
|||
}
|
||||
if(val != null) collectionProps.put(e.getKey(),val);
|
||||
}
|
||||
collectionProps.put(DocCollection.DOC_ROUTER, routerName);
|
||||
collectionProps.put(DocCollection.DOC_ROUTER, DocRouter.getRouterSpec(message));
|
||||
|
||||
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
|
||||
|
||||
|
@ -506,7 +506,7 @@ public class Overseer {
|
|||
// without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
|
||||
slices = new HashMap<String, Slice>(1);
|
||||
props = new HashMap<String,Object>(1);
|
||||
props.put(DocCollection.DOC_ROUTER, ImplicitDocRouter.NAME);
|
||||
props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
|
||||
router = new ImplicitDocRouter();
|
||||
} else {
|
||||
props = coll.getProperties();
|
||||
|
|
|
@ -64,7 +64,6 @@ import java.util.Set;
|
|||
|
||||
import static org.apache.solr.cloud.Assign.Node;
|
||||
import static org.apache.solr.cloud.Assign.getNodesForNewShard;
|
||||
import static org.apache.solr.common.cloud.DocRouter.ROUTE_FIELD;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
|
||||
|
@ -102,11 +101,10 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
public static final String COLL_CONF = "collection.configName";
|
||||
|
||||
|
||||
public static final Map<String,Object> COLL_PROPS = asMap(
|
||||
ROUTER,DocRouter.DEFAULT_NAME,
|
||||
public static final Map<String,Object> COLL_PROPS = ZkNodeProps.makeMap(
|
||||
ROUTER, DocRouter.DEFAULT_NAME,
|
||||
REPLICATION_FACTOR, "1",
|
||||
MAX_SHARDS_PER_NODE,"1",
|
||||
ROUTE_FIELD,null);
|
||||
MAX_SHARDS_PER_NODE, "1");
|
||||
|
||||
|
||||
// TODO: use from Overseer?
|
||||
|
@ -875,7 +873,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
|
||||
int repFactor = message.getInt( REPLICATION_FACTOR, 1);
|
||||
Integer numSlices = message.getInt(NUM_SLICES, null);
|
||||
String router = message.getStr(ROUTER, DocRouter.DEFAULT_NAME);
|
||||
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
|
||||
List<String> shardNames = new ArrayList<>();
|
||||
if(ImplicitDocRouter.NAME.equals(router)){
|
||||
Overseer.getShardNames(shardNames, message.getStr("shards",null));
|
||||
|
@ -943,8 +941,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
+ " shards to be created (higher than the allowed number)");
|
||||
}
|
||||
|
||||
// ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
|
||||
// Overseer.CREATECOLLECTION, "name", message.getStr("name"));
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
|
||||
|
||||
// wait for a while until we don't see the collection
|
||||
|
@ -1102,11 +1098,4 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
return isClosed;
|
||||
}
|
||||
|
||||
public static Map<String, Object> asMap(Object... vals) {
|
||||
HashMap<String, Object> m = new HashMap<String, Object>();
|
||||
for(int i=0; i<vals.length; i+=2) {
|
||||
m.put(String.valueOf(vals[i]), vals[i+1]);
|
||||
}
|
||||
return m;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1166,9 +1166,9 @@ public final class ZkController {
|
|||
numShards = System.getProperty(ZkStateReader.NUM_SHARDS_PROP);
|
||||
}
|
||||
if (numShards == null) {
|
||||
collectionProps.put(DocCollection.DOC_ROUTER, ImplicitDocRouter.NAME);
|
||||
collectionProps.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
|
||||
} else {
|
||||
collectionProps.put(DocCollection.DOC_ROUTER, DocRouter.DEFAULT_NAME);
|
||||
collectionProps.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name", DocRouter.DEFAULT_NAME));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -47,7 +47,9 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
|
||||
|
@ -59,7 +61,7 @@ import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
|||
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
|
||||
import static org.apache.solr.common.cloud.DocRouter.ROUTE_FIELD;
|
||||
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
|
||||
|
@ -294,9 +296,8 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
NUM_SLICES,
|
||||
MAX_SHARDS_PER_NODE,
|
||||
CREATE_NODE_SET ,
|
||||
ROUTER,
|
||||
SHARDS_PROP,
|
||||
ROUTE_FIELD);
|
||||
"router.");
|
||||
|
||||
|
||||
ZkNodeProps m = new ZkNodeProps(props);
|
||||
|
@ -307,22 +308,39 @@ public class CollectionsHandler extends RequestHandlerBase {
|
|||
log.info("Create shard: " + req.getParamString());
|
||||
req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP);
|
||||
ClusterState clusterState = coreContainer.getZkController().getClusterState();
|
||||
if(!ImplicitDocRouter.NAME.equals( clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).getStr(ROUTER)))
|
||||
if(!ImplicitDocRouter.NAME.equals( ((Map) clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).get(ROUTER)).get("name") ) )
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
|
||||
|
||||
Map<String, Object> map = OverseerCollectionProcessor.asMap(QUEUE_OPERATION, CREATESHARD);
|
||||
Map<String, Object> map = makeMap(QUEUE_OPERATION, CREATESHARD);
|
||||
copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET);
|
||||
ZkNodeProps m = new ZkNodeProps(map);
|
||||
handleResponse(CREATESHARD, m, rsp);
|
||||
}
|
||||
|
||||
private static void copyIfNotNull(SolrParams params, Map<String, Object> props, String... keys) {
|
||||
ArrayList<String> prefixes = new ArrayList<String>(1);
|
||||
if(keys !=null){
|
||||
for (String key : keys) {
|
||||
if(key.endsWith(".")) {
|
||||
prefixes.add(key);
|
||||
continue;
|
||||
}
|
||||
String v = params.get(key);
|
||||
if(v != null) props.put(key,v);
|
||||
}
|
||||
}
|
||||
if(prefixes.isEmpty()) return;
|
||||
Iterator<String> it = params.getParameterNamesIterator();
|
||||
String prefix = null;
|
||||
for(;it.hasNext();){
|
||||
String name = it.next();
|
||||
for (int i = 0; i < prefixes.size(); i++) {
|
||||
if(name.startsWith(prefixes.get(i))){
|
||||
String val = params.get(name);
|
||||
if(val !=null) props.put(name,val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -183,8 +183,8 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
client = createCloudClient(COLL_PREFIX + i);
|
||||
}
|
||||
|
||||
Map<String, Object> props = OverseerCollectionProcessor.asMap(
|
||||
ROUTER, ImplicitDocRouter.NAME,
|
||||
Map<String, Object> props = ZkNodeProps.makeMap(
|
||||
"router.name", ImplicitDocRouter.NAME,
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
SHARDS_PROP,"a,b,c");
|
||||
|
@ -216,7 +216,7 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
|
||||
DocCollection coll = clusterState.getCollection(COLL_PREFIX + 0);
|
||||
assertEquals("implicit", coll.getStr(ROUTER));
|
||||
assertEquals("implicit", ((Map)coll.get(ROUTER)).get("name") );
|
||||
assertNotNull(coll.getStr(REPLICATION_FACTOR));
|
||||
assertNotNull(coll.getStr(MAX_SHARDS_PER_NODE));
|
||||
|
||||
|
@ -308,12 +308,12 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
String shard_fld = "shard_s";
|
||||
try {
|
||||
client = createCloudClient(null);
|
||||
Map<String, Object> props = OverseerCollectionProcessor.asMap(
|
||||
ROUTER, ImplicitDocRouter.NAME,
|
||||
Map<String, Object> props = ZkNodeProps.makeMap(
|
||||
"router.name", ImplicitDocRouter.NAME,
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
SHARDS_PROP,"a,b,c,d",
|
||||
DocRouter.ROUTE_FIELD, shard_fld);
|
||||
"router.field", shard_fld);
|
||||
|
||||
collectionName = COLL_PREFIX + "withShardField";
|
||||
createCollection(collectionInfos, collectionName,props,client);
|
||||
|
@ -371,11 +371,11 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
String shard_fld = "shard_s";
|
||||
try {
|
||||
client = createCloudClient(null);
|
||||
Map<String, Object> props = OverseerCollectionProcessor.asMap(
|
||||
Map<String, Object> props = ZkNodeProps.makeMap(
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
NUM_SLICES,numShards,
|
||||
DocRouter.ROUTE_FIELD, shard_fld);
|
||||
"router.field", shard_fld);
|
||||
|
||||
createCollection(collectionInfos, collectionName,props,client);
|
||||
} finally {
|
||||
|
|
|
@ -151,7 +151,7 @@ public class SliceStateUpdateTest extends SolrTestCaseJ4 {
|
|||
prop.put("state", "inactive");
|
||||
Slice newSlice = new Slice(slice.getName(), slice.getReplicasMap(), prop);
|
||||
slicesMap.put(newSlice.getName(), newSlice);
|
||||
props.put(DocCollection.DOC_ROUTER, ImplicitDocRouter.NAME);
|
||||
props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name", ImplicitDocRouter.NAME));
|
||||
|
||||
DocCollection coll = new DocCollection("collection1", slicesMap, props, DocRouter.DEFAULT);
|
||||
collectionStates.put("collection1", coll);
|
||||
|
|
|
@ -260,7 +260,8 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
objs.remove(DocCollection.SHARDS);
|
||||
}
|
||||
|
||||
DocRouter router = DocRouter.getDocRouter(props.get(DocCollection.DOC_ROUTER));
|
||||
Map map = (Map) props.get(DocCollection.DOC_ROUTER);
|
||||
DocRouter router = map == null ? DocRouter.DEFAULT : DocRouter.getDocRouter(map.get("name"));
|
||||
return new DocCollection(name, slices, props, router);
|
||||
}
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ public class CompositeIdRouter extends HashBasedRouter {
|
|||
|
||||
@Override
|
||||
public int sliceHash(String id, SolrInputDocument doc, SolrParams params, DocCollection collection) {
|
||||
String shardFieldName = collection ==null? null: collection.getStr(DocRouter.ROUTE_FIELD);
|
||||
String shardFieldName = getRouteField(collection);
|
||||
String part1 = null;
|
||||
int idx = 0;
|
||||
int commaIdx = 0;
|
||||
|
|
|
@ -45,7 +45,7 @@ public class DocCollection extends ZkNodeProps {
|
|||
* @param props The properties of the slice. This is used directly and a copy is not made.
|
||||
*/
|
||||
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
|
||||
super( props==null ? Collections.<String,Object>emptyMap() : props);
|
||||
super( props==null ? props = new HashMap<String,Object>() : props);
|
||||
this.name = name;
|
||||
|
||||
this.slices = slices;
|
||||
|
|
|
@ -28,9 +28,12 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
|
||||
/**
|
||||
* Class to partition int range into n ranges.
|
||||
* @lucene.experimental
|
||||
|
@ -38,12 +41,32 @@ import java.util.Map;
|
|||
public abstract class DocRouter {
|
||||
public static final String DEFAULT_NAME = CompositeIdRouter.NAME;
|
||||
public static final DocRouter DEFAULT = new CompositeIdRouter();
|
||||
public static final String ROUTE_FIELD = "routeField";
|
||||
|
||||
public static DocRouter getDocRouter(Object routerSpec) {
|
||||
DocRouter router = routerMap.get(routerSpec);
|
||||
|
||||
public static DocRouter getDocRouter(Object routerName) {
|
||||
DocRouter router = routerMap.get(routerName);
|
||||
if (router != null) return router;
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerSpec + "'");
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerName + "'");
|
||||
}
|
||||
|
||||
protected String getRouteField(DocCollection coll){
|
||||
if(coll == null) return null;
|
||||
Map m = (Map) coll.get(DOC_ROUTER);
|
||||
if(m == null) return null;
|
||||
return (String) m.get("field");
|
||||
|
||||
}
|
||||
|
||||
public static Map<String,Object> getRouterSpec(ZkNodeProps props){
|
||||
Map<String,Object> map = new LinkedHashMap<String, Object>();
|
||||
for (String s : props.keySet()) {
|
||||
if(s.startsWith("router.")){
|
||||
map.put(s.substring(7), props.get(s));
|
||||
}
|
||||
}
|
||||
if(map.get("name") == null) map.put("name", DEFAULT_NAME);
|
||||
return map;
|
||||
|
||||
}
|
||||
|
||||
// currently just an implementation detail...
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.solr.common.params.ShardParams._ROUTE_;
|
||||
|
||||
/** This document router is for custom sharding
|
||||
|
@ -42,7 +41,7 @@ public class ImplicitDocRouter extends DocRouter {
|
|||
public Slice getTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
|
||||
String shard = null;
|
||||
if (sdoc != null) {
|
||||
String f = collection.getStr(ROUTE_FIELD);
|
||||
String f = getRouteField(collection);
|
||||
if(f !=null) {
|
||||
Object o = sdoc.getFieldValue(f);
|
||||
if (o != null) shard = o.toString();
|
||||
|
|
|
@ -58,7 +58,7 @@ public class ZkNodeProps implements JSONWriter.Writable {
|
|||
if ((keyVals.length & 0x01) != 0) {
|
||||
throw new IllegalArgumentException("arguments should be key,value");
|
||||
}
|
||||
Map<String,Object> propMap = new HashMap<String,Object>(keyVals.length>>1);
|
||||
Map<String,Object> propMap = new LinkedHashMap<String,Object>(keyVals.length>>1);
|
||||
for (int i = 0; i < keyVals.length; i+=2) {
|
||||
propMap.put(keyVals[i].toString(), keyVals[i+1]);
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ public class ZkNodeProps implements JSONWriter.Writable {
|
|||
public Object get(String key) {
|
||||
return propMap.get(key);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JSONUtil.toJSON(this);
|
||||
|
|
|
@ -1584,7 +1584,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
String collectionName, int numShards, int numReplicas, int maxShardsPerNode, SolrServer client, String createNodeSetStr) throws SolrServerException, IOException {
|
||||
|
||||
createCollection(collectionInfos, collectionName,
|
||||
OverseerCollectionProcessor.asMap(
|
||||
ZkNodeProps.makeMap(
|
||||
NUM_SLICES, numShards,
|
||||
REPLICATION_FACTOR, numReplicas,
|
||||
CREATE_NODE_SET, createNodeSetStr,
|
||||
|
|
Loading…
Reference in New Issue