SOLR-5338: Split shards by a route key using split.key parameter

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1531845 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2013-10-14 10:48:02 +00:00
parent 9ad9a5f65a
commit 116439fa67
7 changed files with 223 additions and 11 deletions

View File

@ -95,6 +95,8 @@ New Features
* SOLR-5324: Make sub shard replica recovery and shard state switch asynchronous.
(Yago Riveiro, shalin)
* SOLR-5338: Split shards by a route key using split.key parameter. (shalin)
Bug Fixes
----------------------

View File

@ -31,6 +31,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -487,7 +489,33 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
log.info("Split shard invoked");
String collectionName = message.getStr("collection");
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
Slice parentSlice = clusterState.getSlice(collectionName, slice);
String splitKey = message.getStr("split.key");
DocCollection collection = clusterState.getCollection(collectionName);
DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
Slice parentSlice = null;
if (slice == null) {
if (router instanceof CompositeIdRouter) {
Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
if (searchSlices.isEmpty()) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
}
if (searchSlices.size() > 1) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
}
parentSlice = searchSlices.iterator().next();
slice = parentSlice.getName();
log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
} else {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Split by route key can only be used with CompositeIdRouter or subclass. Found router: " + router.getClass().getName());
}
} else {
parentSlice = clusterState.getSlice(collectionName, slice);
}
if (parentSlice == null) {
if(clusterState.getCollections().contains(collectionName)) {
@ -499,8 +527,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
// find the leader for the shard
Replica parentShardLeader = clusterState.getLeader(collectionName, slice);
DocCollection collection = clusterState.getCollection(collectionName);
DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
DocRouter.Range range = parentSlice.getRange();
if (range == null) {
range = new PlainIdRouter().fullRange();
@ -527,6 +553,23 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
}
}
}
} else if (splitKey != null) {
if (router instanceof CompositeIdRouter) {
CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
if (subRanges.size() == 1) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"The split.key: " + splitKey + " has a hash range that is exactly equal to hash range of shard: " + slice);
}
log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
rangesStr = "";
for (int i = 0; i < subRanges.size(); i++) {
DocRouter.Range subRange = subRanges.get(i);
rangesStr += subRange.toString();
if (i < subRanges.size() - 1)
rangesStr += ',';
}
}
} else {
// todo: fixed to two partitions?
subRanges = router.partitionRange(2, range);

View File

@ -369,13 +369,31 @@ public class CollectionsHandler extends RequestHandlerBase {
log.info("Splitting shard : " + req.getParamString());
String name = req.getParams().required().get("collection");
// TODO : add support for multiple shards
String shard = req.getParams().required().get("shard");
String shard = req.getParams().get("shard");
String rangesStr = req.getParams().get(CoreAdminParams.RANGES);
String splitKey = req.getParams().get("split.key");
if (splitKey == null && shard == null) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "Missing required parameter: shard");
}
if (splitKey != null && shard != null) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
"Only one of 'shard' or 'split.key' should be specified");
}
if (splitKey != null && rangesStr != null) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
"Only one of 'ranges' or 'split.key' should be specified");
}
Map<String,Object> props = new HashMap<String,Object>();
props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.SPLITSHARD);
props.put("collection", name);
props.put(ZkStateReader.SHARD_ID_PROP, shard);
if (shard != null) {
props.put(ZkStateReader.SHARD_ID_PROP, shard);
}
if (splitKey != null) {
props.put("split.key", splitKey);
}
if (rangesStr != null) {
props.put(CoreAdminParams.RANGES, rangesStr);
}

View File

@ -135,7 +135,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
killerThread.start();
killCounter.incrementAndGet();
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null);
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null, null);
log.info("Layout after split: \n");
printLayout();

View File

@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;
import org.apache.solr.common.cloud.Replica;
@ -110,6 +111,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
splitByUniqueKeyTest();
splitByRouteFieldTest();
splitByRouteKeyTest();
// todo can't call waitForThingsToLevelOut because it looks for jettys of all shards
// and the new sub-shards don't have any.
@ -176,7 +178,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
try {
for (int i = 0; i < 3; i++) {
try {
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges);
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
log.info("Layout after split: \n");
printLayout();
break;
@ -262,7 +264,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
for (int i = 0; i < 3; i++) {
try {
splitShard(collectionName, SHARD1, null);
splitShard(collectionName, SHARD1, null, null);
break;
} catch (HttpSolrServer.RemoteSolrException e) {
if (e.code() != 500) {
@ -281,6 +283,96 @@ public class ShardSplitTest extends BasicDistributedZkTest {
assertEquals(docCounts[1], collectionClient.query(new SolrQuery("*:*").setParam("shards", "shard1_1")).getResults().getNumFound());
}
private void splitByRouteKeyTest() throws Exception {
log.info("Starting splitByRouteKeyTest");
String collectionName = "splitByRouteKeyTest";
int numShards = 4;
int replicationFactor = 2;
int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrServer()
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
HashMap<String, List<Integer>> collectionInfos = new HashMap<String, List<Integer>>();
CloudSolrServer client = null;
try {
client = createCloudClient(null);
Map<String, Object> props = ZkNodeProps.makeMap(
REPLICATION_FACTOR, replicationFactor,
MAX_SHARDS_PER_NODE, maxShardsPerNode,
NUM_SLICES, numShards);
createCollection(collectionInfos, collectionName,props,client);
} finally {
if (client != null) client.shutdown();
}
List<Integer> list = collectionInfos.get(collectionName);
checkForCollection(collectionName, list, null);
waitForRecoveriesToFinish(false);
String url = CustomCollectionTest.getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), collectionName);
HttpSolrServer collectionClient = new HttpSolrServer(url);
String splitKey = "b!";
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
final DocRouter router = clusterState.getCollection(collectionName).getRouter();
Slice shard1 = clusterState.getSlice(collectionName, SHARD1);
DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange();
final List<DocRouter.Range> ranges = ((CompositeIdRouter) router).partitionRangeByKey(splitKey, shard1Range);
final int[] docCounts = new int[ranges.size()];
int uniqIdentifier = (1<<12);
int splitKeyDocCount = 0;
for (int i = 100; i <= 200; i++) {
String shardKey = "" + (char)('a' + (i % 26)); // See comment in ShardRoutingTest for hash distribution
String idStr = shardKey + "!" + i;
collectionClient.add(getDoc(id, idStr, "n_ti", (shardKey + "!").equals(splitKey) ? uniqIdentifier : i));
int idx = getHashRangeIdx(router, ranges, idStr);
if (idx != -1) {
docCounts[idx]++;
}
if (splitKey.equals(shardKey + "!"))
splitKeyDocCount++;
}
for (int i = 0; i < docCounts.length; i++) {
int docCount = docCounts[i];
log.info("Shard {} docCount = {}", "shard1_" + i, docCount);
}
log.info("Route key doc count = {}", splitKeyDocCount);
collectionClient.commit();
for (int i = 0; i < 3; i++) {
try {
splitShard(collectionName, null, null, splitKey);
break;
} catch (HttpSolrServer.RemoteSolrException e) {
if (e.code() != 500) {
throw e;
}
log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e);
if (i == 2) {
fail("SPLITSHARD was not successful even after three tries");
}
}
}
waitForRecoveriesToFinish(collectionName, false);
SolrQuery solrQuery = new SolrQuery("*:*");
assertEquals("DocCount on shard1_0 does not match", docCounts[0], collectionClient.query(solrQuery.setParam("shards", "shard1_0")).getResults().getNumFound());
assertEquals("DocCount on shard1_1 does not match", docCounts[1], collectionClient.query(solrQuery.setParam("shards", "shard1_1")).getResults().getNumFound());
assertEquals("DocCount on shard1_2 does not match", docCounts[2], collectionClient.query(solrQuery.setParam("shards", "shard1_2")).getResults().getNumFound());
solrQuery = new SolrQuery("n_ti:" + uniqIdentifier);
assertEquals("shard1_0 must have 0 docs for route key: " + splitKey, 0, collectionClient.query(solrQuery.setParam("shards", "shard1_0")).getResults().getNumFound());
assertEquals("Wrong number of docs on shard1_1 for route key: " + splitKey, splitKeyDocCount, collectionClient.query(solrQuery.setParam("shards", "shard1_1")).getResults().getNumFound());
assertEquals("shard1_2 must have 0 docs for route key: " + splitKey, 0, collectionClient.query(solrQuery.setParam("shards", "shard1_2")).getResults().getNumFound());
}
protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws Exception {
ClusterState clusterState = null;
Slice slice1_0 = null, slice1_1 = null;
@ -351,11 +443,13 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
}
protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges) throws SolrServerException, IOException {
protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges, String splitKey) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString());
params.set("collection", collection);
params.set("shard", shardId);
if (shardId != null) {
params.set("shard", shardId);
}
if (subRanges != null) {
StringBuilder ranges = new StringBuilder();
for (int i = 0; i < subRanges.size(); i++) {
@ -366,6 +460,9 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
params.set("ranges", ranges.toString());
}
if (splitKey != null) {
params.set("split.key", splitKey);
}
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");

View File

@ -273,6 +273,12 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
bytes = id2.getBytes("UTF-8");
int maxHash = Hash.murmurhash3_x86_32(bytes, 0, bytes.length, 0);
if (minHash > maxHash) {
int temp = maxHash;
maxHash = minHash;
minHash = temp;
}
PlainIdRouter router = new PlainIdRouter();
DocRouter.Range fullRange = new DocRouter.Range(minHash, maxHash);
return router.partitionRange(2, fullRange);

View File

@ -93,6 +93,31 @@ public class CompositeIdRouter extends HashBasedRouter {
return (hash1 & m1) | (hash2 & m2);
}
public Range keyHashRange(String routeKey) {
int idx = routeKey.indexOf(separator);
if (idx < 0) {
throw new IllegalArgumentException("Route key must be a composite id");
}
String part1 = routeKey.substring(0, idx);
int commaIdx = part1.indexOf(bitsSeparator);
int m1 = mask1;
int m2 = mask2;
if (commaIdx > 0) {
int firstBits = getBits(part1, commaIdx);
if (firstBits >= 0) {
m1 = firstBits==0 ? 0 : (-1 << (32-firstBits));
m2 = firstBits==32 ? 0 : (-1 >>> firstBits);
part1 = part1.substring(0, commaIdx);
}
}
int hash = Hash.murmurhash3_x86_32(part1, 0, part1.length(), 0);
int min = hash & m1;
int max = min | m2;
return new Range(min, max);
}
@Override
public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
if (shardKey == null) {
@ -149,6 +174,27 @@ public class CompositeIdRouter extends HashBasedRouter {
return targetSlices;
}
public List<Range> partitionRangeByKey(String key, Range range) {
List<Range> result = new ArrayList<Range>(3);
Range keyRange = keyHashRange(key);
if (!keyRange.overlaps(range)) {
throw new IllegalArgumentException("Key range does not overlap given range");
}
if (keyRange.equals(range)) {
return Collections.singletonList(keyRange);
} else if (keyRange.isSubsetOf(range)) {
result.add(new Range(range.min, keyRange.min - 1));
result.add(keyRange);
result.add((new Range(keyRange.max + 1, range.max)));
} else if (range.includes(keyRange.max)) {
result.add(new Range(range.min, keyRange.max));
result.add(new Range(keyRange.max + 1, range.max));
} else {
result.add(new Range(range.min, keyRange.min - 1));
result.add(new Range(keyRange.min, range.max));
}
return result;
}
@Override
public List<Range> partitionRange(int partitions, Range range) {