From 116439fa676dc505bf731e0e72de2e3e56e62ee3 Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Mon, 14 Oct 2013 10:48:02 +0000 Subject: [PATCH] SOLR-5338: Split shards by a route key using split.key parameter git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1531845 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 2 + .../cloud/OverseerCollectionProcessor.java | 51 ++++++++- .../handler/admin/CollectionsHandler.java | 22 +++- .../solr/cloud/ChaosMonkeyShardSplitTest.java | 2 +- .../org/apache/solr/cloud/ShardSplitTest.java | 105 +++++++++++++++++- .../solr/update/SolrIndexSplitterTest.java | 6 + .../solr/common/cloud/CompositeIdRouter.java | 46 ++++++++ 7 files changed, 223 insertions(+), 11 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index cb0995ff7ef..92f0b9ad518 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -95,6 +95,8 @@ New Features * SOLR-5324: Make sub shard replica recovery and shard state switch asynchronous. (Yago Riveiro, shalin) +* SOLR-5338: Split shards by a route key using split.key parameter. (shalin) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index ffffd02188e..f6ae9128692 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -31,6 +31,7 @@ import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClosableThread; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CompositeIdRouter; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; @@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -487,8 +489,34 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { log.info("Split shard invoked"); String collectionName = message.getStr("collection"); String slice = message.getStr(ZkStateReader.SHARD_ID_PROP); - Slice parentSlice = clusterState.getSlice(collectionName, slice); - + String splitKey = message.getStr("split.key"); + + DocCollection collection = clusterState.getCollection(collectionName); + DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT; + + Slice parentSlice = null; + + if (slice == null) { + if (router instanceof CompositeIdRouter) { + Collection searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection); + if (searchSlices.isEmpty()) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey); + } + if (searchSlices.size() > 1) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported"); + } + parentSlice = searchSlices.iterator().next(); + slice = parentSlice.getName(); + log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice); + } else { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Split by route key can only be used with CompositeIdRouter or subclass. Found router: " + router.getClass().getName()); + } + } else { + parentSlice = clusterState.getSlice(collectionName, slice); + } + if (parentSlice == null) { if(clusterState.getCollections().contains(collectionName)) { throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice); @@ -499,8 +527,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { // find the leader for the shard Replica parentShardLeader = clusterState.getLeader(collectionName, slice); - DocCollection collection = clusterState.getCollection(collectionName); - DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT; DocRouter.Range range = parentSlice.getRange(); if (range == null) { range = new PlainIdRouter().fullRange(); @@ -527,6 +553,23 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { } } } + } else if (splitKey != null) { + if (router instanceof CompositeIdRouter) { + CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router; + subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range); + if (subRanges.size() == 1) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "The split.key: " + splitKey + " has a hash range that is exactly equal to hash range of shard: " + slice); + } + log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges); + rangesStr = ""; + for (int i = 0; i < subRanges.size(); i++) { + DocRouter.Range subRange = subRanges.get(i); + rangesStr += subRange.toString(); + if (i < subRanges.size() - 1) + rangesStr += ','; + } + } } else { // todo: fixed to two partitions? subRanges = router.partitionRange(2, range); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 0214259eacb..c2b5de5f0fb 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -369,13 +369,31 @@ public class CollectionsHandler extends RequestHandlerBase { log.info("Splitting shard : " + req.getParamString()); String name = req.getParams().required().get("collection"); // TODO : add support for multiple shards - String shard = req.getParams().required().get("shard"); + String shard = req.getParams().get("shard"); String rangesStr = req.getParams().get(CoreAdminParams.RANGES); + String splitKey = req.getParams().get("split.key"); + + if (splitKey == null && shard == null) { + throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "Missing required parameter: shard"); + } + if (splitKey != null && shard != null) { + throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, + "Only one of 'shard' or 'split.key' should be specified"); + } + if (splitKey != null && rangesStr != null) { + throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, + "Only one of 'ranges' or 'split.key' should be specified"); + } Map props = new HashMap(); props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.SPLITSHARD); props.put("collection", name); - props.put(ZkStateReader.SHARD_ID_PROP, shard); + if (shard != null) { + props.put(ZkStateReader.SHARD_ID_PROP, shard); + } + if (splitKey != null) { + props.put("split.key", splitKey); + } if (rangesStr != null) { props.put(CoreAdminParams.RANGES, rangesStr); } diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java index 51198088523..c2224098e46 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java @@ -135,7 +135,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest { killerThread.start(); killCounter.incrementAndGet(); - splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null); + splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null, null); log.info("Layout after split: \n"); printLayout(); diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java index ef0f7ba8d88..a65158d8c4d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java @@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CompositeIdRouter; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.HashBasedRouter; import org.apache.solr.common.cloud.Replica; @@ -110,6 +111,7 @@ public class ShardSplitTest extends BasicDistributedZkTest { splitByUniqueKeyTest(); splitByRouteFieldTest(); + splitByRouteKeyTest(); // todo can't call waitForThingsToLevelOut because it looks for jettys of all shards // and the new sub-shards don't have any. @@ -176,7 +178,7 @@ public class ShardSplitTest extends BasicDistributedZkTest { try { for (int i = 0; i < 3; i++) { try { - splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges); + splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null); log.info("Layout after split: \n"); printLayout(); break; @@ -262,7 +264,7 @@ public class ShardSplitTest extends BasicDistributedZkTest { for (int i = 0; i < 3; i++) { try { - splitShard(collectionName, SHARD1, null); + splitShard(collectionName, SHARD1, null, null); break; } catch (HttpSolrServer.RemoteSolrException e) { if (e.code() != 500) { @@ -281,6 +283,96 @@ public class ShardSplitTest extends BasicDistributedZkTest { assertEquals(docCounts[1], collectionClient.query(new SolrQuery("*:*").setParam("shards", "shard1_1")).getResults().getNumFound()); } + private void splitByRouteKeyTest() throws Exception { + log.info("Starting splitByRouteKeyTest"); + String collectionName = "splitByRouteKeyTest"; + int numShards = 4; + int replicationFactor = 2; + int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrServer() + .getZkStateReader().getClusterState().getLiveNodes().size())) + 1; + + HashMap> collectionInfos = new HashMap>(); + CloudSolrServer client = null; + try { + client = createCloudClient(null); + Map props = ZkNodeProps.makeMap( + REPLICATION_FACTOR, replicationFactor, + MAX_SHARDS_PER_NODE, maxShardsPerNode, + NUM_SLICES, numShards); + + createCollection(collectionInfos, collectionName,props,client); + } finally { + if (client != null) client.shutdown(); + } + + List list = collectionInfos.get(collectionName); + checkForCollection(collectionName, list, null); + + waitForRecoveriesToFinish(false); + + String url = CustomCollectionTest.getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), collectionName); + + HttpSolrServer collectionClient = new HttpSolrServer(url); + + String splitKey = "b!"; + + ClusterState clusterState = cloudClient.getZkStateReader().getClusterState(); + final DocRouter router = clusterState.getCollection(collectionName).getRouter(); + Slice shard1 = clusterState.getSlice(collectionName, SHARD1); + DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange(); + final List ranges = ((CompositeIdRouter) router).partitionRangeByKey(splitKey, shard1Range); + final int[] docCounts = new int[ranges.size()]; + + int uniqIdentifier = (1<<12); + int splitKeyDocCount = 0; + for (int i = 100; i <= 200; i++) { + String shardKey = "" + (char)('a' + (i % 26)); // See comment in ShardRoutingTest for hash distribution + + String idStr = shardKey + "!" + i; + collectionClient.add(getDoc(id, idStr, "n_ti", (shardKey + "!").equals(splitKey) ? uniqIdentifier : i)); + int idx = getHashRangeIdx(router, ranges, idStr); + if (idx != -1) { + docCounts[idx]++; + } + if (splitKey.equals(shardKey + "!")) + splitKeyDocCount++; + } + + for (int i = 0; i < docCounts.length; i++) { + int docCount = docCounts[i]; + log.info("Shard {} docCount = {}", "shard1_" + i, docCount); + } + log.info("Route key doc count = {}", splitKeyDocCount); + + collectionClient.commit(); + + for (int i = 0; i < 3; i++) { + try { + splitShard(collectionName, null, null, splitKey); + break; + } catch (HttpSolrServer.RemoteSolrException e) { + if (e.code() != 500) { + throw e; + } + log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e); + if (i == 2) { + fail("SPLITSHARD was not successful even after three tries"); + } + } + } + + waitForRecoveriesToFinish(collectionName, false); + SolrQuery solrQuery = new SolrQuery("*:*"); + assertEquals("DocCount on shard1_0 does not match", docCounts[0], collectionClient.query(solrQuery.setParam("shards", "shard1_0")).getResults().getNumFound()); + assertEquals("DocCount on shard1_1 does not match", docCounts[1], collectionClient.query(solrQuery.setParam("shards", "shard1_1")).getResults().getNumFound()); + assertEquals("DocCount on shard1_2 does not match", docCounts[2], collectionClient.query(solrQuery.setParam("shards", "shard1_2")).getResults().getNumFound()); + + solrQuery = new SolrQuery("n_ti:" + uniqIdentifier); + assertEquals("shard1_0 must have 0 docs for route key: " + splitKey, 0, collectionClient.query(solrQuery.setParam("shards", "shard1_0")).getResults().getNumFound()); + assertEquals("Wrong number of docs on shard1_1 for route key: " + splitKey, splitKeyDocCount, collectionClient.query(solrQuery.setParam("shards", "shard1_1")).getResults().getNumFound()); + assertEquals("shard1_2 must have 0 docs for route key: " + splitKey, 0, collectionClient.query(solrQuery.setParam("shards", "shard1_2")).getResults().getNumFound()); + } + protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws Exception { ClusterState clusterState = null; Slice slice1_0 = null, slice1_1 = null; @@ -351,11 +443,13 @@ public class ShardSplitTest extends BasicDistributedZkTest { } } - protected void splitShard(String collection, String shardId, List subRanges) throws SolrServerException, IOException { + protected void splitShard(String collection, String shardId, List subRanges, String splitKey) throws SolrServerException, IOException { ModifiableSolrParams params = new ModifiableSolrParams(); params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString()); params.set("collection", collection); - params.set("shard", shardId); + if (shardId != null) { + params.set("shard", shardId); + } if (subRanges != null) { StringBuilder ranges = new StringBuilder(); for (int i = 0; i < subRanges.size(); i++) { @@ -366,6 +460,9 @@ public class ShardSplitTest extends BasicDistributedZkTest { } params.set("ranges", ranges.toString()); } + if (splitKey != null) { + params.set("split.key", splitKey); + } SolrRequest request = new QueryRequest(params); request.setPath("/admin/collections"); diff --git a/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java b/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java index e379485a0c2..b1b26dc0f2c 100644 --- a/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java +++ b/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java @@ -273,6 +273,12 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 { bytes = id2.getBytes("UTF-8"); int maxHash = Hash.murmurhash3_x86_32(bytes, 0, bytes.length, 0); + if (minHash > maxHash) { + int temp = maxHash; + maxHash = minHash; + minHash = temp; + } + PlainIdRouter router = new PlainIdRouter(); DocRouter.Range fullRange = new DocRouter.Range(minHash, maxHash); return router.partitionRange(2, fullRange); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java index 7e73cd818a3..3f3b9af9edf 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java @@ -93,6 +93,31 @@ public class CompositeIdRouter extends HashBasedRouter { return (hash1 & m1) | (hash2 & m2); } + public Range keyHashRange(String routeKey) { + int idx = routeKey.indexOf(separator); + if (idx < 0) { + throw new IllegalArgumentException("Route key must be a composite id"); + } + String part1 = routeKey.substring(0, idx); + int commaIdx = part1.indexOf(bitsSeparator); + int m1 = mask1; + int m2 = mask2; + + if (commaIdx > 0) { + int firstBits = getBits(part1, commaIdx); + if (firstBits >= 0) { + m1 = firstBits==0 ? 0 : (-1 << (32-firstBits)); + m2 = firstBits==32 ? 0 : (-1 >>> firstBits); + part1 = part1.substring(0, commaIdx); + } + } + + int hash = Hash.murmurhash3_x86_32(part1, 0, part1.length(), 0); + int min = hash & m1; + int max = min | m2; + return new Range(min, max); + } + @Override public Collection getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) { if (shardKey == null) { @@ -149,6 +174,27 @@ public class CompositeIdRouter extends HashBasedRouter { return targetSlices; } + public List partitionRangeByKey(String key, Range range) { + List result = new ArrayList(3); + Range keyRange = keyHashRange(key); + if (!keyRange.overlaps(range)) { + throw new IllegalArgumentException("Key range does not overlap given range"); + } + if (keyRange.equals(range)) { + return Collections.singletonList(keyRange); + } else if (keyRange.isSubsetOf(range)) { + result.add(new Range(range.min, keyRange.min - 1)); + result.add(keyRange); + result.add((new Range(keyRange.max + 1, range.max))); + } else if (range.includes(keyRange.max)) { + result.add(new Range(range.min, keyRange.max)); + result.add(new Range(keyRange.max + 1, range.max)); + } else { + result.add(new Range(range.min, keyRange.min - 1)); + result.add(new Range(keyRange.min, range.max)); + } + return result; + } @Override public List partitionRange(int partitions, Range range) {