mirror of https://github.com/apache/lucene.git
SOLR-2592: refactor doc routers, use implicit router when implicity creating collection, use collection router to find correct shard when indexing
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1416216 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c62c57ed5a
commit
91a788e871
|
@ -41,6 +41,9 @@ public class AssignShard {
|
||||||
String returnShardId = null;
|
String returnShardId = null;
|
||||||
Map<String, Slice> sliceMap = state.getSlicesMap(collection);
|
Map<String, Slice> sliceMap = state.getSlicesMap(collection);
|
||||||
|
|
||||||
|
|
||||||
|
// TODO: now that we create shards ahead of time, is this code needed? Esp since hash ranges aren't assigned when creating via this method?
|
||||||
|
|
||||||
if (sliceMap == null) {
|
if (sliceMap == null) {
|
||||||
return "shard1";
|
return "shard1";
|
||||||
}
|
}
|
||||||
|
@ -51,6 +54,8 @@ public class AssignShard {
|
||||||
return "shard" + (shardIdNames.size() + 1);
|
return "shard" + (shardIdNames.size() + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: don't need to sort to find shard with fewest replicas!
|
||||||
|
|
||||||
// else figure out which shard needs more replicas
|
// else figure out which shard needs more replicas
|
||||||
final Map<String, Integer> map = new HashMap<String, Integer>();
|
final Map<String, Integer> map = new HashMap<String, Integer>();
|
||||||
for (String shardId : shardIdNames) {
|
for (String shardId : shardIdNames) {
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.solr.common.cloud.ClosableThread;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.DocRouter;
|
import org.apache.solr.common.cloud.DocRouter;
|
||||||
import org.apache.solr.common.cloud.DocRouter;
|
import org.apache.solr.common.cloud.DocRouter;
|
||||||
|
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
import org.apache.solr.common.cloud.Slice;
|
import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.cloud.SolrZkClient;
|
import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
|
@ -286,9 +287,8 @@ public class Overseer {
|
||||||
private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
|
private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
|
||||||
log.info("Create collection {} with numShards {}", collectionName, numShards);
|
log.info("Create collection {} with numShards {}", collectionName, numShards);
|
||||||
|
|
||||||
DocRouter hp = new DocRouter();
|
DocRouter router = DocRouter.DEFAULT;
|
||||||
List<DocRouter.Range> ranges = hp.partitionRange(numShards, hp.fullRange());
|
List<DocRouter.Range> ranges = router.partitionRange(numShards, router.fullRange());
|
||||||
|
|
||||||
|
|
||||||
Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
|
Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
|
||||||
|
|
||||||
|
@ -306,7 +306,6 @@ public class Overseer {
|
||||||
|
|
||||||
// TODO: fill in with collection properties read from the /collections/<collectionName> node
|
// TODO: fill in with collection properties read from the /collections/<collectionName> node
|
||||||
Map<String,Object> collectionProps = defaultCollectionProps();
|
Map<String,Object> collectionProps = defaultCollectionProps();
|
||||||
DocRouter router = DocRouter.DEFAULT;
|
|
||||||
|
|
||||||
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
|
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
|
||||||
|
|
||||||
|
@ -343,10 +342,12 @@ public class Overseer {
|
||||||
DocRouter router;
|
DocRouter router;
|
||||||
|
|
||||||
if (coll == null) {
|
if (coll == null) {
|
||||||
// TODO: is updateSlice really called on a collection that doesn't exist?
|
// when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
|
||||||
|
// without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
|
||||||
slices = new HashMap<String, Slice>(1);
|
slices = new HashMap<String, Slice>(1);
|
||||||
props = defaultCollectionProps();
|
props = new HashMap<String,Object>(1);
|
||||||
router = DocRouter.DEFAULT;
|
props.put(DocCollection.DOC_ROUTER, ImplicitDocRouter.NAME);
|
||||||
|
router = new ImplicitDocRouter();
|
||||||
} else {
|
} else {
|
||||||
props = coll.getProperties();
|
props = coll.getProperties();
|
||||||
router = coll.getRouter();
|
router = coll.getRouter();
|
||||||
|
|
|
@ -293,7 +293,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
||||||
// TODO (cloud): get from the current core
|
// TODO (cloud): get from the current core
|
||||||
DocRouter.Range currentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
|
DocRouter.Range currentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
|
||||||
|
|
||||||
DocRouter hp = new DocRouter();
|
DocRouter hp = DocRouter.DEFAULT; // TODO: get actual doc router for collection if available
|
||||||
ranges = hp.partitionRange(partitions, currentRange);
|
ranges = hp.partitionRange(partitions, currentRange);
|
||||||
|
|
||||||
if (pathsArr == null) {
|
if (pathsArr == null) {
|
||||||
|
|
|
@ -177,200 +177,49 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Node> setupRequest(int hash) {
|
|
||||||
List<Node> nodes = null;
|
|
||||||
|
|
||||||
// if we are in zk mode...
|
|
||||||
if (zkEnabled) {
|
|
||||||
// set num nodes
|
|
||||||
numNodes = zkController.getClusterState().getLiveNodes().size();
|
|
||||||
|
|
||||||
String shardId = getShard(hash, collection, zkController.getClusterState()); // get the right shard based on the hash...
|
|
||||||
|
|
||||||
try {
|
|
||||||
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
|
|
||||||
collection, shardId));
|
|
||||||
|
|
||||||
String leaderNodeName = leaderProps.getCoreNodeName();
|
|
||||||
String coreName = req.getCore().getName();
|
|
||||||
String coreNodeName = zkController.getNodeName() + "_" + coreName;
|
|
||||||
isLeader = coreNodeName.equals(leaderNodeName);
|
|
||||||
|
|
||||||
DistribPhase phase =
|
|
||||||
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
|
|
||||||
|
|
||||||
doDefensiveChecks(shardId, phase);
|
|
||||||
|
|
||||||
|
|
||||||
if (DistribPhase.FROMLEADER == phase) {
|
|
||||||
// we are coming from the leader, just go local - add no urls
|
|
||||||
forwardToLeader = false;
|
|
||||||
} else if (isLeader) {
|
|
||||||
// that means I want to forward onto my replicas...
|
|
||||||
// so get the replicas...
|
|
||||||
forwardToLeader = false;
|
|
||||||
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
|
|
||||||
.getReplicaProps(collection, shardId, zkController.getNodeName(),
|
|
||||||
coreName, null, ZkStateReader.DOWN);
|
|
||||||
if (replicaProps != null) {
|
|
||||||
nodes = new ArrayList<Node>(replicaProps.size());
|
|
||||||
// check for test param that lets us miss replicas
|
|
||||||
String[] skipList = req.getParams().getParams("test.distrib.skip.servers");
|
|
||||||
Set<String> skipListSet = null;
|
|
||||||
if (skipList != null) {
|
|
||||||
skipListSet = new HashSet<String>(skipList.length);
|
|
||||||
skipListSet.addAll(Arrays.asList(skipList));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (ZkCoreNodeProps props : replicaProps) {
|
|
||||||
if (skipList != null) {
|
|
||||||
if (!skipListSet.contains(props.getCoreUrl())) {
|
|
||||||
nodes.add(new StdNode(props));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
nodes.add(new StdNode(props));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// I need to forward onto the leader...
|
|
||||||
nodes = new ArrayList<Node>(1);
|
|
||||||
nodes.add(new RetryNode(leaderProps, zkController.getZkStateReader(), collection, shardId));
|
|
||||||
forwardToLeader = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
|
|
||||||
e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// trying to incrementally get things to pass
|
|
||||||
private List<Node> setupRequest2(int hash, String id, SolrInputDocument doc) {
|
|
||||||
List<Node> nodes = null;
|
|
||||||
|
|
||||||
// if we are in zk mode...
|
|
||||||
if (zkEnabled) {
|
|
||||||
|
|
||||||
//////
|
|
||||||
ClusterState cstate = zkController.getClusterState();
|
|
||||||
numNodes = cstate.getLiveNodes().size();
|
|
||||||
DocCollection coll = cstate.getCollection(collection);
|
|
||||||
Slice slice = coll.getRouter().getTargetShard(id, doc, req.getParams(), coll);
|
|
||||||
|
|
||||||
Replica leader = slice.getLeader();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//////
|
|
||||||
|
|
||||||
// set num nodes
|
|
||||||
numNodes = cstate.getLiveNodes().size();
|
|
||||||
|
|
||||||
String shardId = getShard(hash, collection, zkController.getClusterState()); // get the right shard based on the hash...
|
|
||||||
|
|
||||||
if (shardId != slice.getName()) {
|
|
||||||
System.out.println("######################## shardId="+shardId + " slice="+slice + " cstate=" + cstate);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
|
|
||||||
collection, shardId));
|
|
||||||
|
|
||||||
String leaderNodeName = leaderProps.getCoreNodeName();
|
|
||||||
String coreName = req.getCore().getName();
|
|
||||||
String coreNodeName = zkController.getNodeName() + "_" + coreName;
|
|
||||||
isLeader = coreNodeName.equals(leaderNodeName);
|
|
||||||
|
|
||||||
DistribPhase phase =
|
|
||||||
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
|
|
||||||
|
|
||||||
doDefensiveChecks(shardId, phase);
|
|
||||||
|
|
||||||
|
|
||||||
if (DistribPhase.FROMLEADER == phase) {
|
|
||||||
// we are coming from the leader, just go local - add no urls
|
|
||||||
forwardToLeader = false;
|
|
||||||
} else if (isLeader) {
|
|
||||||
// that means I want to forward onto my replicas...
|
|
||||||
// so get the replicas...
|
|
||||||
forwardToLeader = false;
|
|
||||||
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
|
|
||||||
.getReplicaProps(collection, shardId, zkController.getNodeName(),
|
|
||||||
coreName, null, ZkStateReader.DOWN);
|
|
||||||
if (replicaProps != null) {
|
|
||||||
nodes = new ArrayList<Node>(replicaProps.size());
|
|
||||||
// check for test param that lets us miss replicas
|
|
||||||
String[] skipList = req.getParams().getParams("test.distrib.skip.servers");
|
|
||||||
Set<String> skipListSet = null;
|
|
||||||
if (skipList != null) {
|
|
||||||
skipListSet = new HashSet<String>(skipList.length);
|
|
||||||
skipListSet.addAll(Arrays.asList(skipList));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (ZkCoreNodeProps props : replicaProps) {
|
|
||||||
if (skipList != null) {
|
|
||||||
if (!skipListSet.contains(props.getCoreUrl())) {
|
|
||||||
nodes.add(new StdNode(props));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
nodes.add(new StdNode(props));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// I need to forward onto the leader...
|
|
||||||
nodes = new ArrayList<Node>(1);
|
|
||||||
nodes.add(new RetryNode(leaderProps, zkController.getZkStateReader(), collection, shardId));
|
|
||||||
forwardToLeader = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
|
|
||||||
e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// use old code for now
|
|
||||||
private List<Node> setupRequest(String id, SolrInputDocument doc) {
|
private List<Node> setupRequest(String id, SolrInputDocument doc) {
|
||||||
// return setupRequest2(DocRouter.DEFAULT.shardHash(id, null, null), id, doc);
|
|
||||||
return setupRequest(DocRouter.DEFAULT.shardHash(id, null, null));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private List<Node> setupRequestX(String id, SolrInputDocument doc) {
|
|
||||||
List<Node> nodes = null;
|
List<Node> nodes = null;
|
||||||
|
|
||||||
// if we are in zk mode...
|
// if we are in zk mode...
|
||||||
if (zkEnabled) {
|
if (zkEnabled) {
|
||||||
// set num nodes
|
|
||||||
|
String coreName = req.getCore().getName();
|
||||||
|
String coreNodeName = zkController.getNodeName() + "_" + coreName;
|
||||||
|
|
||||||
ClusterState cstate = zkController.getClusterState();
|
ClusterState cstate = zkController.getClusterState();
|
||||||
numNodes = cstate.getLiveNodes().size();
|
numNodes = cstate.getLiveNodes().size();
|
||||||
DocCollection coll = cstate.getCollection(collection);
|
DocCollection coll = cstate.getCollection(collection);
|
||||||
Slice slice = coll.getRouter().getTargetShard(id, doc, req.getParams(), coll);
|
Slice slice = coll.getRouter().getTargetShard(id, doc, req.getParams(), coll);
|
||||||
|
|
||||||
Replica leader = slice.getLeader();
|
if (slice == null) {
|
||||||
|
// No slice found. Most strict routers will have already thrown an exception, so a null return is
|
||||||
|
// a signal to use the slice of this core.
|
||||||
|
// TODO: what if this core is not in the targeted collection?
|
||||||
|
String shardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
|
||||||
|
slice = coll.getSlice(shardId);
|
||||||
|
if (slice == null) {
|
||||||
|
throw new SolrException(ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
String coreName = req.getCore().getName();
|
|
||||||
String coreNodeName = zkController.getNodeName() + "_" + coreName;
|
String shardId = slice.getName();
|
||||||
isLeader = coreNodeName.equals(leader.getName()); // is this me?
|
|
||||||
|
try {
|
||||||
|
// Not equivalent to getLeaderProps, which does retries to find a leader.
|
||||||
|
// Replica leader = slice.getLeader();
|
||||||
|
|
||||||
|
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
|
||||||
|
collection, shardId));
|
||||||
|
|
||||||
|
String leaderNodeName = leaderProps.getCoreNodeName();
|
||||||
|
isLeader = coreNodeName.equals(leaderNodeName);
|
||||||
|
|
||||||
DistribPhase phase =
|
DistribPhase phase =
|
||||||
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
|
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
|
||||||
|
|
||||||
doDefensiveChecks(slice.getName(), phase);
|
doDefensiveChecks(shardId, phase);
|
||||||
|
|
||||||
|
|
||||||
if (DistribPhase.FROMLEADER == phase) {
|
if (DistribPhase.FROMLEADER == phase) {
|
||||||
|
@ -381,7 +230,7 @@ if (shardId != slice.getName()) {
|
||||||
// so get the replicas...
|
// so get the replicas...
|
||||||
forwardToLeader = false;
|
forwardToLeader = false;
|
||||||
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
|
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
|
||||||
.getReplicaProps(collection, slice.getName(), zkController.getNodeName(),
|
.getReplicaProps(collection, shardId, zkController.getNodeName(),
|
||||||
coreName, null, ZkStateReader.DOWN);
|
coreName, null, ZkStateReader.DOWN);
|
||||||
if (replicaProps != null) {
|
if (replicaProps != null) {
|
||||||
nodes = new ArrayList<Node>(replicaProps.size());
|
nodes = new ArrayList<Node>(replicaProps.size());
|
||||||
|
@ -407,10 +256,15 @@ if (shardId != slice.getName()) {
|
||||||
} else {
|
} else {
|
||||||
// I need to forward onto the leader...
|
// I need to forward onto the leader...
|
||||||
nodes = new ArrayList<Node>(1);
|
nodes = new ArrayList<Node>(1);
|
||||||
nodes.add(new RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(), collection, leader.getName()));
|
nodes.add(new RetryNode(leaderProps, zkController.getZkStateReader(), collection, shardId));
|
||||||
forwardToLeader = true;
|
forwardToLeader = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
|
||||||
|
e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nodes;
|
return nodes;
|
||||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.solr.common.cloud.DocRouter.Range;
|
||||||
public class TestHashPartitioner extends SolrTestCaseJ4 {
|
public class TestHashPartitioner extends SolrTestCaseJ4 {
|
||||||
|
|
||||||
public void testMapHashes() throws Exception {
|
public void testMapHashes() throws Exception {
|
||||||
DocRouter hp = new DocRouter();
|
DocRouter hp = DocRouter.DEFAULT;
|
||||||
List<Range> ranges;
|
List<Range> ranges;
|
||||||
|
|
||||||
// make sure the partitioner uses the "natural" boundaries and doesn't suffer from an off-by-one
|
// make sure the partitioner uses the "natural" boundaries and doesn't suffer from an off-by-one
|
||||||
|
|
|
@ -51,9 +51,6 @@ public class ClusterState implements JSONWriter.Writable {
|
||||||
private final Set<String> liveNodes;
|
private final Set<String> liveNodes;
|
||||||
|
|
||||||
private final Map<String,RangeInfo> rangeInfos = new HashMap<String,RangeInfo>();
|
private final Map<String,RangeInfo> rangeInfos = new HashMap<String,RangeInfo>();
|
||||||
private final Map<String,Map<String,ZkNodeProps>> leaders = new HashMap<String,Map<String,ZkNodeProps>>();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use this constr when ClusterState is meant for publication.
|
* Use this constr when ClusterState is meant for publication.
|
||||||
|
@ -80,9 +77,9 @@ public class ClusterState implements JSONWriter.Writable {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get properties of a shard/slice leader for specific collection.
|
* Get properties of a shard/slice leader for specific collection, or null if one currently doesn't exist.
|
||||||
*/
|
*/
|
||||||
public ZkNodeProps getLeader(String collection, String sliceName) {
|
public Replica getLeader(String collection, String sliceName) {
|
||||||
DocCollection coll = collectionStates.get(collection);
|
DocCollection coll = collectionStates.get(collection);
|
||||||
if (coll == null) return null;
|
if (coll == null) return null;
|
||||||
Slice slice = coll.getSlice(sliceName);
|
Slice slice = coll.getSlice(sliceName);
|
||||||
|
@ -283,7 +280,7 @@ public class ClusterState implements JSONWriter.Writable {
|
||||||
private static DocCollection collectionFromObjects(String name, Map<String,Object> objs) {
|
private static DocCollection collectionFromObjects(String name, Map<String,Object> objs) {
|
||||||
Map<String,Object> props = (Map<String,Object>)objs.get(DocCollection.PROPERTIES);
|
Map<String,Object> props = (Map<String,Object>)objs.get(DocCollection.PROPERTIES);
|
||||||
if (props == null) props = Collections.emptyMap();
|
if (props == null) props = Collections.emptyMap();
|
||||||
DocRouter router = getRouter(props.get(DocCollection.DOC_ROUTER));
|
DocRouter router = DocRouter.getDocRouter(props.get(DocCollection.DOC_ROUTER));
|
||||||
Map<String,Slice> slices = makeSlices(objs);
|
Map<String,Slice> slices = makeSlices(objs);
|
||||||
return new DocCollection(name, slices, props, router);
|
return new DocCollection(name, slices, props, router);
|
||||||
}
|
}
|
||||||
|
@ -306,19 +303,6 @@ public class ClusterState implements JSONWriter.Writable {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static DocRouter getRouter(Object routerSpec) {
|
|
||||||
if (routerSpec == null) return new PlainIdRouter(); // back compat with 4.0
|
|
||||||
|
|
||||||
if (DocRouter.DEFAULT_NAME.equals(routerSpec)) {
|
|
||||||
return DocRouter.DEFAULT;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: how to instantiate custom routers?
|
|
||||||
|
|
||||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerSpec + "'");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(JSONWriter jsonWriter) {
|
public void write(JSONWriter jsonWriter) {
|
||||||
jsonWriter.write(collectionStates);
|
jsonWriter.write(collectionStates);
|
||||||
|
|
|
@ -25,16 +25,38 @@ import org.apache.solr.common.util.Hash;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to partition int range into n ranges.
|
* Class to partition int range into n ranges.
|
||||||
* @lucene.experimental
|
* @lucene.experimental
|
||||||
*/
|
*/
|
||||||
public class DocRouter {
|
public abstract class DocRouter {
|
||||||
public static final String DEFAULT_NAME = "compositeId";
|
public static final String DEFAULT_NAME = CompositeIdRouter.NAME;
|
||||||
public static final DocRouter DEFAULT = new CompositeIdRouter();
|
public static final DocRouter DEFAULT = new CompositeIdRouter();
|
||||||
|
|
||||||
|
public static DocRouter getDocRouter(Object routerSpec) {
|
||||||
|
DocRouter router = routerMap.get(routerSpec);
|
||||||
|
if (router != null) return router;
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerSpec + "'");
|
||||||
|
}
|
||||||
|
|
||||||
|
// currently just an implementation detail...
|
||||||
|
private final static Map<String, DocRouter> routerMap;
|
||||||
|
static {
|
||||||
|
routerMap = new HashMap<String, DocRouter>();
|
||||||
|
PlainIdRouter plain = new PlainIdRouter();
|
||||||
|
// instead of doing back compat this way, we could always convert the clusterstate on first read to "plain" if it doesn't have any properties.
|
||||||
|
routerMap.put(null, plain); // back compat with 4.0
|
||||||
|
routerMap.put(PlainIdRouter.NAME, plain);
|
||||||
|
routerMap.put(CompositeIdRouter.NAME, DEFAULT_NAME.equals(CompositeIdRouter.NAME) ? DEFAULT : new CompositeIdRouter());
|
||||||
|
routerMap.put(ImplicitDocRouter.NAME, new ImplicitDocRouter());
|
||||||
|
// NOTE: careful that the map keys (the static .NAME members) are filled in by making them final
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// Hash ranges can't currently "wrap" - i.e. max must be greater or equal to min.
|
// Hash ranges can't currently "wrap" - i.e. max must be greater or equal to min.
|
||||||
// TODO: ranges may not be all contiguous in the future (either that or we will
|
// TODO: ranges may not be all contiguous in the future (either that or we will
|
||||||
// need an extra class to model a collection of ranges)
|
// need an extra class to model a collection of ranges)
|
||||||
|
@ -122,47 +144,57 @@ public class DocRouter {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public int shardHash(String id, SolrInputDocument sdoc, SolrParams params) {
|
public abstract Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection);
|
||||||
return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getId(SolrInputDocument sdoc, SolrParams params) {
|
|
||||||
Object idObj = sdoc.getFieldValue("id"); // blech
|
|
||||||
String id = idObj != null ? idObj.toString() : "null"; // should only happen on client side
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
List<Slice> shardQuery(String id, SolrParams params, ClusterState state)
|
||||||
|
List<Slice> shardQuery(SolrParams params, ClusterState state)
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract class HashBasedRouter extends DocRouter {
|
||||||
|
|
||||||
|
@Override
|
||||||
public Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
|
public Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
|
||||||
if (id == null) id = getId(sdoc, params);
|
if (id == null) id = getId(sdoc, params);
|
||||||
int hash = shardHash(id, sdoc, params);
|
int hash = shardHash(id, sdoc, params);
|
||||||
return hashToSlice(hash, collection);
|
return hashToSlice(hash, collection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected int shardHash(String id, SolrInputDocument sdoc, SolrParams params) {
|
||||||
|
return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getId(SolrInputDocument sdoc, SolrParams params) {
|
||||||
|
Object idObj = sdoc.getFieldValue("id"); // blech
|
||||||
|
String id = idObj != null ? idObj.toString() : "null"; // should only happen on client side
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
protected Slice hashToSlice(int hash, DocCollection collection) {
|
protected Slice hashToSlice(int hash, DocCollection collection) {
|
||||||
for (Slice slice : collection.getSlices()) {
|
for (Slice slice : collection.getSlices()) {
|
||||||
DocRouter.Range range = slice.getRange();
|
DocRouter.Range range = slice.getRange();
|
||||||
if (range != null && range.includes(hash)) return slice;
|
if (range != null && range.includes(hash)) return slice;
|
||||||
}
|
}
|
||||||
// return null or throw exception?
|
|
||||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No slice servicing hash code " + Integer.toHexString(hash) + " in " + collection);
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No slice servicing hash code " + Integer.toHexString(hash) + " in " + collection);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
List<Slice> shardQuery(String id, SolrParams params, ClusterState state)
|
|
||||||
List<Slice> shardQuery(SolrParams params, ClusterState state)
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class PlainIdRouter extends HashBasedRouter {
|
||||||
class PlainIdRouter extends DocRouter {
|
public static final String NAME = "plain";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// user!uniqueid
|
// user!uniqueid
|
||||||
// user,4!uniqueid
|
// user,4!uniqueid
|
||||||
//
|
//
|
||||||
class CompositeIdRouter extends DocRouter {
|
class CompositeIdRouter extends HashBasedRouter {
|
||||||
|
public static final String NAME = "compositeId";
|
||||||
|
|
||||||
private int separator = '!';
|
private int separator = '!';
|
||||||
private int bits = 16;
|
private int bits = 16;
|
||||||
private int mask1 = 0xffff0000;
|
private int mask1 = 0xffff0000;
|
||||||
|
@ -185,7 +217,7 @@ class CompositeIdRouter extends DocRouter {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int shardHash(String id, SolrInputDocument doc, SolrParams params) {
|
protected int shardHash(String id, SolrInputDocument doc, SolrParams params) {
|
||||||
int idx = id.indexOf(separator);
|
int idx = id.indexOf(separator);
|
||||||
if (idx < 0) {
|
if (idx < 0) {
|
||||||
return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
|
return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
package org.apache.solr.common.cloud;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import org.apache.solr.common.SolrException;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
|
||||||
|
/** This document router is for custom sharding
|
||||||
|
*/
|
||||||
|
public class ImplicitDocRouter extends DocRouter {
|
||||||
|
public static final String NAME = "implicit";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
|
||||||
|
String shard = null;
|
||||||
|
if (sdoc != null) {
|
||||||
|
Object o = sdoc.getFieldValue("_shard_");
|
||||||
|
if (o != null) {
|
||||||
|
shard = o.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shard == null) {
|
||||||
|
shard = params.get("_shard_");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shard != null) {
|
||||||
|
Slice slice = collection.getSlice(shard);
|
||||||
|
if (slice == null) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No _shard_=" + shard + " in " + collection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null; // no shard specified... use default.
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -24,11 +24,12 @@ import java.util.Map;
|
||||||
|
|
||||||
public class Replica extends ZkNodeProps {
|
public class Replica extends ZkNodeProps {
|
||||||
private final String name;
|
private final String name;
|
||||||
|
private final String nodeName;
|
||||||
|
|
||||||
public Replica(String name, Map<String,Object> propMap) {
|
public Replica(String name, Map<String,Object> propMap) {
|
||||||
super(propMap);
|
super(propMap);
|
||||||
this.name = name;
|
this.name = name;
|
||||||
String nodeName = (String)propMap.get(ZkStateReader.NODE_NAME_PROP);
|
nodeName = (String)propMap.get(ZkStateReader.NODE_NAME_PROP);
|
||||||
assert nodeName == null || name.startsWith(nodeName);
|
assert nodeName == null || name.startsWith(nodeName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,10 +37,13 @@ public class Replica extends ZkNodeProps {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** The name of the node this replica resides on */
|
||||||
|
public String getNodeName() {
|
||||||
|
return nodeName;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return name + ':' + JSONUtil.toJSON(propMap, -1); // small enough, keep it on one line (i.e. no indent)
|
return name + ':' + JSONUtil.toJSON(propMap, -1); // small enough, keep it on one line (i.e. no indent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: should we have a pointer back to the slice the replica belongs to?
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,11 +53,17 @@ public class Slice extends ZkNodeProps {
|
||||||
if (rangeObj instanceof DocRouter.Range) {
|
if (rangeObj instanceof DocRouter.Range) {
|
||||||
tmpRange = (DocRouter.Range)rangeObj;
|
tmpRange = (DocRouter.Range)rangeObj;
|
||||||
} else if (rangeObj != null) {
|
} else if (rangeObj != null) {
|
||||||
DocRouter hp = new DocRouter();
|
// Doesn't support custom implementations of Range, but currently not needed.
|
||||||
tmpRange = hp.fromString(rangeObj.toString());
|
tmpRange = DocRouter.DEFAULT.fromString(rangeObj.toString());
|
||||||
}
|
}
|
||||||
range = tmpRange;
|
range = tmpRange;
|
||||||
|
|
||||||
|
/** debugging. this isn't an error condition for custom sharding.
|
||||||
|
if (range == null) {
|
||||||
|
System.out.println("###### NO RANGE for " + name + " props=" + props);
|
||||||
|
}
|
||||||
|
**/
|
||||||
|
|
||||||
replicationFactor = null; // future
|
replicationFactor = null; // future
|
||||||
|
|
||||||
// add the replicas *after* the other properties (for aesthetics, so it's easy to find slice properties in the JSON output)
|
// add the replicas *after* the other properties (for aesthetics, so it's easy to find slice properties in the JSON output)
|
||||||
|
|
|
@ -397,19 +397,22 @@ public class ZkStateReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get shard leader properties.
|
* Get shard leader properties, with retry if none exist.
|
||||||
*/
|
*/
|
||||||
public ZkNodeProps getLeaderProps(String collection, String shard) throws InterruptedException {
|
public Replica getLeaderProps(String collection, String shard) throws InterruptedException {
|
||||||
return getLeaderProps(collection, shard, 1000);
|
return getLeaderProps(collection, shard, 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ZkNodeProps getLeaderProps(String collection, String shard, int timeout) throws InterruptedException {
|
/**
|
||||||
|
* Get shard leader properties, with retry if none exist.
|
||||||
|
*/
|
||||||
|
public Replica getLeaderProps(String collection, String shard, int timeout) throws InterruptedException {
|
||||||
long timeoutAt = System.currentTimeMillis() + timeout;
|
long timeoutAt = System.currentTimeMillis() + timeout;
|
||||||
while (System.currentTimeMillis() < timeoutAt) {
|
while (System.currentTimeMillis() < timeoutAt) {
|
||||||
if (clusterState != null) {
|
if (clusterState != null) {
|
||||||
final ZkNodeProps nodeProps = clusterState.getLeader(collection, shard);
|
Replica replica = clusterState.getLeader(collection, shard);
|
||||||
if (nodeProps != null && getClusterState().liveNodesContain((String) nodeProps.get(ZkStateReader.NODE_NAME_PROP))) {
|
if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) {
|
||||||
return nodeProps;
|
return replica;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Thread.sleep(50);
|
Thread.sleep(50);
|
||||||
|
|
Loading…
Reference in New Issue