diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ad029d5501a..4df2096f442 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -95,8 +95,8 @@ New Features
   of two fields. (Gus Heck)
 
 * SOLR-13399: SPLITSHARD implements a new splitByPrefix option that takes into account the actual document distribution
-  when using compositeIds. The id prefix should be indexed into the "id_prefix" field for this feature to work.
-  (yonik)
+  when using compositeIds. Document distribution is calculated using the "id_prefix" field (if it exists) containing
+  just the compositeId prefixes, or directly from the indexed "id" field otherwise. (yonik)
 
 * SOLR-13565: Node level runtime libs loaded from remote urls (noble)
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 4d623be366f..6c5921edbe0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -212,16 +212,14 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, true)) {
       t = timings.sub("getRanges");
 
-      log.info("Requesting split ranges from replica " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
-          + collectionName + " on " + parentShardLeader);
-
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
       params.set(CoreAdminParams.GET_RANGES, "true");
       params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
       params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
-      int numSubShards = message.getInt(NUM_SUB_SHARDS, DEFAULT_NUM_SUB_SHARDS);
-      params.set(NUM_SUB_SHARDS, Integer.toString(numSubShards));
+      // Only 2 is currently supported
+      // int numSubShards = message.getInt(NUM_SUB_SHARDS, DEFAULT_NUM_SUB_SHARDS);
+      // params.set(NUM_SUB_SHARDS, Integer.toString(numSubShards));
 
       {
         final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
@@ -236,7 +234,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         NamedList shardRsp = (NamedList)successes.getVal(0);
         String splits = (String)shardRsp.get(CoreAdminParams.RANGES);
         if (splits != null) {
-          log.info("Resulting split range to be used is " + splits);
+          log.info("Resulting split ranges to be used: " + splits + " slice=" + slice + " leader=" + parentShardLeader);
           // change the message to use the recommended split ranges
           message = message.plus(CoreAdminParams.RANGES, splits);
         }
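Note on the SplitShardCmd hunk above: with NUM_SUB_SHARDS commented out, the getRanges request always recommends a two-way split. For reference, the whole path is driven from the Collections API. A minimal SolrJ sketch of triggering it, with hypothetical names (collection "myCollection", shard "shard1", an existing CloudSolrClient named client), inside a method that declares throws Exception:

    import org.apache.solr.client.solrj.SolrRequest;
    import org.apache.solr.client.solrj.request.GenericSolrRequest;
    import org.apache.solr.common.params.ModifiableSolrParams;

    ModifiableSolrParams p = new ModifiableSolrParams();
    p.set("action", "SPLITSHARD");
    p.set("collection", "myCollection"); // hypothetical collection name
    p.set("shard", "shard1");            // hypothetical shard name
    p.set("splitByPrefix", "true");      // CommonAdminParams.SPLIT_BY_PREFIX, checked by SplitShardCmd above
    // GET_RANGES is an internal core-admin call made by the overseer itself; clients only set splitByPrefix.
    client.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/collections", p));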
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
index a37708fd800..d280b116303 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
@@ -187,11 +187,11 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
   }
 
-
-
-  // This is called when splitByPrefix is used.
-  // The overseer called us to get recommended splits taking into
-  // account actual document distribution over the hash space.
+  /**
+   * This is called when splitByPrefix is used.
+   * The overseer calls this to get recommended splits, taking into
+   * account the actual document distribution over the hash space.
+   */
   private void handleGetRanges(CoreAdminHandler.CallInfo it, String coreName) throws Exception {
 
     SolrCore parentCore = it.handler.coreContainer.getCore(coreName);
@@ -205,7 +205,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       if (!it.handler.coreContainer.isZooKeeperAware()) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Shard splitByPrefix requires SolrCloud mode.");
       } else {
-        String routeFieldName = "id";
+        SolrIndexSearcher searcher = searcherHolder.get();
+
+        String routeFieldName = null;
         String prefixField = "id_prefix";
 
         ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
@@ -221,8 +223,19 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
           Map routerProps = (Map) routerObj;
           routeFieldName = (String) routerProps.get("field");
         }
+        if (routeFieldName == null) {
+          routeFieldName = searcher.getSchema().getUniqueKeyField().getName();
+        }
+
+        Collection<RangeCount> counts = getHashHistogram(searcher, prefixField, router, collection);
+
+        if (counts.size() == 0) {
+          // How should we determine whether to look at the "id" field to figure out the prefix buckets?
+          // There may legitimately be no indexed terms in id_prefix if no ids have a prefix yet.
+          // For now, fall back to the "id" field, and avoid using splitByPrefix unless you are actually using prefixes.
+          counts = getHashHistogramFromId(searcher, searcher.getSchema().getUniqueKeyField().getName(), router, collection);
+        }
 
-        Collection<RangeCount> counts = getHashHistogram(searcherHolder.get(), prefixField, router, collection);
         Collection<DocRouter.Range> splits = getSplits(counts, currentRange);
         String splitString = toSplitString(splits);
@@ -290,7 +303,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
   }
 
 
-  // Returns a list of range counts sorted by the range lower bound
+  /*
+   * Returns a list of range counts sorted by the range lower bound
+   */
   static Collection<RangeCount> getHashHistogram(SolrIndexSearcher searcher, String prefixField, DocRouter router, DocCollection collection) throws IOException {
     RTimer timer = new RTimer();
     TreeMap<DocRouter.Range, RangeCount> counts = new TreeMap<>();
@@ -306,9 +321,8 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     long sumBuckets = 0;
 
     TermsEnum termsEnum = terms.iterator();
-    for (;;) {
-      BytesRef term = termsEnum.next();
-      if (term == null) break;
+    BytesRef term;
+    while ((term = termsEnum.next()) != null) {
       numPrefixes++;
 
       String termStr = term.utf8ToString();
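The loop rewrite just above is the stock Lucene idiom for walking a field's terms. In isolation it looks like the following sketch, assuming an open IndexReader named reader and the "id_prefix" field this patch reads, inside a method that declares throws IOException:

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.MultiTerms;
    import org.apache.lucene.index.Terms;
    import org.apache.lucene.index.TermsEnum;
    import org.apache.lucene.util.BytesRef;

    Terms terms = MultiTerms.getTerms(reader, "id_prefix"); // null when the field has no indexed terms
    if (terms != null) {
      TermsEnum termsEnum = terms.iterator();
      BytesRef term;
      while ((term = termsEnum.next()) != null) {
        int docs = termsEnum.docFreq(); // number of docs containing this prefix term
        // getHashHistogram maps each (prefix term, docFreq) pair to a hash range and accumulates a RangeCount
      }
    }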
@@ -340,8 +354,102 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     return counts.values();
   }
 
+  /**
+   * Returns a list of range counts sorted by the range lower bound, using the indexed "id" field
+   * (i.e. the terms are full IDs, not just prefixes).
+   */
+  static Collection<RangeCount> getHashHistogramFromId(SolrIndexSearcher searcher, String idField, DocRouter router, DocCollection collection) throws IOException {
+    RTimer timer = new RTimer();
 
-  // returns the list of recommended splits, or null if there is not enough information
+    TreeMap<DocRouter.Range, RangeCount> counts = new TreeMap<>();
+
+    Terms terms = MultiTerms.getTerms(searcher.getIndexReader(), idField);
+    if (terms == null) {
+      return counts.values();
+    }
+
+    int numPrefixes = 0;
+    int numCollisions = 0;
+    long sumBuckets = 0;
+
+    byte sep = (byte) CompositeIdRouter.SEPARATOR.charAt(0);
+    TermsEnum termsEnum = terms.iterator();
+    BytesRef currPrefix = new BytesRef(); // prefix of the previous "id" term
+    int bucketCount = 0; // count of the number of docs in the current bucket
+
+    // We're going to iterate over all terms, so do the minimum amount of work per term.
+    // Terms are sorted, so all terms sharing a prefix will be grouped together; the extra work
+    // is really just limited to stepping over all the terms in the id field.
+    for (;;) {
+      BytesRef term = termsEnum.next();
+
+      // compare to the current prefix bucket and see if this new term shares the same prefix
+      if (term != null && term.length >= currPrefix.length && currPrefix.length > 0) {
+        int i = 0;
+        for (; i < currPrefix.length; i++) {
+          if (currPrefix.bytes[i] != term.bytes[term.offset + i]) {
+            break;
+          }
+        }
+
+        if (i == currPrefix.length) {
+          // prefix was the same (common-case fast path)
+          // int count = termsEnum.docFreq();
+          bucketCount++; // use 1 since we are dealing with unique ids
+          continue;
+        }
+      }
+
+      // At this point the prefix did not match, so if we had a bucket we were working on, record it.
+      if (currPrefix.length > 0) {
+        numPrefixes++;
+        sumBuckets += bucketCount;
+        String currPrefixStr = currPrefix.utf8ToString();
+        DocRouter.Range range = router.getSearchRangeSingle(currPrefixStr, null, collection);
+
+        RangeCount rangeCount = new RangeCount(range, bucketCount);
+        bucketCount = 0;
+
+        RangeCount prev = counts.put(rangeCount.range, rangeCount);
+        if (prev != null) {
+          // we hit a hash collision, so add the buckets together
+          rangeCount.count += prev.count;
+          numCollisions++;
+        }
+      }
+
+      // if the current term is null, we ran out of values
+      if (term == null) break;
+
+      // find the new prefix (if any)
+
+      // resize if needed
+      if (currPrefix.length < term.length) {
+        currPrefix.bytes = new byte[term.length + 10];
+      }
+
+      // Copy the bytes up to and including the separator, and set the length if the separator is found.
+      // If there was no separator, then length remains 0, indicating that we have no prefix bucket.
+      currPrefix.length = 0;
+      for (int i = 0; i < term.length; i++) {
+        byte b = term.bytes[i + term.offset];
+        currPrefix.bytes[i] = b;
+        if (b == sep) {
+          currPrefix.length = i + 1;
+          bucketCount++;
+          break;
+        }
+      }
+    }
+
+    log.info("Split histogram from idField " + idField + ": ms=" + timer.getTime()
+        + " numBuckets=" + counts.size() + " sumBuckets=" + sumBuckets
+        + " numPrefixes=" + numPrefixes + " numCollisions=" + numCollisions);
+
+    return counts.values();
+  }
+
+  /**
+   * Returns the list of recommended splits, or null if there is not enough information.
+   */
   static Collection<DocRouter.Range> getSplits(Collection<RangeCount> rawCounts, DocRouter.Range currentRange) throws Exception {
     int totalCount = 0;
     RangeCount biggest = null; // keep track of the largest in case we need to split it out into it's own shard
@@ -371,6 +479,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     if (counts.size() == 1) {
       // We have a single range, so we should split it.
+      // Currently, we only split a prefix/bucket when we have just one, but this could be changed/controlled
+      // in the future via an allowedSizeDifference parameter (i.e. if just separating prefix buckets results in
+      // too large an imbalance, allow splitting within a prefix)
 
       // It may already be a partial range, so figure that out
       int lower = Math.max(last.range.min, currentRange.min);
@@ -392,6 +503,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     // We have at least two ranges, so we want to partition the ranges
     // and avoid splitting any individual range.
+    // The "middle" bucket we are going to find will be included with the lower range and excluded from the upper range.
     int targetCount = totalCount / 2;
     RangeCount middle = null;
@@ -413,10 +525,10 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       middle = prev;
     }
 
-    // if the middle range turned out to be the last one, pick the one before it instead
-    if (middle == last) {
-      middle = prev;
-    }
+    // The middle should never be the last, since that would mean we won't actually do a split.
+    // Minimising the error (above) should already ensure this never happens.
+    assert middle != last;
 
+    // Make sure to include the shard's current range in the new ranges so we don't create useless empty shards.
     DocRouter.Range lowerRange = new DocRouter.Range(currentRange.min, middle.range.max);
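A toy rendering of the partitioning rule getSplits applies once there are at least two buckets (plain Java, not Solr code, with made-up counts): walk the sorted buckets, keep the cut that minimises the imbalance between the two halves, and never cut after the last bucket, which is exactly what the new assert middle != last above guards:

    // hypothetical per-prefix doc counts, sorted by range lower bound
    int[] bucketCounts = {5, 40, 10, 30};
    int total = 85;

    int running = 0, bestIdx = -1, bestErr = Integer.MAX_VALUE;
    for (int i = 0; i < bucketCounts.length - 1; i++) { // the last bucket can never be the "middle"
      running += bucketCounts[i];
      int err = Math.abs(running - (total - running)); // imbalance if we cut after bucket i
      if (err < bestErr) { bestErr = err; bestIdx = i; }
    }
    // bestIdx == 1: cutting after the 40-count bucket gives 45 vs 40 docs, and the "middle"
    // bucket is counted with the lower half, matching the comment added in getSplits.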
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/SplitByPrefixTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/SplitByPrefixTest.java
index f3ef230174f..ca2aefc5b46 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/SplitByPrefixTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/SplitByPrefixTest.java
@@ -53,8 +53,12 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
   public static void setupCluster() throws Exception {
     System.setProperty("managed.schema.mutable", "true"); // needed by cloud-managed config set
 
+    // cloud-managed has the copyField from "id" to "id_prefix";
+    // cloud-minimal does not, so with it the histogram should be driven from the "id" field directly
+    String configSetName = random().nextBoolean() ? "cloud-minimal" : "cloud-managed";
+
     configureCluster(1)
-        .addConfig("conf", configset("cloud-managed"))  // cloud-managed has the id copyfield to id_prefix
+        .addConfig("conf", configset(configSetName))
         .configure();
   }
@@ -71,9 +75,9 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
     cluster.deleteAllCollections();
   }
 
-  static class Prefix implements Comparable<Prefix> {
-    String key;
-    DocRouter.Range range;
+  public static class Prefix implements Comparable<Prefix> {
+    public String key;
+    public DocRouter.Range range;
 
     @Override
     public int compareTo(Prefix o) {
@@ -87,7 +91,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
   }
 
   // find prefixes (shard keys) matching certain criteria
-  public List<Prefix> findPrefixes(int numToFind, int lowerBound, int upperBound) {
+  public static List<Prefix> findPrefixes(int numToFind, int lowerBound, int upperBound) {
     CompositeIdRouter router = new CompositeIdRouter();
 
     ArrayList<Prefix> prefixes = new ArrayList<>();
@@ -112,7 +116,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
   }
 
   // remove duplicate prefixes from the sorted prefix list
-  List<Prefix> removeDups(List<Prefix> prefixes) {
+  public static List<Prefix> removeDups(List<Prefix> prefixes) {
     ArrayList<Prefix> result = new ArrayList<>();
     Prefix last = null;
     for (Prefix prefix : prefixes) {
@@ -198,7 +202,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
     //
-    // now lets add enough documents to the first prefix to get it split out on it's own
+    // now let's add enough documents to the first prefix to get it split out on its own
     //
     for (int i=0; i<...
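Both test classes, the one above and the one below, lean on the fact that a compositeId route key maps to a contiguous hash range determined only by its prefix. A standalone sketch (passing a null DocCollection is fine here, just as the tests themselves do):

    import org.apache.solr.common.cloud.CompositeIdRouter;
    import org.apache.solr.common.cloud.DocRouter;

    CompositeIdRouter router = new CompositeIdRouter();
    // A key ending in the "!" separator covers a slice of the hash space
    // (by default the top 16 bits of the hash come from the prefix):
    DocRouter.Range prefixRange = router.getSearchRangeSingle("IBM!", null, null);
    // Every id sharing the prefix ("IBM!doc1", "IBM!doc2", ...) hashes into prefixRange,
    // which is why SplitOp can build its histogram from prefix terms alone.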
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/SplitHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/SplitHandlerTest.java
--- a/solr/core/src/test/org/apache/solr/handler/admin/SplitHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/SplitHandlerTest.java
@@ -215,4 +226,63 @@ public class SplitHandlerTest extends SolrTestCaseJ4 {
     verifyContiguous(results, curr);
   }
 
+  @Test
+  public void testHistogramBuilding() throws Exception {
+    List<SplitByPrefixTest.Prefix> prefixes = SplitByPrefixTest.findPrefixes(20, 0, 0x00ffffff);
+    List<SplitByPrefixTest.Prefix> uniquePrefixes = SplitByPrefixTest.removeDups(prefixes);
+    assertTrue(prefixes.size() > uniquePrefixes.size()); // make sure we have some duplicates to test hash collisions
+
+    String prefixField = "id_prefix_s";
+    String idField = "id";
+    DocRouter router = new CompositeIdRouter();
+
+    for (int i=0; i<100; i++) {
+      SolrQueryRequest req = req("myquery");
+      try {
+        // the first time through the loop we do this before adding docs to test an empty index
+        Collection<SplitOp.RangeCount> counts1 = SplitOp.getHashHistogram(req.getSearcher(), prefixField, router, null);
+        Collection<SplitOp.RangeCount> counts2 = SplitOp.getHashHistogramFromId(req.getSearcher(), idField, router, null);
+        assertTrue(eqCount(counts1, counts2));
+
+        if (i>0) {
+          assertTrue(counts1.size() > 0); // make sure we are testing something
+        }
+
+        // index a few random documents
+        int ndocs = random().nextInt(10) + 1;
+        for (int j=0; j<ndocs; j++) {
+          String prefix = prefixes.get(random().nextInt(prefixes.size())).key;
+          String id = prefix + "doc" + i + "_" + j;
+          updateJ(jsonAdd(sdoc("id", id, prefixField, prefix)), null);
+        }
+
+        assertU(commit());
+      } finally {
+        req.close();
+      }
+    }
+  }
+
+  private boolean eqCount(Collection<SplitOp.RangeCount> a, Collection<SplitOp.RangeCount> b) {
+    Iterator<SplitOp.RangeCount> it1 = a.iterator();
+    Iterator<SplitOp.RangeCount> it2 = b.iterator();
+    while (it1.hasNext()) {
+      SplitOp.RangeCount r1 = it1.next();
+      SplitOp.RangeCount r2 = it2.next();
+      if (!r1.range.equals(r2.range) || r1.count != r2.count) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
index 974b9eb74dd..d700464c690 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
@@ -79,15 +79,14 @@ public class CompositeIdRouter extends HashBasedRouter {
       // search across whole range
       return fullRange();
     }
-    String id = shardKey;
     if (shardKey.indexOf(SEPARATOR) < 0) {
       // shardKey is a simple id, so don't do a range
-      int hash = Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
+      int hash = Hash.murmurhash3_x86_32(shardKey, 0, shardKey.length(), 0);
       return new Range(hash, hash);
     }
-
-    return new KeyParser(id).getRange();
+    return new KeyParser(shardKey).getRange();
   }
 
   @Override
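One last reference point for the CompositeIdRouter cleanup that closes the patch: a route key without the separator hashes to a single point rather than a range, per the murmurhash3_x86_32 branch above. A sketch (run with -ea for the assert):

    import org.apache.solr.common.cloud.CompositeIdRouter;
    import org.apache.solr.common.cloud.DocRouter;

    CompositeIdRouter router = new CompositeIdRouter();
    DocRouter.Range r = router.getSearchRangeSingle("plainDocId", null, null); // no "!" separator
    assert r.min == r.max; // the whole "range" collapses to the single murmur3 hash of the key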