SOLR-2592: delegate query-time shard selection to collection router

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1416744 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2012-12-04 02:01:02 +00:00
parent 6024e1465e
commit e34b5ea98e
9 changed files with 163 additions and 76 deletions

View File

@ -17,6 +17,7 @@ package org.apache.solr.handler.component;
*/
import java.net.ConnectException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -42,6 +43,7 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@ -240,6 +242,7 @@ public class HttpShardHandler extends ShardHandler {
future.cancel(true);
}
}
public void checkDistributed(ResponseBuilder rb) {
SolrQueryRequest req = rb.req;
SolrParams params = req.getParams();
@ -279,14 +282,17 @@ public class HttpShardHandler extends ShardHandler {
}
}
} else if (zkController != null) {
// we weren't provided with a list of slices to query, so find the list that will cover the complete index
// we weren't provided with an explicit list of slices to query via "shards", so use the cluster state
clusterState = zkController.getClusterState();
String shardKeys = params.get(ShardParams.SHARD_KEYS);
// This can be more efficient... we only record the name, even though we
// have the shard info we need in the next step of mapping slice->shards
// Stores the comma-separated list of specified collections.
// This will be the complete list of slices we need to query for this request.
slices = new HashMap<String,Slice>();
// we need to find out what collections this request is for.
// A comma-separated list of specified collections.
// Eg: "collection1,collection2,collection3"
String collections = params.get("collection");
if (collections != null) {
@ -294,26 +300,23 @@ public class HttpShardHandler extends ShardHandler {
// each parameter and store as a seperate member of a List.
List<String> collectionList = StrUtils.splitSmart(collections, ",",
true);
// First create an empty HashMap to add the slice info to.
slices = new HashMap<String,Slice>();
// In turn, retrieve the slices that cover each collection from the
// cloud state and add them to the Map 'slices'.
for (int i = 0; i < collectionList.size(); i++) {
String collection = collectionList.get(i);
ClientUtils.appendMap(collection, slices, clusterState.getSlicesMap(collection));
for (String collectionName : collectionList) {
DocCollection coll = clusterState.getCollection(collectionName);
// The original code produced <collection-name>_<shard-name> when the collections
// parameter was specified (see ClientUtils.appendMap)
// Is this necessary if ony one collection is specified?
// i.e. should we change multiCollection to collectionList.size() > 1?
addSlices(slices, clusterState, params, collectionName, shardKeys, true);
}
} else {
// If no collections were specified, default to the collection for
// this core.
slices = clusterState.getSlicesMap(cloudDescriptor.getCollectionName());
if (slices == null) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Could not find collection:"
+ cloudDescriptor.getCollectionName());
}
// just this collection
String collectionName = cloudDescriptor.getCollectionName();
DocCollection coll = clusterState.getCollection(cloudDescriptor.getCollectionName());
addSlices(slices, clusterState, params, collectionName, shardKeys, false);
}
// Store the logical slices in the ResponseBuilder and create a new
// String array to hold the physical shards (which will be mapped
@ -388,5 +391,21 @@ public class HttpShardHandler extends ShardHandler {
}
}
}
private void addSlices(Map<String,Slice> target, ClusterState state, SolrParams params, String collectionName, String shardKeys, boolean multiCollection) {
DocCollection coll = state.getCollection(collectionName);
if (shardKeys != null) {
List<String> shardKeyList = StrUtils.splitSmart(shardKeys, ",", true);
for (String oneShardKey : shardKeyList) {
Collection<Slice> someSlices = coll.getRouter().getSearchSlices(oneShardKey, params, coll);
ClientUtils.addSlices(target, collectionName, someSlices, multiCollection);
}
} else {
Collection<Slice> someSlices = coll.getRouter().getSearchSlices(null, params, coll);
ClientUtils.addSlices(target, collectionName, someSlices, multiCollection);
}
}
}

View File

@ -38,7 +38,6 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@ -190,7 +189,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
ClusterState cstate = zkController.getClusterState();
numNodes = cstate.getLiveNodes().size();
DocCollection coll = cstate.getCollection(collection);
Slice slice = coll.getRouter().getTargetShard(id, doc, req.getParams(), coll);
Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll);
if (slice == null) {
// No slice found. Most strict routers will have already thrown an exception, so a null return is
@ -287,13 +286,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
private String getShard(int hash, String collection, ClusterState clusterState) {
// ranges should be part of the cloud state and eventually gotten from zk
// get the shard names
return clusterState.getShard(hash, collection);
}
// used for deleteByQuery to get the list of nodes this leader should forward to
private List<Node> setupRequest() {
List<Node> nodes = null;
@ -333,11 +325,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
// TODO: check for id field?
int hash = 0;
if (zkEnabled) {
zkCheck();
hash = hash(cmd);
nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument());
} else {
isLeader = getNonZkLeaderAssumption(req);
@ -1102,19 +1091,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return urls;
}
// TODO: move this to AddUpdateCommand/DeleteUpdateCommand and cache it? And
// make the hash pluggable of course.
// The hash also needs to be pluggable
private int hash(AddUpdateCommand cmd) {
String hashableId = cmd.getHashableId();
return Hash.murmurhash3_x86_32(hashableId, 0, hashableId.length(), 0);
}
private int hash(DeleteUpdateCommand cmd) {
return Hash.murmurhash3_x86_32(cmd.getId(), 0, cmd.getId().length(), 0);
}
// RetryNodes are used in the case of 'forward to leader' where we want
// to try the latest leader on a fail in the case the leader just went down.
public static class RetryNode extends StdNode {

View File

@ -20,6 +20,7 @@ package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -195,9 +196,8 @@ public class CloudSolrServer extends SolrServer {
// Retrieve slices from the cloud state and, for each collection specified,
// add it to the Map of slices.
Map<String,Slice> slices = new HashMap<String,Slice>();
for (int i = 0; i < collectionList.size(); i++) {
String coll= collectionList.get(i);
ClientUtils.appendMap(coll, slices, clusterState.getSlicesMap(coll));
for (String collectionName : collectionList) {
ClientUtils.addSlices(slices, collectionName, clusterState.getSlices(collectionName), true);
}
Set<String> liveNodes = clusterState.getLiveNodes();

View File

@ -242,15 +242,13 @@ public class ClientUtils
catch (IOException e) {throw new RuntimeException(e);} // can't happen
return sb.toString();
}
public static void appendMap(String collection, Map<String,Slice> map1, Map<String,Slice> map2) {
if (map1==null)
map1 = new HashMap<String,Slice>();
if (map2!=null) {
Set<Entry<String,Slice>> entrySet = map2.entrySet();
for (Entry<String,Slice> entry : entrySet) {
map1.put(collection + "_" + entry.getKey(), entry.getValue());
}
/** Constructs a slices map from a collection of slices and handles disambiguation if multiple collections are being queried simultaneously */
public static void addSlices(Map<String,Slice> target, String collectionName, Collection<Slice> slices, boolean multiCollection) {
for (Slice slice : slices) {
String key = slice.getName();
if (multiCollection) key = collectionName + "_" + key;
target.put(key, slice);
}
}
}

View File

@ -133,7 +133,7 @@ public class ClusterState implements JSONWriter.Writable {
}
/**
* Get the named DocCollection object, or thow an exception if it doesn't exist.
* Get the named DocCollection object, or throw an exception if it doesn't exist.
*/
public DocCollection getCollection(String collection) {
DocCollection coll = collectionStates.get(collection);

View File

@ -24,6 +24,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Hash;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -74,6 +75,14 @@ public abstract class DocRouter {
return hash >= min && hash <= max;
}
public boolean isSubsetOf(Range superset) {
return superset.min <= min && superset.max >= max;
}
public boolean overlaps(Range other) {
return includes(other.min) || includes(other.max) || isSubsetOf(other);
}
public String toString() {
return Integer.toHexString(min) + '-' + Integer.toHexString(max);
}
@ -144,28 +153,26 @@ public abstract class DocRouter {
}
public abstract Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection);
/*
List<Slice> shardQuery(String id, SolrParams params, ClusterState state)
List<Slice> shardQuery(SolrParams params, ClusterState state)
*/
public abstract Slice getTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection);
/** This method is consulted to determine what slices should be queried for a request when
* an explicit shards parameter was not used.
* shardKey (normally from shard.keys) and params may be null.
**/
public abstract Collection<Slice> getSearchSlices(String shardKey, SolrParams params, DocCollection collection);
}
abstract class HashBasedRouter extends DocRouter {
@Override
public Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
public Slice getTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
if (id == null) id = getId(sdoc, params);
int hash = shardHash(id, sdoc, params);
int hash = sliceHash(id, sdoc, params);
return hashToSlice(hash, collection);
}
protected int shardHash(String id, SolrInputDocument sdoc, SolrParams params) {
protected int sliceHash(String id, SolrInputDocument sdoc, SolrParams params) {
return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
}
@ -182,6 +189,19 @@ abstract class HashBasedRouter extends DocRouter {
}
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No slice servicing hash code " + Integer.toHexString(hash) + " in " + collection);
}
@Override
public Collection<Slice> getSearchSlices(String shardKey, SolrParams params, DocCollection collection) {
if (shardKey == null) {
// search across whole collection
// TODO: this may need modification in the future when shard splitting could cause an overlap
return collection.getSlices();
}
// use the shardKey as an id for plain hashing
return Collections.singletonList(getTargetSlice(shardKey, null, params, collection));
}
}
class PlainIdRouter extends HashBasedRouter {
@ -190,12 +210,15 @@ class PlainIdRouter extends HashBasedRouter {
//
// user!uniqueid
// user,4!uniqueid
// user/4!uniqueid
//
class CompositeIdRouter extends HashBasedRouter {
public static final String NAME = "compositeId";
private int separator = '!';
// separator used to optionally specify number of bits to allocate toward first part.
private int bitsSepartor = '/';
private int bits = 16;
private int mask1 = 0xffff0000;
private int mask2 = 0x0000ffff;
@ -208,7 +231,7 @@ class CompositeIdRouter extends HashBasedRouter {
protected int getBits(String firstPart, int commaIdx) {
int v = 0;
for (int idx = commaIdx +1; idx<firstPart.length(); idx++) {
for (int idx = commaIdx + 1; idx<firstPart.length(); idx++) {
char ch = firstPart.charAt(idx);
if (ch < '0' || ch > '9') return -1;
v *= 10 + (ch - '0');
@ -217,7 +240,7 @@ class CompositeIdRouter extends HashBasedRouter {
}
@Override
protected int shardHash(String id, SolrInputDocument doc, SolrParams params) {
protected int sliceHash(String id, SolrInputDocument doc, SolrParams params) {
int idx = id.indexOf(separator);
if (idx < 0) {
return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
@ -227,13 +250,13 @@ class CompositeIdRouter extends HashBasedRouter {
int m2 = mask2;
String part1 = id.substring(0,idx);
int commaIdx = part1.indexOf(',');
int commaIdx = part1.indexOf(bitsSepartor);
if (commaIdx > 0) {
int firstBits = getBits(part1, commaIdx);
if (firstBits >= 0) {
m1 = -1 << (32-firstBits);
m2 = -1 >>> firstBits;
part1 = part1.substring(0, commaIdx); // actually, this isn't strictly necessary
part1 = part1.substring(0, commaIdx);
}
}
@ -244,4 +267,52 @@ class CompositeIdRouter extends HashBasedRouter {
return (hash1 & m1) | (hash2 & m2);
}
@Override
public Collection<Slice> getSearchSlices(String shardKey, SolrParams params, DocCollection collection) {
if (shardKey == null) {
// search across whole collection
// TODO: this may need modification in the future when shard splitting could cause an overlap
return collection.getSlices();
}
String id = shardKey;
int idx = shardKey.indexOf(separator);
if (idx < 0) {
// shardKey is a simple id, so don't do a range
return Collections.singletonList(hashToSlice(Hash.murmurhash3_x86_32(id, 0, id.length(), 0), collection));
}
int m1 = mask1;
int m2 = mask2;
String part1 = id.substring(0,idx);
int commaIdx = part1.indexOf(bitsSepartor);
if (commaIdx > 0) {
int firstBits = getBits(part1, commaIdx);
if (firstBits >= 0) {
m1 = -1 << (32-firstBits);
m2 = -1 >>> firstBits;
part1 = part1.substring(0, commaIdx);
}
}
// If the upper bits are 0xF0000000, the range we want to cover is
// 0xF0000000 0xFfffffff
int hash1 = Hash.murmurhash3_x86_32(part1, 0, part1.length(), 0);
int upperBits = hash1 & m1;
int lowerBound = upperBits;
int upperBound = upperBits | m2;
Range completeRange = new Range(lowerBound, upperBound);
List<Slice> slices = new ArrayList(1);
for (Slice slice : slices) {
Range range = slice.getRange();
if (range != null && range.overlaps(completeRange)) {
slices.add(slice);
}
}
return slices;
}
}

View File

@ -21,13 +21,16 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import java.util.Collection;
import java.util.Collections;
/** This document router is for custom sharding
*/
public class ImplicitDocRouter extends DocRouter {
public static final String NAME = "implicit";
@Override
public Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
public Slice getTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
String shard = null;
if (sdoc != null) {
Object o = sdoc.getFieldValue("_shard_");
@ -50,4 +53,19 @@ public class ImplicitDocRouter extends DocRouter {
return null; // no shard specified... use default.
}
@Override
public Collection<Slice> getSearchSlices(String shardKey, SolrParams params, DocCollection collection) {
if (shardKey == null) {
return collection.getSlices();
}
// assume the shardKey is just a slice name
Slice slice = collection.getSlice(shardKey);
if (slice == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "implicit router can't find shard " + shardKey + " in collection " + collection.getName());
}
return Collections.singleton(slice);
}
}

View File

@ -45,4 +45,7 @@ public interface ShardParams {
/** Should things fail if there is an error? (true/false) */
public static final String SHARDS_TOLERANT = "shards.tolerant";
/** Should things fail if there is an error? (true/false) */
public static final String SHARD_KEYS = "shard.keys";
}

View File

@ -182,6 +182,8 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
}
cnt++;
}
log.info("Recoveries finished - collection: " + collection);
}
protected void assertAllActive(String collection,ZkStateReader zkStateReader)