SOLR-2592: progress - introduce DocCollection, add properties for collections, add a router collection property, add some builtin routers, rename HashPartitioner to DocRouter
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1416025 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 0dd0e6d598
commit 1cee959d6b
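In short: ClusterState no longer maps a collection name to a bare Map&lt;String,Slice&gt;; it maps it to a DocCollection that bundles the slices, arbitrary collection properties, and the DocRouter selected by the new "router" collection property (HashPartitioner is renamed to DocRouter). A minimal sketch of the new shape, assembled only from constructors and constants that appear in this diff; the variable values are illustrative:

    Set<String> liveNodes = new HashSet<String>();
    liveNodes.add("node1");

    Map<String,Slice> slices = new LinkedHashMap<String,Slice>();
    slices.put("shard1", new Slice("shard1", null, null)); // (name, replicas, properties)

    Map<String,Object> props = new HashMap<String,Object>();
    props.put(DocCollection.DOC_ROUTER, DocRouter.DEFAULT_NAME);

    DocCollection coll = new DocCollection("collection1", slices, props, DocRouter.DEFAULT);

    Map<String,DocCollection> collectionStates = new LinkedHashMap<String,DocCollection>();
    collectionStates.put(coll.getName(), coll);
    ClusterState clusterState = new ClusterState(liveNodes, collectionStates);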
@@ -263,7 +263,7 @@ sb.append("(group_name=").append(tg.getName()).append(")");
 private Map<String,Object> getCoreProps(ZkController zkController, SolrCore core) {
 final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-Replica props = zkController.getClusterState().getShardProps(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
+Replica props = zkController.getClusterState().getReplica(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
 if(props!=null) {
 return props.getProperties();
 }
@@ -39,7 +39,7 @@ public class AssignShard {
 numShards = 1;
 }
 String returnShardId = null;
-Map<String, Slice> sliceMap = state.getSlices(collection);
+Map<String, Slice> sliceMap = state.getSlicesMap(collection);

 if (sliceMap == null) {
 return "shard1";
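The getSlices → getSlicesMap rename in AssignShard reflects that ClusterState now exposes two views (see the ClusterState hunks at the end of this diff): getSlicesMap(collection) returns the Map&lt;String,Slice&gt; keyed by slice name, while getSlices(collection) now returns a plain Collection&lt;Slice&gt;; both return null for an unknown collection. A small usage sketch with an illustrative collection name:

    Map<String,Slice> byName = state.getSlicesMap("collection1"); // keyed by slice name, or null
    Collection<Slice> justSlices = state.getSlices("collection1"); // values only, or null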
@@ -263,7 +263,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
 ZkNodeProps leaderProps, String collection, String shardId) {
 ClusterState clusterState = zkController.getZkStateReader()
 .getClusterState();
-Map<String,Slice> slices = clusterState.getSlices(collection);
+Map<String,Slice> slices = clusterState.getSlicesMap(collection);
 Slice slice = slices.get(shardId);
 Map<String,Replica> replicasMap = slice.getReplicasMap();
 for (Map.Entry<String,Replica> shard : replicasMap.entrySet()) {

@@ -323,6 +323,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {

 Thread.sleep(500);
+slices = zkController.getClusterState().getSlice(collection, shardId);
 // System.out.println("###### waitForReplicasToComeUp : slices=" + slices + " all=" + zkController.getClusterState().getCollectionStates() );
 }
 }
@@ -17,6 +17,7 @@ package org.apache.solr.cloud;
 * the License.
 */

+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;

@@ -25,7 +26,9 @@ import java.util.Map;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ClosableThread;
-import org.apache.solr.common.cloud.HashPartitioner;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -229,7 +232,7 @@ public class Overseer {
 //request new shardId
 if (collectionExists) {
 // use existing numShards
-numShards = state.getCollectionStates().get(collection).size();
+numShards = state.getCollectionStates().get(collection).getSlices().size();
 log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
 }
 sliceName = AssignShard.assignShard(collection, state, numShards);

@@ -274,16 +277,24 @@ public class Overseer {
 return newClusterState;
 }

+private Map<String,Object> defaultCollectionProps() {
+HashMap<String,Object> props = new HashMap<String, Object>(2);
+props.put(DocCollection.DOC_ROUTER, DocRouter.DEFAULT_NAME);
+return props;
+}
+
 private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
 log.info("Create collection {} with numShards {}", collectionName, numShards);

-HashPartitioner hp = new HashPartitioner();
-List<HashPartitioner.Range> ranges = hp.partitionRange(numShards, hp.fullRange());
+DocRouter hp = new DocRouter();
+List<DocRouter.Range> ranges = hp.partitionRange(numShards, hp.fullRange());

+Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();

-Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String, Slice>>();
 Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
-newStates.putAll(state.getCollectionStates());
+newCollections.putAll(state.getCollectionStates());
 for (int i = 0; i < numShards; i++) {
 final String sliceName = "shard" + (i+1);
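createCollection still carves the full hash range into numShards contiguous ranges; only the type doing the carving changed. The call pattern, exactly as it now appears above (at this stage of the work DocRouter is instantiated directly, as shown):

    DocRouter hp = new DocRouter();
    List<DocRouter.Range> ranges = hp.partitionRange(numShards, hp.fullRange());
    // one range per slice; slice i is named "shard" + (i+1)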
@@ -292,20 +303,27 @@ public class Overseer {

 newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
 }
-newStates.put(collectionName, newSlices);
-ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newStates);

+// TODO: fill in with collection properties read from the /collections/<collectionName> node
+Map<String,Object> collectionProps = defaultCollectionProps();
+DocRouter router = DocRouter.DEFAULT;

+DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);

+newCollections.put(collectionName, newCollection);
+ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newCollections);
 return newClusterState;
 }


 /*
 * Return an already assigned id or null if not assigned
 */
 private String getAssignedId(final ClusterState state, final String nodeName,
 final ZkNodeProps coreState) {
 final String key = coreState.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.getStr(ZkStateReader.CORE_NAME_PROP);
-Map<String, Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
+Collection<Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
 if (slices != null) {
-for (Slice slice : slices.values()) {
+for (Slice slice : slices) {
 if (slice.getReplicasMap().get(key) != null) {
 return slice.getName();
 }
@@ -314,40 +332,47 @@ public class Overseer {
 return null;
 }

-private ClusterState updateSlice(ClusterState state, String collection, Slice slice) {
+private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
 // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
 // System.out.println("Updating slice:" + slice);

-Map<String, Map<String, Slice>> newCollections = new LinkedHashMap<String,Map<String,Slice>>(state.getCollectionStates()); // make a shallow copy
-Map<String, Slice> slices = newCollections.get(collection);
-if (slices == null) {
+Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(state.getCollectionStates()); // make a shallow copy
+DocCollection coll = newCollections.get(collectionName);
+Map<String,Slice> slices;
+Map<String,Object> props;
+DocRouter router;
+
+if (coll == null) {
+// TODO: is updateSlice really called on a collection that doesn't exist?
 slices = new HashMap<String, Slice>(1);
+props = defaultCollectionProps();
+router = DocRouter.DEFAULT;
 } else {
-slices = new LinkedHashMap<String, Slice>(slices); // make a shallow copy
+props = coll.getProperties();
+router = coll.getRouter();
+slices = new LinkedHashMap<String, Slice>(coll.getSlicesMap()); // make a shallow copy
 }
-slices.put(slice.getName(), slice);
-newCollections.put(collection, slices);
+slices.put(slice.getName(), slice);
+DocCollection newCollection = new DocCollection(collectionName, slices, props, router);
+newCollections.put(collectionName, newCollection);

 // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));

 return new ClusterState(state.getLiveNodes(), newCollections);
 }

-private ClusterState setShardLeader(ClusterState state, String collection, String sliceName, String leaderUrl) {
+private ClusterState setShardLeader(ClusterState state, String collectionName, String sliceName, String leaderUrl) {

-final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>(state.getCollectionStates());

-Map<String, Slice> slices = newStates.get(collection);

-if(slices==null) {
-log.error("Could not mark shard leader for non existing collection:" + collection);
+final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(state.getCollectionStates());
+DocCollection coll = newCollections.get(collectionName);
+if(coll == null) {
+log.error("Could not mark shard leader for non existing collection:" + collectionName);
 return state;
 }

+Map<String, Slice> slices = coll.getSlicesMap();
+// make a shallow copy and add it to the new collection
 slices = new LinkedHashMap<String,Slice>(slices);
-newStates.put(collection, slices);


 Slice slice = slices.get(sliceName);
 if (slice == null) {
@@ -383,7 +408,11 @@ public class Overseer {
 Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
 slices.put(newSlice.getName(), newSlice);
 }
-return new ClusterState(state.getLiveNodes(), newStates);


+DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter());
+newCollections.put(collectionName, newCollection);
+return new ClusterState(state.getLiveNodes(), newCollections);
 }

 /*
@@ -394,51 +423,57 @@ public class Overseer {
 final String coreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
 final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);

-final LinkedHashMap<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
-for(String collectionName: clusterState.getCollections()) {
-if(collection.equals(collectionName)) {
-Map<String, Slice> slices = clusterState.getSlices(collection);
-LinkedHashMap<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
-for(Slice slice: slices.values()) {
-if(slice.getReplicasMap().containsKey(coreNodeName)) {
-Map<String, Replica> newReplicas = slice.getReplicasCopy();
-newReplicas.remove(coreNodeName);
-if (newReplicas.size() != 0) {
-Slice newSlice = new Slice(slice.getName(), newReplicas,
-slice.getProperties());
-newSlices.put(slice.getName(), newSlice);
-}
-} else {
-newSlices.put(slice.getName(), slice);
-}
-}
-int cnt = 0;
-for (Slice slice : newSlices.values()) {
-cnt+=slice.getReplicasMap().size();
-}
-// TODO: if no nodes are left after this unload
-// remove from zk - do we have a race where Overseer
-// sees registered nodes and publishes though?
-if (cnt > 0) {
-newStates.put(collectionName, newSlices);
+final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
+DocCollection coll = newCollections.get(collection);
+if (coll == null) {
+// TODO: log/error that we didn't find it?
+return clusterState;
+}

+Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
+for (Slice slice : coll.getSlices()) {
+Replica replica = slice.getReplica(coreNodeName);
+if (replica != null) {
+Map<String, Replica> newReplicas = slice.getReplicasCopy();
+newReplicas.remove(coreNodeName);
+// TODO TODO TODO!!! if there are no replicas left for the slice, and the slice has no hash range, remove it
+// if (newReplicas.size() == 0 && slice.getRange() == null) {
+// if there are no replicas left for the slice remove it
+if (newReplicas.size() == 0) {
+slice = null;
+} else {
-// TODO: it might be better logically to have this in ZkController
-// but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
-// ZkController out of the Overseer.
-try {
-zkClient.clean("/collections/" + collectionName);
-} catch (InterruptedException e) {
-SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collectionName, e);
-Thread.currentThread().interrupt();
-} catch (KeeperException e) {
-SolrException.log(log, "Problem cleaning up collection in zk:" + collectionName, e);
-}
+slice = new Slice(slice.getName(), newReplicas, slice.getProperties());
+}
-} else {
-newStates.put(collectionName, clusterState.getSlices(collectionName));
-}

+if (slice != null) {
+newSlices.put(slice.getName(), slice);
+}
+}
-ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newStates);

+// if there are no slices left in the collection, remove it?
+if (newSlices.size() == 0) {
+newCollections.remove(coll.getName());

+// TODO: it might be better logically to have this in ZkController
+// but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
+// ZkController out of the Overseer.
+try {
+zkClient.clean("/collections/" + collection);
+} catch (InterruptedException e) {
+SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
+Thread.currentThread().interrupt();
+} catch (KeeperException e) {
+SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e);
+}


+} else {
+DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
+newCollections.put(newCollection.getName(), newCollection);
+}

+ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
 return newState;
 }
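Since DocCollection, like ClusterState, is treated as immutable, every Overseer mutation above (updateSlice, setShardLeader, removeCore) now follows the same copy-on-write pattern. Condensed into one sketch, where state and coll stand for the current ClusterState and the affected DocCollection:

    Map<String,DocCollection> newCollections =
        new LinkedHashMap<String,DocCollection>(state.getCollectionStates()); // shallow copy
    Map<String,Slice> slices = new LinkedHashMap<String,Slice>(coll.getSlicesMap());
    slices.put(slice.getName(), slice); // or remove/replace the affected slice
    newCollections.put(coll.getName(),
        new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter()));
    ClusterState newState = new ClusterState(state.getLiveNodes(), newCollections);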
@@ -26,6 +26,7 @@ import java.util.Set;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;

@@ -275,13 +276,9 @@ public class OverseerCollectionProcessor implements Runnable {
 log.info("Executing Collection Cmd : " + params);
 String name = message.getStr("name");

-Map<String,Slice> slices = clusterState.getCollectionStates().get(name);

-if (slices == null) {
-throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection:" + name);
-}

-for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+DocCollection coll = clusterState.getCollection(name);

+for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
 Slice slice = entry.getValue();
 Map<String,Replica> shards = slice.getReplicasMap();
 Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
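Note the error-handling shift here: the old code fetched the raw slice map and threw when it was null, while clusterState.getCollection(name) performs that check itself (per the ClusterState hunk near the end of this diff it throws a SolrException(BAD_REQUEST) for an unknown collection), so the caller drops its null check:

    DocCollection coll = clusterState.getCollection(name); // throws if the collection is unknown
    for (Slice slice : coll.getSlices()) {
      log.info("slice: " + slice.getName());
    }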
@@ -44,6 +44,7 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;

@@ -889,6 +890,10 @@ public final class ZkController {

 try {
 Map<String,Object> collectionProps = new HashMap<String,Object>();

+// set defaults
+collectionProps.put(DocCollection.DOC_ROUTER, "compositeId");
+
 // TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
 String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, collection);

@@ -903,8 +908,10 @@ public final class ZkController {
 }

 // if the config name wasn't passed in, use the default
-if (!collectionProps.containsKey(CONFIGNAME_PROP))
+if (!collectionProps.containsKey(CONFIGNAME_PROP)) {
+// TODO: getting the configName from the collectionPath should fail since we already know it doesn't exist?
 getConfName(collection, collectionPath, collectionProps);
+}

 } else if(System.getProperty("bootstrap_confdir") != null) {
 // if we are bootstrapping a collection, default the config for

@@ -928,7 +935,6 @@ public final class ZkController {
 } else {
 getConfName(collection, collectionPath, collectionProps);
 }

 ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
 zkClient.makePath(collectionPath, ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT, null, true);
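With the default above, the ZkNodeProps written to the /collections/&lt;collection&gt; node now always carries a router entry. Assuming DocCollection.DOC_ROUTER's string value is "router" and a config name of "myconf" (both assumptions for illustration), the serialized JSON would look roughly like:

    {
      "router": "compositeId",
      "configName": "myconf"
    }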
@@ -39,7 +39,8 @@ import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.HashPartitioner;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;

@@ -216,7 +217,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
 }

 /** Creates a new core and registers it. The returned core will have its reference count incremented an extra time and close() should be called when finished. */
-private SolrCore createCore(SolrCore current, int ord, HashPartitioner.Range newRange) throws IOException, SAXException, ParserConfigurationException {
+private SolrCore createCore(SolrCore current, int ord, DocRouter.Range newRange) throws IOException, SAXException, ParserConfigurationException {
 CoreDescriptor currCoreD = current.getCoreDescriptor();
 CloudDescriptor currCloudD = currCoreD.getCloudDescriptor();

@@ -268,7 +269,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
 // partitions=N (split into N partitions, leaving it up to solr what the ranges are and where to put them)
 // path - multiValued param, or comma separated param? Only creates indexes, not cores

-List<HashPartitioner.Range> ranges = null;
+List<DocRouter.Range> ranges = null;
 // boolean closeDirectories = true;
 // DirectoryFactory dirFactory = null;

@@ -290,9 +291,9 @@ public class CoreAdminHandler extends RequestHandlerBase {
 // split on every other doc rather than hash.

 // TODO (cloud): get from the current core
-HashPartitioner.Range currentRange = new HashPartitioner.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
+DocRouter.Range currentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);

-HashPartitioner hp = new HashPartitioner();
+DocRouter hp = new DocRouter();
 ranges = hp.partitionRange(partitions, currentRange);

 if (pathsArr == null) {
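For index splitting the same partitioner is applied to an existing shard's range rather than the full range. Spelling out what the code above does for partitions=2 over the whole hash space, and assuming the "natural boundary" behavior asserted in TestHashPartitioner below, the split would land on the sign boundary:

    DocRouter.Range current = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
    List<DocRouter.Range> halves = new DocRouter().partitionRange(2, current);
    // expected: two contiguous ranges, roughly [MIN_VALUE..-1] and [0..MAX_VALUE]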
@@ -302,12 +302,12 @@ public class HttpShardHandler extends ShardHandler {
 // cloud state and add them to the Map 'slices'.
 for (int i = 0; i < collectionList.size(); i++) {
 String collection = collectionList.get(i);
-ClientUtils.appendMap(collection, slices, clusterState.getSlices(collection));
+ClientUtils.appendMap(collection, slices, clusterState.getSlicesMap(collection));
 }
 } else {
 // If no collections were specified, default to the collection for
 // this core.
-slices = clusterState.getSlices(cloudDescriptor.getCollectionName());
+slices = clusterState.getSlicesMap(cloudDescriptor.getCollectionName());
 if (slices == null) {
 throw new SolrException(ErrorCode.BAD_REQUEST,
 "Could not find collection:"

@@ -337,7 +337,7 @@ public class HttpShardHandler extends ShardHandler {
 if (rb.shards[i] == null) {
 if (clusterState == null) {
 clusterState = zkController.getClusterState();
-slices = clusterState.getSlices(cloudDescriptor.getCollectionName());
+slices = clusterState.getSlicesMap(cloudDescriptor.getCollectionName());
 }
 String sliceName = rb.slices[i];
@@ -66,7 +66,7 @@ PARSER_END(QueryParser)
 "[", "]", "\"", "{", "}", "~", "*", "?", "\\", "/" ]
 | <_ESCAPED_CHAR> ) >
 | <#_TERM_CHAR: ( <_TERM_START_CHAR>
-| <_ESCAPED_CHAR> | "-" | "+" | "/" ) >
+| <_ESCAPED_CHAR> | "-" | "+" | "/" | "!") >
 | <#_WHITESPACE: ( " " | "\t" | "\n" | "\r" | "\u3000") >
 | <#_QUOTED_CHAR: ( ~[ "\"", "\\" ] | <_ESCAPED_CHAR> ) >
 | <#_SQUOTED_CHAR: ( ~[ "'", "\\" ] | <_ESCAPED_CHAR> ) >

@@ -106,7 +106,7 @@ private int jjMoveNfa_2(int startState, int curPos)
 {
 case 61:
 case 27:
-if ((0xfbfffcf8ffffd9ffL & l) == 0L)
+if ((0xfbfffcfaffffd9ffL & l) == 0L)
 break;
 if (kind > 24)
 kind = 24;

@@ -296,7 +296,7 @@ private int jjMoveNfa_2(int startState, int curPos)
 jjCheckNAddStates(6, 10);
 break;
 case 54:
-if ((0x7bfff8f8ffffd9ffL & l) == 0L)
+if ((0x7bfff8faffffd9ffL & l) == 0L)
 break;
 if (kind > 21)
 kind = 21;

@@ -308,7 +308,7 @@ private int jjMoveNfa_2(int startState, int curPos)
 jjCheckNAddTwoStates(54, 55);
 break;
 case 57:
-if ((0x7bfff8f8ffffd9ffL & l) != 0L)
+if ((0x7bfff8faffffd9ffL & l) != 0L)
 jjCheckNAddStates(36, 38);
 break;
 case 59:
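The generated-lexer edits mirror the grammar change above: each 64-bit literal is a character-class bitmask over code points 0-63, and the only bit that flips (f8 → fa in the eighth hex digit) is bit 33, i.e. '!' (ASCII 33), the character newly allowed in _TERM_CHAR. A quick check in Java:

    long oldMask = 0xfbfffcf8ffffd9ffL;
    long newMask = 0xfbfffcfaffffd9ffL;
    assert (oldMask ^ newMask) == (1L << 33); // exactly one bit differs
    assert (int) '!' == 33;                   // and it is the '!' bit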
@@ -317,7 +317,7 @@ public class SolrDispatchFilter implements Filter
 ZkStateReader zkStateReader = cores.getZkController().getZkStateReader();

 ClusterState clusterState = zkStateReader.getClusterState();
-Map<String,Slice> slices = clusterState.getSlices(collection);
+Map<String,Slice> slices = clusterState.getSlicesMap(collection);
 if (slices == null) {
 return null;
 }
@@ -30,7 +30,7 @@ import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.OpenBitSet;
-import org.apache.solr.common.cloud.HashPartitioner;
+import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.schema.SchemaField;

@@ -48,8 +48,8 @@ public class SolrIndexSplitter {

 SolrIndexSearcher searcher;
 SchemaField field;
-List<HashPartitioner.Range> ranges;
-HashPartitioner.Range[] rangesArr; // same as ranges list, but an array for extra speed in inner loops
+List<DocRouter.Range> ranges;
+DocRouter.Range[] rangesArr; // same as ranges list, but an array for extra speed in inner loops
 List<String> paths;
 List<SolrCore> cores;

@@ -57,7 +57,7 @@ public class SolrIndexSplitter {
 field = cmd.getReq().getSchema().getUniqueKeyField();
 searcher = cmd.getReq().getSearcher();
 ranges = cmd.ranges;
-rangesArr = ranges.toArray(new HashPartitioner.Range[ranges.size()]);
+rangesArr = ranges.toArray(new DocRouter.Range[ranges.size()]);
 paths = cmd.paths;
 cores = cmd.cores;
 }
@@ -17,7 +17,7 @@

 package org.apache.solr.update;

-import org.apache.solr.common.cloud.HashPartitioner;
+import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;

@@ -33,10 +33,10 @@ public class SplitIndexCommand extends UpdateCommand {
 // public List<Directory> dirs;
 public List<String> paths;
 public List<SolrCore> cores; // either paths or cores should be specified
-public List<HashPartitioner.Range> ranges;
+public List<DocRouter.Range> ranges;
 // TODO: allow specification of custom hash function

-public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<HashPartitioner.Range> ranges) {
+public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges) {
 super(req);
 this.paths = paths;
 this.cores = cores;
@@ -37,6 +37,8 @@ import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -248,6 +250,173 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 return nodes;
 }


+// trying to incrementally get things to pass
+private List<Node> setupRequest2(int hash, String id, SolrInputDocument doc) {
+List<Node> nodes = null;
+
+// if we are in zk mode...
+if (zkEnabled) {
+
+//////
+ClusterState cstate = zkController.getClusterState();
+numNodes = cstate.getLiveNodes().size();
+DocCollection coll = cstate.getCollection(collection);
+Slice slice = coll.getRouter().getTargetShard(id, doc, req.getParams(), coll);
+
+Replica leader = slice.getLeader();
+
+
+
+//////
+
+// set num nodes
+numNodes = cstate.getLiveNodes().size();
+
+String shardId = getShard(hash, collection, zkController.getClusterState()); // get the right shard based on the hash...
+
+if (shardId != slice.getName()) {
+System.out.println("######################## shardId="+shardId + " slice="+slice + " cstate=" + cstate);
+}
+
+try {
+ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
+collection, shardId));
+
+String leaderNodeName = leaderProps.getCoreNodeName();
+String coreName = req.getCore().getName();
+String coreNodeName = zkController.getNodeName() + "_" + coreName;
+isLeader = coreNodeName.equals(leaderNodeName);
+
+DistribPhase phase =
+DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+
+doDefensiveChecks(shardId, phase);
+
+
+if (DistribPhase.FROMLEADER == phase) {
+// we are coming from the leader, just go local - add no urls
+forwardToLeader = false;
+} else if (isLeader) {
+// that means I want to forward onto my replicas...
+// so get the replicas...
+forwardToLeader = false;
+List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
+.getReplicaProps(collection, shardId, zkController.getNodeName(),
+coreName, null, ZkStateReader.DOWN);
+if (replicaProps != null) {
+nodes = new ArrayList<Node>(replicaProps.size());
+// check for test param that lets us miss replicas
+String[] skipList = req.getParams().getParams("test.distrib.skip.servers");
+Set<String> skipListSet = null;
+if (skipList != null) {
+skipListSet = new HashSet<String>(skipList.length);
+skipListSet.addAll(Arrays.asList(skipList));
+}
+
+for (ZkCoreNodeProps props : replicaProps) {
+if (skipList != null) {
+if (!skipListSet.contains(props.getCoreUrl())) {
+nodes.add(new StdNode(props));
+}
+} else {
+nodes.add(new StdNode(props));
+}
+}
+}
+
+} else {
+// I need to forward onto the leader...
+nodes = new ArrayList<Node>(1);
+nodes.add(new RetryNode(leaderProps, zkController.getZkStateReader(), collection, shardId));
+forwardToLeader = true;
+}
+
+} catch (InterruptedException e) {
+Thread.currentThread().interrupt();
+throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+e);
+}
+}
+
+return nodes;
+}
+
+
+// use old code for now
+private List<Node> setupRequest(String id, SolrInputDocument doc) {
+// return setupRequest2(DocRouter.DEFAULT.shardHash(id, null, null), id, doc);
+return setupRequest(DocRouter.DEFAULT.shardHash(id, null, null));
+}
+
+
+private List<Node> setupRequestX(String id, SolrInputDocument doc) {
+List<Node> nodes = null;
+
+// if we are in zk mode...
+if (zkEnabled) {
+// set num nodes
+ClusterState cstate = zkController.getClusterState();
+numNodes = cstate.getLiveNodes().size();
+DocCollection coll = cstate.getCollection(collection);
+Slice slice = coll.getRouter().getTargetShard(id, doc, req.getParams(), coll);
+
+Replica leader = slice.getLeader();
+
+String coreName = req.getCore().getName();
+String coreNodeName = zkController.getNodeName() + "_" + coreName;
+isLeader = coreNodeName.equals(leader.getName()); // is this me?
+
+DistribPhase phase =
+DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+
+doDefensiveChecks(slice.getName(), phase);
+
+
+if (DistribPhase.FROMLEADER == phase) {
+// we are coming from the leader, just go local - add no urls
+forwardToLeader = false;
+} else if (isLeader) {
+// that means I want to forward onto my replicas...
+// so get the replicas...
+forwardToLeader = false;
+List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
+.getReplicaProps(collection, slice.getName(), zkController.getNodeName(),
+coreName, null, ZkStateReader.DOWN);
+if (replicaProps != null) {
+nodes = new ArrayList<Node>(replicaProps.size());
+// check for test param that lets us miss replicas
+String[] skipList = req.getParams().getParams("test.distrib.skip.servers");
+Set<String> skipListSet = null;
+if (skipList != null) {
+skipListSet = new HashSet<String>(skipList.length);
+skipListSet.addAll(Arrays.asList(skipList));
+}
+
+for (ZkCoreNodeProps props : replicaProps) {
+if (skipList != null) {
+if (!skipListSet.contains(props.getCoreUrl())) {
+nodes.add(new StdNode(props));
+}
+} else {
+nodes.add(new StdNode(props));
+}
+}
+}
+
+} else {
+// I need to forward onto the leader...
+nodes = new ArrayList<Node>(1);
+nodes.add(new RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(), collection, leader.getName()));
+forwardToLeader = true;
+}
+
+}
+
+return nodes;
+}
+
+
 private void doDefensiveChecks(String shardId, DistribPhase phase) {
 String from = req.getParams().get("distrib.from");
 boolean logReplay = req.getParams().getBool(LOG_REPLAY, false);
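The essence of the new update path: instead of hashing the id and mapping the hash onto a shard by range arithmetic, the document is handed to the collection's router, which returns the target Slice directly, and the leader comes straight off that slice. The four central lines, extracted verbatim from setupRequest2/setupRequestX above:

    ClusterState cstate = zkController.getClusterState();
    DocCollection coll = cstate.getCollection(collection);
    Slice slice = coll.getRouter().getTargetShard(id, doc, req.getParams(), coll);
    Replica leader = slice.getLeader();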
@@ -315,7 +484,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 if (zkEnabled) {
 zkCheck();
-hash = hash(cmd);
-nodes = setupRequest(hash);
+nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument());
 } else {
 isLeader = getNonZkLeaderAssumption(req);
 }

@@ -668,11 +837,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 return;
 }

-int hash = 0;
 if (zkEnabled) {
 zkCheck();
-hash = hash(cmd);
-nodes = setupRequest(hash);
+nodes = setupRequest(cmd.getId(), null);
 } else {
 isLeader = getNonZkLeaderAssumption(req);
 }

@@ -751,7 +918,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 if (zkEnabled && DistribPhase.NONE == phase) {
 boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard

-Map<String,Slice> slices = zkController.getClusterState().getSlices(collection);
+Map<String,Slice> slices = zkController.getClusterState().getSlicesMap(collection);
 if (slices == null) {
 throw new SolrException(ErrorCode.BAD_REQUEST,
 "Cannot find collection:" + collection + " in "

@@ -1058,7 +1225,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 ClusterState clusterState = req.getCore().getCoreDescriptor()
 .getCoreContainer().getZkController().getClusterState();
 List<Node> urls = new ArrayList<Node>();
-Map<String,Slice> slices = clusterState.getSlices(collection);
+Map<String,Slice> slices = clusterState.getSlicesMap(collection);
 if (slices == null) {
 throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
 "Could not find collection in zk: " + clusterState);
@@ -60,6 +60,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;

@@ -440,7 +441,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {

 zkStateReader.updateClusterState(true);

-int slices = zkStateReader.getClusterState().getCollectionStates().get("unloadcollection").size();
+int slices = zkStateReader.getClusterState().getCollectionStates().get("unloadcollection").getSlices().size();
 assertEquals(1, slices);

 client = clients.get(1);

@@ -455,7 +456,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
 server.request(createCmd);

 zkStateReader.updateClusterState(true);
-slices = zkStateReader.getClusterState().getCollectionStates().get("unloadcollection").size();
+slices = zkStateReader.getClusterState().getCollectionStates().get("unloadcollection").getSlices().size();
 assertEquals(1, slices);

 waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);

@@ -922,10 +923,10 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {

 private void collectStartTimes(String collectionName,
 Map<String,Long> urlToTime) throws SolrServerException, IOException {
-Map<String,Map<String,Slice>> collections = solrj.getZkStateReader()
+Map<String,DocCollection> collections = solrj.getZkStateReader()
 .getClusterState().getCollectionStates();
 if (collections.containsKey(collectionName)) {
-Map<String,Slice> slices = collections.get(collectionName);
+Map<String,Slice> slices = collections.get(collectionName).getSlicesMap();

 Iterator<Entry<String,Slice>> it = slices.entrySet().iterator();
 while (it.hasNext()) {

@@ -951,7 +952,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {

 private String getUrlFromZk(String collection) {
 ClusterState clusterState = solrj.getZkStateReader().getClusterState();
-Map<String,Slice> slices = clusterState.getCollectionStates().get(collection);
+Map<String,Slice> slices = clusterState.getCollectionStates().get(collection).getSlicesMap();

 if (slices == null) {
 throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection:" + collection);

@@ -1015,10 +1016,10 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
 boolean sliceMatch = false;
 while (System.currentTimeMillis() < timeoutAt) {
 ClusterState clusterState = solrj.getZkStateReader().getClusterState();
-Map<String,Map<String,Slice>> collections = clusterState
+Map<String,DocCollection> collections = clusterState
 .getCollectionStates();
 if (collections.containsKey(collectionName)) {
-Map<String,Slice> slices = collections.get(collectionName);
+Map<String,Slice> slices = collections.get(collectionName).getSlicesMap();
 // did we find expectedSlices slices/shards?
 if (slices.size() == expectedSlices) {
 sliceMatch = true;

@@ -1044,7 +1045,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
 while (System.currentTimeMillis() < timeoutAt) {
 solrj.getZkStateReader().updateClusterState(true);
 ClusterState clusterState = solrj.getZkStateReader().getClusterState();
-Map<String,Map<String,Slice>> collections = clusterState
+Map<String,DocCollection> collections = clusterState
 .getCollectionStates();
 if (!collections.containsKey(collectionName)) {
 found = false;

@@ -1107,6 +1108,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {

 // cloud level test mainly needed just to make sure that versions and errors are propagated correctly
 private void doOptimisticLockingAndUpdating() throws Exception {
+log.info("### STARTING doOptimisticLockingAndUpdating");
 printLayout();

 SolrInputDocument sd = sdoc("id", 1000, "_version_", -1);

@@ -1147,6 +1149,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {

 private void testNumberOfCommitsWithCommitAfterAdd()
 throws SolrServerException, IOException {
+log.info("### STARTING testNumberOfCommitsWithCommitAfterAdd");
 long startCommits = getNumCommits((HttpSolrServer) clients.get(0));

 ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");

@@ -1178,6 +1181,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
 }

 private void testANewCollectionInOneInstanceWithManualShardAssignement() throws Exception {
+log.info("### STARTING testANewCollectionInOneInstanceWithManualShardAssignement");
 System.clearProperty("numShards");
 List<SolrServer> collectionClients = new ArrayList<SolrServer>();
 SolrServer client = clients.get(0);

@@ -1243,7 +1247,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
 // we added a role of none on these creates - check for it
 ZkStateReader zkStateReader = solrj.getZkStateReader();
 zkStateReader.updateClusterState(true);
-Map<String,Slice> slices = zkStateReader.getClusterState().getSlices(oneInstanceCollection2);
+Map<String,Slice> slices = zkStateReader.getClusterState().getSlicesMap(oneInstanceCollection2);
 assertNotNull(slices);
 String roles = slices.get("slice1").getReplicasMap().values().iterator().next().getStr(ZkStateReader.ROLES_PROP);
 assertEquals("none", roles);

@@ -1271,6 +1275,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
 }

 private void testSearchByCollectionName() throws SolrServerException {
+log.info("### STARTING testSearchByCollectionName");
 SolrServer client = clients.get(0);
 final String baseUrl = ((HttpSolrServer) client).getBaseURL().substring(
 0,

@@ -1286,6 +1291,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
 }

 private void testANewCollectionInOneInstance() throws Exception {
+log.info("### STARTING testANewCollectionInOneInstance");
 List<SolrServer> collectionClients = new ArrayList<SolrServer>();
 SolrServer client = clients.get(0);
 otherCollectionClients.put(oneInstanceCollection , collectionClients);

@@ -1380,6 +1386,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
 }

 private void testMultipleCollections() throws Exception {
+log.info("### STARTING testMultipleCollections");
 // create another 2 collections and search across them
 createNewCollection("collection2");
 createNewCollection("collection3");
@@ -24,6 +24,9 @@ import java.util.Set;

 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;

@@ -32,7 +35,7 @@ import org.junit.Test;
 public class ClusterStateTest extends SolrTestCaseJ4 {
 @Test
 public void testStoreAndRead() throws Exception {
-Map<String,Map<String,Slice>> collectionStates = new HashMap<String,Map<String,Slice>>();
+Map<String,DocCollection> collectionStates = new HashMap<String,DocCollection>();
 Set<String> liveNodes = new HashSet<String>();
 liveNodes.add("node1");
 liveNodes.add("node2");

@@ -49,12 +52,12 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
 slices.put("shard1", slice);
 Slice slice2 = new Slice("shard2", sliceToProps, null);
 slices.put("shard2", slice2);
-collectionStates.put("collection1", slices);
-collectionStates.put("collection2", slices);
+collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
+collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT));

 ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
 byte[] bytes = ZkStateReader.toJSON(clusterState);

 // System.out.println("#################### " + new String(bytes));
 ClusterState loadedClusterState = ClusterState.load(null, bytes, liveNodes);

 assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
@@ -171,7 +171,7 @@ public class ClusterStateUpdateTest extends SolrTestCaseJ4 {
 Map<String,Slice> slices = null;
 for (int i = 75; i > 0; i--) {
 clusterState2 = zkController2.getClusterState();
-slices = clusterState2.getSlices("testcore");
+slices = clusterState2.getSlicesMap("testcore");

 if (slices != null && slices.containsKey("shard1")
 && slices.get("shard1").getReplicasMap().size() > 0) {
@@ -35,6 +35,7 @@ import javax.xml.parsers.ParserConfigurationException;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;

@@ -148,7 +149,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
 }

 private String getShardId(final String coreName) {
-Map<String,Slice> slices = zkStateReader.getClusterState().getSlices(
+Map<String,Slice> slices = zkStateReader.getClusterState().getSlicesMap(
 collection);
 if (slices != null) {
 for (Slice slice : slices.values()) {

@@ -301,7 +302,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
 cloudStateSliceCount = 0;
 reader.updateClusterState(true);
 ClusterState state = reader.getClusterState();
-Map<String,Slice> slices = state.getSlices("collection1");
+Map<String,Slice> slices = state.getSlicesMap("collection1");
 for (String name : slices.keySet()) {
 cloudStateSliceCount += slices.get(name).getReplicasMap().size();
 }

@@ -712,8 +713,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
 ClusterState state = reader.getClusterState();

 int numFound = 0;
-for (Map<String,Slice> collection : state.getCollectionStates().values()) {
-for (Slice slice : collection.values()) {
+for (DocCollection collection : state.getCollectionStates().values()) {
+for (Slice slice : collection.getSlices()) {
 if (slice.getReplicasMap().get("node1_core1") != null) {
 numFound++;
 }
@@ -20,13 +20,14 @@ package org.apache.solr.cloud;
 import java.util.List;

 import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.common.cloud.HashPartitioner;
-import org.apache.solr.common.cloud.HashPartitioner.Range;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.DocRouter.Range;

 public class TestHashPartitioner extends SolrTestCaseJ4 {

 public void testMapHashes() throws Exception {
-HashPartitioner hp = new HashPartitioner();
+DocRouter hp = new DocRouter();
 List<Range> ranges;

 // make sure the partitioner uses the "natural" boundaries and doesn't suffer from an off-by-one
@@ -197,7 +197,7 @@ public class CloudSolrServer extends SolrServer {
 Map<String,Slice> slices = new HashMap<String,Slice>();
 for (int i = 0; i < collectionList.size(); i++) {
 String coll= collectionList.get(i);
-ClientUtils.appendMap(coll, slices, clusterState.getSlices(coll));
+ClientUtils.appendMap(coll, slices, clusterState.getSlicesMap(coll));
 }

 Set<String> liveNodes = clusterState.getLiveNodes();
@@ -18,6 +18,7 @@ package org.apache.solr.common.cloud;
 */

 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;

@@ -30,7 +31,7 @@ import java.util.Set;
 import org.apache.noggit.JSONWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.HashPartitioner.Range;
+import org.apache.solr.common.cloud.DocRouter.Range;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;

@@ -39,17 +40,16 @@ import org.slf4j.LoggerFactory;
 /**
 * Immutable state of the cloud. Normally you can get the state by using
 * {@link ZkStateReader#getClusterState()}.
+* @lucene.experimental
 */
 public class ClusterState implements JSONWriter.Writable {
 private static Logger log = LoggerFactory.getLogger(ClusterState.class);

 private Integer zkClusterStateVersion;

-private final Map<String, Map<String,Slice>> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
+private final Map<String, DocCollection> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
 private final Set<String> liveNodes;

-private final HashPartitioner hp = new HashPartitioner();


 private final Map<String,RangeInfo> rangeInfos = new HashMap<String,RangeInfo>();
 private final Map<String,Map<String,ZkNodeProps>> leaders = new HashMap<String,Map<String,ZkNodeProps>>();
@@ -61,7 +61,7 @@ public class ClusterState implements JSONWriter.Writable {
 * hashCode and equals will only depend on liveNodes and not clusterStateVersion.
 */
 public ClusterState(Set<String> liveNodes,
-Map<String, Map<String,Slice>> collectionStates) {
+Map<String, DocCollection> collectionStates) {
 this(null, liveNodes, collectionStates);
 }

@@ -69,60 +69,41 @@ public class ClusterState implements JSONWriter.Writable {
 * Use this constr when ClusterState is meant for consumption.
 */
 public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
-Map<String, Map<String,Slice>> collectionStates) {
+Map<String, DocCollection> collectionStates) {
 this.zkClusterStateVersion = zkClusterStateVersion;
 this.liveNodes = new HashSet<String>(liveNodes.size());
 this.liveNodes.addAll(liveNodes);
-this.collectionStates = new HashMap<String, Map<String,Slice>>(collectionStates.size());
+this.collectionStates = new HashMap<String, DocCollection>(collectionStates.size());
 this.collectionStates.putAll(collectionStates);
-addRangeInfos(collectionStates.keySet());
-getShardLeaders();
 }

-private void getShardLeaders() {
-Set<Entry<String,Map<String,Slice>>> collections = collectionStates.entrySet();
-for (Entry<String,Map<String,Slice>> collection : collections) {
-Map<String,Slice> state = collection.getValue();
-Set<Entry<String,Slice>> slices = state.entrySet();
-for (Entry<String,Slice> sliceEntry : slices) {
-Slice slice = sliceEntry.getValue();
-Map<String,Replica> shards = slice.getReplicasMap();
-Set<Entry<String,Replica>> shardsEntries = shards.entrySet();
-for (Entry<String,Replica> shardEntry : shardsEntries) {
-ZkNodeProps props = shardEntry.getValue();
-if (props.containsKey(ZkStateReader.LEADER_PROP)) {
-Map<String,ZkNodeProps> leadersForCollection = leaders.get(collection.getKey());
-if (leadersForCollection == null) {
-leadersForCollection = new HashMap<String,ZkNodeProps>();
-leaders.put(collection.getKey(), leadersForCollection);
-}
-leadersForCollection.put(sliceEntry.getKey(), props);
-break; // we found the leader for this shard
-}
-}
-}
-}
-}

 /**
-* Get properties of a shard leader for specific collection.
+* Get properties of a shard/slice leader for specific collection.
 */
-public ZkNodeProps getLeader(String collection, String shard) {
-Map<String,ZkNodeProps> collectionLeaders = leaders.get(collection);
-if (collectionLeaders == null) return null;
-return collectionLeaders.get(shard);
+public ZkNodeProps getLeader(String collection, String sliceName) {
+DocCollection coll = collectionStates.get(collection);
+if (coll == null) return null;
+Slice slice = coll.getSlice(sliceName);
+if (slice == null) return null;
+return slice.getLeader();
 }

 /**
-* Get shard properties or null if shard is not found.
+* Get replica properties (if the slice is unknown) or null if replica is not found.
+* If the slice is known, do not use this method.
+* coreNodeName is the same as replicaName
 */
-public Replica getShardProps(final String collection, final String coreNodeName) {
-Map<String, Slice> slices = getSlices(collection);
-if (slices == null) return null;
-for(Slice slice: slices.values()) {
-if(slice.getReplicasMap().get(coreNodeName)!=null) {
-return slice.getReplicasMap().get(coreNodeName);
-}
+public Replica getReplica(final String collection, final String coreNodeName) {
+return getReplica(collectionStates.get(collection), coreNodeName);
 }

+private Replica getReplica(DocCollection coll, String replicaName) {
+if (coll == null) return null;
+for(Slice slice: coll.getSlices()) {
+Replica replica = slice.getReplica(replicaName);
+if (replica != null) return replica;
 }
 return null;
 }
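This drops the eagerly built leaders map: constructing a ClusterState no longer walks every replica looking for LEADER_PROP, and getLeader resolves lazily through the collection and slice instead; getShardProps is likewise renamed to the more accurate getReplica. Callers stay a one-liner (arguments illustrative):

    ZkNodeProps leaderProps = clusterState.getLeader("collection1", "shard1"); // null if unknown
    Replica replica = clusterState.getReplica("collection1", "node1_core1");   // null if not found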
@@ -134,22 +115,35 @@ public class ClusterState implements JSONWriter.Writable {
 }

 /**
-* Get the index Slice for collection.
+* Get the Slice for collection.
 */
-public Slice getSlice(String collection, String slice) {
-if (collectionStates.containsKey(collection)
-&& collectionStates.get(collection).containsKey(slice))
-return collectionStates.get(collection).get(slice);
-return null;
+public Slice getSlice(String collection, String sliceName) {
+DocCollection coll = collectionStates.get(collection);
+if (coll == null) return null;
+return coll.getSlice(sliceName);
 }

+public Map<String, Slice> getSlicesMap(String collection) {
+DocCollection coll = collectionStates.get(collection);
+if (coll == null) return null;
+return coll.getSlicesMap();
+}

+public Collection<Slice> getSlices(String collection) {
+DocCollection coll = collectionStates.get(collection);
+if (coll == null) return null;
+return coll.getSlices();
+}

 /**
-* Get all slices for collection.
+* Get the named DocCollection object, or throw an exception if it doesn't exist.
 */
-public Map<String, Slice> getSlices(String collection) {
-if(!collectionStates.containsKey(collection))
-return null;
-return Collections.unmodifiableMap(collectionStates.get(collection));
+public DocCollection getCollection(String collection) {
+DocCollection coll = collectionStates.get(collection);
+if (coll == null) {
+throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection:" + collection);
+}
+return coll;
 }

 /**
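Two lookup contracts now coexist: the slice accessors (getSlice, getSlicesMap, getSlices) return null for an unknown collection, while getCollection throws SolrException(BAD_REQUEST). Callers that tolerate absence keep the null-check idiom; callers that cannot simply call getCollection:

    Map<String,Slice> slices = clusterState.getSlicesMap(collection);
    if (slices == null) {
      // unknown collection: handle gracefully
    }
    DocCollection coll = clusterState.getCollection(collection); // throws if unknown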
@@ -162,7 +156,7 @@ public class ClusterState implements JSONWriter.Writable {
 /**
 * @return Map<collectionName, Map<sliceName,Slice>>
 */
-public Map<String, Map<String, Slice>> getCollectionStates() {
+public Map<String, DocCollection> getCollectionStates() {
 return Collections.unmodifiableMap(collectionStates);
 }

@@ -174,17 +168,14 @@ public class ClusterState implements JSONWriter.Writable {
 }

 /**
-* Get shardId for core.
-* @param coreNodeName in the form of nodeName_coreName
+* Get the slice/shardId for a core.
+* @param coreNodeName in the form of nodeName_coreName (the name of the replica)
 */
 public String getShardId(String coreNodeName) {
-for (Entry<String, Map<String, Slice>> states: collectionStates.entrySet()){
-for(Entry<String, Slice> slices: states.getValue().entrySet()) {
-for(Entry<String, Replica> shards: slices.getValue().getReplicasMap().entrySet()){
-if(coreNodeName.equals(shards.getKey())) {
-return slices.getKey();
-}
-}
+// System.out.println("###### getShardId("+coreNodeName+") in " + collectionStates);
+for (DocCollection coll : collectionStates.values()) {
+for (Slice slice : coll.getSlices()) {
+if (slice.getReplicasMap().containsKey(coreNodeName)) return slice.getName();
+}
 }
 return null;
@ -209,19 +200,14 @@ public class ClusterState implements JSONWriter.Writable {
     RangeInfo rangeInfo;
     rangeInfo = new RangeInfo();
 
-    Map<String,Slice> slices = getSlices(collection);
-
-    if (slices == null) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find collection "
-          + collection + " in " + this);
-    }
+    DocCollection coll = getCollection(collection);
 
-    Set<String> shards = slices.keySet();
+    Set<String> shards = coll.getSlicesMap().keySet();
     ArrayList<String> shardList = new ArrayList<String>(shards.size());
     shardList.addAll(shards);
     Collections.sort(shardList);
 
-    ranges = hp.partitionRange(shards.size(), Integer.MIN_VALUE, Integer.MAX_VALUE);
+    ranges = DocRouter.DEFAULT.partitionRange(shards.size(), Integer.MIN_VALUE, Integer.MAX_VALUE);
 
     rangeInfo.ranges = ranges;
     rangeInfo.shardList = shardList;
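Range computation is now delegated to the default router. A sketch (not part of the patch) of what partitionRange produces, assuming Range prints in the hex "min-max" form the patch parses elsewhere: N contiguous ranges covering the full 32-bit hash space.

import java.util.List;
import org.apache.solr.common.cloud.DocRouter;

class PartitionDemo {
  public static void main(String[] args) {
    // Four shards -> four contiguous ranges over Integer.MIN_VALUE..Integer.MAX_VALUE.
    List<DocRouter.Range> ranges =
        DocRouter.DEFAULT.partitionRange(4, Integer.MIN_VALUE, Integer.MAX_VALUE);
    for (DocRouter.Range range : ranges) {
      System.out.println(range);  // e.g. "80000000-bfffffff" for the first quarter
    }
  }
}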
@ -244,7 +230,7 @@ public class ClusterState implements JSONWriter.Writable {
       cnt++;
     }
 
-    throw new IllegalStateException("The HashPartitioner failed");
+    throw new IllegalStateException("The DocRouter failed");
   }
 
   @Override
@ -278,24 +264,59 @@ public class ClusterState implements JSONWriter.Writable {
    * @return the ClusterState
    */
   public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
-    // System.out.println("########## Loading ClusterState:" + new String(bytes));
+    // System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
     if (bytes == null || bytes.length == 0) {
-      return new ClusterState(version, liveNodes, Collections.<String, Map<String,Slice>>emptyMap());
+      return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
     }
-    LinkedHashMap<String, Object> stateMap = (LinkedHashMap<String, Object>) ZkStateReader.fromJSON(bytes);
-    HashMap<String,Map<String, Slice>> state = new HashMap<String,Map<String,Slice>>();
-
-    for(String collectionName: stateMap.keySet()){
-      Map<String, Object> collection = (Map<String, Object>)stateMap.get(collectionName);
-      Map<String, Slice> slices = new LinkedHashMap<String,Slice>();
-      for (Entry<String,Object> sliceEntry : collection.entrySet()) {
-        Slice slice = new Slice(sliceEntry.getKey(), null, (Map<String,Object>)sliceEntry.getValue());
-        slices.put(slice.getName(), slice);
-      }
-      state.put(collectionName, slices);
-    }
-    return new ClusterState(version, liveNodes, state);
+    Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
+    Map<String,DocCollection> collections = new LinkedHashMap<String,DocCollection>(stateMap.size());
+    for (Entry<String, Object> entry : stateMap.entrySet()) {
+      String collectionName = entry.getKey();
+      DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue());
+      collections.put(collectionName, coll);
+    }
+
+    // System.out.println("######## ClusterState.load result:" + collections);
+    return new ClusterState(version, liveNodes, collections);
   }
 
+  private static DocCollection collectionFromObjects(String name, Map<String,Object> objs) {
+    Map<String,Object> props = (Map<String,Object>)objs.get(DocCollection.PROPERTIES);
+    if (props == null) props = Collections.emptyMap();
+    DocRouter router = getRouter(props.get(DocCollection.DOC_ROUTER));
+    Map<String,Slice> slices = makeSlices(objs);
+    return new DocCollection(name, slices, props, router);
+  }
+
+  private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) {
+    if (genericSlices == null) return Collections.emptyMap();
+    Map<String,Slice> result = new LinkedHashMap<String, Slice>(genericSlices.size());
+    for (Map.Entry<String,Object> entry : genericSlices.entrySet()) {
+      String name = entry.getKey();
+      if (DocCollection.PROPERTIES.equals(name)) continue;  // skip special properties entry
+      Object val = entry.getValue();
+      Slice s;
+      if (val instanceof Slice) {
+        s = (Slice)val;
+      } else {
+        s = new Slice(name, null, (Map<String,Object>)val);
+      }
+      result.put(name, s);
+    }
+    return result;
+  }
+
+  private static DocRouter getRouter(Object routerSpec) {
+    if (routerSpec == null) return new PlainIdRouter();  // back compat with 4.0
+
+    if (DocRouter.DEFAULT_NAME.equals(routerSpec)) {
+      return DocRouter.DEFAULT;
+    }
+
+    // TODO: how to instantiate custom routers?
+
+    throw new SolrException(ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerSpec + "'");
+  }
 
   @Override
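For orientation, the clusterstate.json layout this new load() path consumes is roughly the following. This is an illustration reverse-engineered from collectionFromObjects/makeSlices above, not taken from the patch: the collection and shard names are invented and the replicas maps are left empty; only the "properties" and "router" keys are the ones DocCollection actually defines.

{"collection1": {
    "properties": {"router": "compositeId"},
    "shard1": {"range": "80000000-ffffffff", "replicas": {}},
    "shard2": {"range": "0-7fffffff", "replicas": {}}}}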
@ -0,0 +1,90 @@
package org.apache.solr.common.cloud;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.noggit.JSONUtil;
import org.apache.noggit.JSONWriter;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection")
 */
public class DocCollection extends ZkNodeProps {
  public static final String PROPERTIES = "properties";
  public static final String DOC_ROUTER = "router";

  private final String name;
  private final Map<String, Slice> slices;
  private final DocRouter router;

  public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
    super(props == null ? new HashMap<String,Object>(1) : props);
    this.name = name;
    this.slices = slices;
    this.router = router;
    assert name != null && slices != null;
  }

  /**
   * Return collection name.
   */
  public String getName() {
    return name;
  }

  public Slice getSlice(String sliceName) {
    return slices.get(sliceName);
  }

  /**
   * Gets the list of slices for this collection.
   */
  public Collection<Slice> getSlices() {
    return slices.values();
  }

  /**
   * Get the map of slices (sliceName->Slice) for this collection.
   */
  public Map<String, Slice> getSlicesMap() {
    return slices;
  }

  public DocRouter getRouter() {
    return router;
  }

  @Override
  public String toString() {
    return "DocCollection("+name+")=" + JSONUtil.toJSON(this);
  }

  @Override
  public void write(JSONWriter jsonWriter) {
    // write out the properties under "properties"
    LinkedHashMap<String,Object> all = new LinkedHashMap<String,Object>(slices.size()+1);
    all.put(PROPERTIES, propMap);
    all.putAll(slices);
    jsonWriter.write(all);
  }
}
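A small construction sketch (not part of the patch) showing how the pieces fit and what write() emits; the names, the empty slice, and the mutable maps are placeholders, since normally ClusterState.load() builds all of this from the clusterstate.json bytes:

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.noggit.JSONUtil;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;

class DocCollectionDemo {
  public static void main(String[] args) {
    // One empty slice; mutable maps because Slice records derived props into them.
    Map<String, Slice> slices = new LinkedHashMap<String, Slice>();
    slices.put("shard1", new Slice("shard1", new LinkedHashMap<String, Replica>(),
        new LinkedHashMap<String, Object>()));

    Map<String, Object> props = new LinkedHashMap<String, Object>();
    props.put(DocCollection.DOC_ROUTER, DocRouter.DEFAULT_NAME);  // "compositeId"

    DocCollection coll = new DocCollection("collection1", slices, props, DocRouter.DEFAULT);
    // write() nests the props under "properties" and puts the slices at the top level.
    System.out.println(JSONUtil.toJSON(coll));
  }
}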
@ -18,6 +18,10 @@ package org.apache.solr.common.cloud;
  */
 
 import org.apache.noggit.JSONWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.Hash;
 
 import java.util.ArrayList;
 import java.util.Collections;
@ -25,9 +29,11 @@ import java.util.List;
 
 /**
  * Class to partition int range into n ranges.
  *
  * @lucene.experimental
  */
-public class HashPartitioner {
+public class DocRouter {
+  public static final String DEFAULT_NAME = "compositeId";
+  public static final DocRouter DEFAULT = new CompositeIdRouter();
 
   // Hash ranges can't currently "wrap" - i.e. max must be greater or equal to min.
   // TODO: ranges may not be all contiguous in the future (either that or we will
@ -115,4 +121,95 @@ public class HashPartitioner {
     return ranges;
   }

  public int shardHash(String id, SolrInputDocument sdoc, SolrParams params) {
    return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
  }

  public String getId(SolrInputDocument sdoc, SolrParams params) {
    Object idObj = sdoc.getFieldValue("id");  // blech
    String id = idObj != null ? idObj.toString() : "null";  // should only happen on client side
    return id;
  }

  public Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
    if (id == null) id = getId(sdoc, params);
    int hash = shardHash(id, sdoc, params);
    return hashToSlice(hash, collection);
  }

  protected Slice hashToSlice(int hash, DocCollection collection) {
    for (Slice slice : collection.getSlices()) {
      DocRouter.Range range = slice.getRange();
      if (range != null && range.includes(hash)) return slice;
    }
    // return null or throw exception?
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No slice servicing hash code " + Integer.toHexString(hash) + " in " + collection);
  }

  /*
  List<Slice> shardQuery(String id, SolrParams params, ClusterState state)
  List<Slice> shardQuery(SolrParams params, ClusterState state)
  */
}


class PlainIdRouter extends DocRouter {

}

//
// user!uniqueid
// user,4!uniqueid
//
class CompositeIdRouter extends DocRouter {
  private int separator = '!';
  private int bits = 16;
  private int mask1 = 0xffff0000;
  private int mask2 = 0x0000ffff;

  protected void setBits(int bits) {
    this.bits = bits;
    mask1 = -1 << (32-bits);
    mask2 = -1 >>> bits;
  }

  protected int getBits(String firstPart, int commaIdx) {
    int v = 0;
    for (int idx = commaIdx + 1; idx < firstPart.length(); idx++) {
      char ch = firstPart.charAt(idx);
      if (ch < '0' || ch > '9') return -1;
      v = v * 10 + (ch - '0');  // accumulate decimal digits (fixes "v *= 10 + (ch - '0')", which multiplied by the wrong operand)
    }
    return v > 32 ? -1 : v;
  }

  @Override
  public int shardHash(String id, SolrInputDocument doc, SolrParams params) {
    int idx = id.indexOf(separator);
    if (idx < 0) {
      return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
    }

    int m1 = mask1;
    int m2 = mask2;

    String part1 = id.substring(0, idx);
    int commaIdx = part1.indexOf(',');
    if (commaIdx > 0) {
      int firstBits = getBits(part1, commaIdx);
      if (firstBits >= 0) {
        m1 = -1 << (32-firstBits);
        m2 = -1 >>> firstBits;
        part1 = part1.substring(0, commaIdx);  // actually, this isn't strictly necessary
      }
    }

    String part2 = id.substring(idx+1);

    int hash1 = Hash.murmurhash3_x86_32(part1, 0, part1.length(), 0);
    int hash2 = Hash.murmurhash3_x86_32(part2, 0, part2.length(), 0);
    return (hash1 & m1) | (hash2 & m2);
  }
}
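How the composite-id scheme behaves in practice: the prefix before '!' claims the top bits of the hash, so all of a user's documents land in the same portion of the hash ring. A sketch (not part of the patch; it has to sit in the same package to see the package-private router, and the ids are invented):

package org.apache.solr.common.cloud;

import org.apache.solr.common.util.Hash;

class CompositeIdExample {
  public static void main(String[] args) {
    DocRouter router = DocRouter.DEFAULT;  // a CompositeIdRouter

    // Plain id: the murmurhash3 of the whole id picks the slice.
    int h1 = router.shardHash("doc1", null, null);

    // "user!doc1": top 16 bits come from hashing "user", bottom 16 from "doc1",
    // so every id with the same "user" prefix maps into the same 1/65536th
    // of the hash ring (and therefore the same slice).
    int h2 = router.shardHash("user!doc1", null, null);
    int userHash = Hash.murmurhash3_x86_32("user", 0, "user".length(), 0);
    assert (h2 & 0xffff0000) == (userHash & 0xffff0000);

    // "user,8!doc1": the ",8" widens the prefix's claim to only the top 8 bits.
    int h3 = router.shardHash("user,8!doc1", null, null);
    System.out.printf("%08x %08x %08x%n", h1, h2, h3);
  }
}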
@ -41,4 +41,5 @@ public class Replica extends ZkNodeProps {
     return name + ':' + JSONUtil.toJSON(propMap, -1);  // small enough, keep it on one line (i.e. no indent)
   }
 
+  // TODO: should we have a pointer back to the slice the replica belongs to?
 }
@ -34,7 +34,7 @@ public class Slice extends ZkNodeProps {
   public static String LEADER = "leader";  // FUTURE: do we want to record the leader as a slice property in the JSON (as opposed to isLeader as a replica property?)
 
   private final String name;
-  private final HashPartitioner.Range range;
+  private final DocRouter.Range range;
   private final Integer replicationFactor;
   private final Map<String,Replica> replicas;
   private final Replica leader;
@ -49,11 +49,11 @@ public class Slice extends ZkNodeProps {
     this.name = name;
 
     Object rangeObj = propMap.get(RANGE);
-    HashPartitioner.Range tmpRange = null;
-    if (rangeObj instanceof HashPartitioner.Range) {
-      tmpRange = (HashPartitioner.Range)rangeObj;
+    DocRouter.Range tmpRange = null;
+    if (rangeObj instanceof DocRouter.Range) {
+      tmpRange = (DocRouter.Range)rangeObj;
     } else if (rangeObj != null) {
-      HashPartitioner hp = new HashPartitioner();
+      DocRouter hp = new DocRouter();
       tmpRange = hp.fromString(rangeObj.toString());
     }
     range = tmpRange;
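The rangeObj branch above parses a range back from its string form. A sketch (not part of the patch), assuming the hex "min-max" encoding that Range's toString/fromString pair uses on trunk:

import org.apache.solr.common.cloud.DocRouter;

class RangeDemo {
  public static void main(String[] args) {
    DocRouter router = new DocRouter();
    // "80000000-7fffffff" spans Integer.MIN_VALUE..Integer.MAX_VALUE, i.e. the whole ring.
    DocRouter.Range full = router.fromString("80000000-7fffffff");
    System.out.println(full.includes(42));  // true: every hash falls inside the full range
  }
}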
@ -121,6 +121,14 @@ public class Slice extends ZkNodeProps {
     return leader;
   }
 
+  public Replica getReplica(String replicaName) {
+    return replicas.get(replicaName);
+  }
+
+  public DocRouter.Range getRange() {
+    return range;
+  }
+
   @Override
   public String toString() {
     return name + ':' + JSONUtil.toJSON(propMap);
@ -39,6 +39,8 @@ public class ZkNodeProps implements JSONWriter.Writable {
    */
   public ZkNodeProps(Map<String,Object> propMap) {
     this.propMap = propMap;
+    // TODO: store an unmodifiable map, but in a way that guarantees not to wrap more than once.
+    //       Always wrapping introduces a memory leak.
   }
 
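The TODO exists because wrapping on every construction stacks views: when a props map travels from one ZkNodeProps to another, each hop would add another unmodifiable layer that retains the one beneath it. A pure-JDK sketch of the effect (not part of the patch):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class WrapLeakDemo {
  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<String, Object>();
    // Each re-wrap allocates a new view holding a reference to the previous one;
    // done once per construction, the chain (and the retained memory) only grows.
    for (int i = 0; i < 1000; i++) {
      props = Collections.unmodifiableMap(props);
    }
    System.out.println(props.getClass());  // java.util.Collections$UnmodifiableMap
  }
}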
@ -70,14 +72,14 @@ public class ZkNodeProps implements JSONWriter.Writable {
    * Get property keys.
    */
   public Set<String> keySet() {
-    return Collections.unmodifiableSet(propMap.keySet());
+    return propMap.keySet();
   }
 
   /**
    * Get all properties as map.
    */
   public Map<String, Object> getProperties() {
-    return Collections.unmodifiableMap(propMap);
+    return propMap;
   }
 
   /** Returns a shallow writable copy of the properties */
@ -450,7 +450,7 @@ public class ZkStateReader {
     if (clusterState == null) {
       return null;
     }
-    Map<String,Slice> slices = clusterState.getSlices(collection);
+    Map<String,Slice> slices = clusterState.getSlicesMap(collection);
     if (slices == null) {
       throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
           "Could not find collection in zk: " + collection + " "
@ -79,8 +79,16 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
     FileUtils.copyDirectory(new File(getSolrHome()), controlHome);
 
     System.setProperty("collection", "control_collection");
-    controlJetty = createJetty(controlHome, null, "control_shard");
+    String numShardsS = System.getProperty(ZkStateReader.NUM_SHARDS_PROP);
+    System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "1");
+    controlJetty = createJetty(controlHome, null);  // let the shardId default to shard1
     System.clearProperty("collection");
+    if (numShardsS != null) {
+      System.setProperty(ZkStateReader.NUM_SHARDS_PROP, numShardsS);
+    } else {
+      System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
+    }
+
     controlClient = createNewSolrServer(controlJetty.getLocalPort());
 
     StringBuilder sb = new StringBuilder();
@ -128,7 +136,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
     boolean sawLiveRecovering = false;
     zkStateReader.updateClusterState(true);
     ClusterState clusterState = zkStateReader.getClusterState();
-    Map<String,Slice> slices = clusterState.getSlices(collection);
+    Map<String,Slice> slices = clusterState.getSlicesMap(collection);
     assertNotNull("Could not find collection:" + collection, slices);
     for (Map.Entry<String,Slice> entry : slices.entrySet()) {
       Map<String,Replica> shards = entry.getValue().getReplicasMap();
@ -181,7 +189,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
 
     zkStateReader.updateClusterState(true);
     ClusterState clusterState = zkStateReader.getClusterState();
-    Map<String,Slice> slices = clusterState.getSlices(collection);
+    Map<String,Slice> slices = clusterState.getSlicesMap(collection);
     if (slices == null) {
       throw new IllegalArgumentException("Cannot find collection:" + collection);
     }
@ -226,18 +226,22 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
 
     System.setProperty("collection", "control_collection");
     String numShards = System.getProperty(ZkStateReader.NUM_SHARDS_PROP);
-    System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
+    // we want hashes by default, so set to 1 shard as opposed to leaving unset
+    // System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
+    System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "1");
 
     File controlJettyDir = new File(TEMP_DIR,
         getClass().getName() + "-controljetty-" + System.currentTimeMillis());
     org.apache.commons.io.FileUtils.copyDirectory(new File(getSolrHome()), controlJettyDir);
 
-    controlJetty = createJetty(controlJettyDir, testDir + "/control/data",
-        "control_shard");
+    controlJetty = createJetty(controlJettyDir, testDir + "/control/data");  // don't pass shard name... let it default to "shard1"
     System.clearProperty("collection");
+    if (numShards != null) {
+      System.setProperty(ZkStateReader.NUM_SHARDS_PROP, numShards);
+    } else {
+      System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
+    }
     controlClient = createNewSolrServer(controlJetty.getLocalPort());
 
     initCloud();
@ -339,7 +343,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
 
   protected int getNumShards(String collection) {
     ZkStateReader zkStateReader = cloudClient.getZkStateReader();
-    Map<String,Slice> slices = zkStateReader.getClusterState().getSlices(collection);
+    Map<String,Slice> slices = zkStateReader.getClusterState().getSlicesMap(collection);
     if (slices == null) {
       throw new IllegalArgumentException("Could not find collection:" + collection);
     }
@ -371,7 +375,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     shardToJetty.clear();
 
     ClusterState clusterState = zkStateReader.getClusterState();
-    Map<String,Slice> slices = clusterState.getSlices(DEFAULT_COLLECTION);
+    Map<String,Slice> slices = clusterState.getSlicesMap(DEFAULT_COLLECTION);
 
     if (slices == null) {
       throw new RuntimeException("No slices found for collection "
@ -943,7 +947,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     try {
       zk.createClusterStateWatchersAndUpdate();
       clusterState = zk.getClusterState();
-      slices = clusterState.getSlices(DEFAULT_COLLECTION);
+      slices = clusterState.getSlicesMap(DEFAULT_COLLECTION);
     } finally {
       zk.close();
     }
@ -263,7 +263,7 @@ public class ChaosMonkey {
   }
 
   private String getRandomSlice() {
-    Map<String,Slice> slices = zkStateReader.getClusterState().getSlices(collection);
+    Map<String,Slice> slices = zkStateReader.getClusterState().getSlicesMap(collection);
 
     List<String> sliceKeyList = new ArrayList<String>(slices.size());
     sliceKeyList.addAll(slices.keySet());
 
@ -292,7 +292,7 @@ public class ChaosMonkey {
     // get latest cloud state
     zkStateReader.updateClusterState(true);
 
-    Slice theShards = zkStateReader.getClusterState().getSlices(collection)
+    Slice theShards = zkStateReader.getClusterState().getSlicesMap(collection)
         .get(slice);
 
     ZkNodeProps props = theShards.getReplicasMap().get(cloudJetty.coreNodeName);