SOLR-13399: ability to use id field for compositeId histogram

yonik 2019-08-06 14:09:54 -04:00
parent 742e6b7eff
commit 19ddcfd282
6 changed files with 217 additions and 34 deletions

View File

@@ -95,8 +95,8 @@ New Features
   of two fields. (Gus Heck)
 * SOLR-13399: SPLITSHARD implements a new splitByPrefix option that takes into account the actual document distribution
-  when using compositeIds. The id prefix should be indexed into the "id_prefix" field for this feature to work.
-  (yonik)
+  when using compositeIds. Document distribution is calculated using the "id_prefix" field (if it exists) containing
+  just the compositeId prefixes, or directly from the indexed "id" field otherwise. (yonik)
 * SOLR-13565: Node level runtime libs loaded from remote urls (noble)

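For illustration, the option is requested through the collection API. A minimal SolrJ sketch, assuming the SplitShard request exposes a setSplitByPrefix setter for the splitByPrefix parameter named above (collection and shard names are hypothetical):

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

// Sketch only: "myCollection"/"shard1" are placeholders, and setSplitByPrefix is
// assumed to be the SolrJ counterpart of the splitByPrefix collection parameter.
void splitWithPrefixAwareness(SolrClient client) throws Exception {
  CollectionAdminRequest.SplitShard split =
      CollectionAdminRequest.splitShard("myCollection")
          .setShardName("shard1")
          .setSplitByPrefix(true);  // let the shard leader recommend ranges from prefix distribution
  split.process(client);
}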
View File

@@ -212,16 +212,14 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
   if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, true)) {
     t = timings.sub("getRanges");
+    log.info("Requesting split ranges from replica " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
+        + collectionName + " on " + parentShardLeader);
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
     params.set(CoreAdminParams.GET_RANGES, "true");
     params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
     params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
-    int numSubShards = message.getInt(NUM_SUB_SHARDS, DEFAULT_NUM_SUB_SHARDS);
-    params.set(NUM_SUB_SHARDS, Integer.toString(numSubShards));
+    // Only 2 is currently supported
+    // int numSubShards = message.getInt(NUM_SUB_SHARDS, DEFAULT_NUM_SUB_SHARDS);
+    // params.set(NUM_SUB_SHARDS, Integer.toString(numSubShards));

     {
       final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
@@ -236,7 +234,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     NamedList shardRsp = (NamedList) successes.getVal(0);
     String splits = (String) shardRsp.get(CoreAdminParams.RANGES);
     if (splits != null) {
-      log.info("Resulting split range to be used is " + splits);
+      log.info("Resulting split ranges to be used: " + splits + " slice=" + slice + " leader=" + parentShardLeader);
       // change the message to use the recommended split ranges
       message = message.plus(CoreAdminParams.RANGES, splits);
     }

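The leader's recommendation arrives as the RANGES value spliced into the overseer message above: each DocRouter.Range serializes as unsigned hex "min-max", and toSplitString joins them with commas. A minimal parsing sketch (the range values are illustrative, not taken from the commit):

// Illustrative input; real values come from shardRsp.get(CoreAdminParams.RANGES).
String splits = "80000000-bfffffff,c0000000-ffffffff";
for (String s : splits.split(",")) {
  int dash = s.indexOf('-');
  int min = (int) Long.parseLong(s.substring(0, dash), 16);  // unsigned hex -> signed int
  int max = (int) Long.parseLong(s.substring(dash + 1), 16);
  System.out.println("range min=" + min + " max=" + max);
}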
View File

@@ -187,11 +187,11 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
   }

-  // This is called when splitByPrefix is used.
-  // The overseer called us to get recommended splits taking into
-  // account actual document distribution over the hash space.
+  /**
+   * This is called when splitByPrefix is used.
+   * The overseer called us to get recommended splits taking into
+   * account actual document distribution over the hash space.
+   */
   private void handleGetRanges(CoreAdminHandler.CallInfo it, String coreName) throws Exception {
     SolrCore parentCore = it.handler.coreContainer.getCore(coreName);
@@ -205,7 +205,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     if (!it.handler.coreContainer.isZooKeeperAware()) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Shard splitByPrefix requires SolrCloud mode.");
     } else {
-      String routeFieldName = "id";
+      SolrIndexSearcher searcher = searcherHolder.get();
+      String routeFieldName = null;
       String prefixField = "id_prefix";
       ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
@@ -221,8 +223,19 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
         Map routerProps = (Map) routerObj;
         routeFieldName = (String) routerProps.get("field");
       }

+      if (routeFieldName == null) {
+        routeFieldName = searcher.getSchema().getUniqueKeyField().getName();
+      }
+
-      Collection<RangeCount> counts = getHashHistogram(searcherHolder.get(), prefixField, router, collection);
+      Collection<RangeCount> counts = getHashHistogram(searcher, prefixField, router, collection);
+      if (counts.size() == 0) {
+        // How to determine if we should look at the id field to figure out the prefix buckets?
+        // There may legitimately be no indexed terms in id_prefix if no ids have a prefix yet.
+        // For now, avoid using splitByPrefix unless you are actually using prefixes.
+        counts = getHashHistogramFromId(searcher, searcher.getSchema().getUniqueKeyField().getName(), router, collection);
+      }

       Collection<DocRouter.Range> splits = getSplits(counts, currentRange);
       String splitString = toSplitString(splits);
@@ -290,7 +303,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
   }

-  // Returns a list of range counts sorted by the range lower bound
+  /*
+   * Returns a list of range counts sorted by the range lower bound
+   */
   static Collection<RangeCount> getHashHistogram(SolrIndexSearcher searcher, String prefixField, DocRouter router, DocCollection collection) throws IOException {
     RTimer timer = new RTimer();
     TreeMap<DocRouter.Range,RangeCount> counts = new TreeMap<>();
@@ -306,9 +321,8 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     long sumBuckets = 0;

     TermsEnum termsEnum = terms.iterator();
-    for (;;) {
-      BytesRef term = termsEnum.next();
-      if (term == null) break;
+    BytesRef term;
+    while ((term = termsEnum.next()) != null) {
       numPrefixes++;
       String termStr = term.utf8ToString();
@@ -340,8 +354,102 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     return counts.values();
   }

+  /**
+   * Returns a list of range counts sorted by the range lower bound, using the indexed "id" field (i.e. the terms are full IDs, not just prefixes)
+   */
+  static Collection<RangeCount> getHashHistogramFromId(SolrIndexSearcher searcher, String idField, DocRouter router, DocCollection collection) throws IOException {
+    RTimer timer = new RTimer();
+
+    TreeMap<DocRouter.Range, RangeCount> counts = new TreeMap<>();
+
+    Terms terms = MultiTerms.getTerms(searcher.getIndexReader(), idField);
+    if (terms == null) {
+      return counts.values();
+    }
+
+    int numPrefixes = 0;
+    int numCollisions = 0;
+    long sumBuckets = 0;
+
+    byte sep = (byte) CompositeIdRouter.SEPARATOR.charAt(0);
+    TermsEnum termsEnum = terms.iterator();
+    BytesRef currPrefix = new BytesRef();  // prefix of the previous "id" term
+    int bucketCount = 0;  // count of the number of docs in the current bucket
+
+    // We're going to iterate over all terms, so do the minimum amount of work per term.
+    // Terms are sorted, so all terms sharing a prefix will be grouped together. The extra work
+    // is really just limited to stepping over all the terms in the id field.
+    for (;;) {
+      BytesRef term = termsEnum.next();
+
+      // compare to current prefix bucket and see if this new term shares the same prefix
+      if (term != null && term.length >= currPrefix.length && currPrefix.length > 0) {
+        int i = 0;
+        for (; i < currPrefix.length; i++) {
+          if (currPrefix.bytes[i] != term.bytes[term.offset + i]) {
+            break;
+          }
+        }
+        if (i == currPrefix.length) {
+          // prefix was the same (common-case fast path)
+          // int count = termsEnum.docFreq();
+          bucketCount++;  // use 1 since we are dealing with unique ids
+          continue;
+        }
+      }
+
+      // At this point the prefix did not match, so if we had a bucket we were working on, record it.
+      if (currPrefix.length > 0) {
+        numPrefixes++;
+        sumBuckets += bucketCount;
+        String currPrefixStr = currPrefix.utf8ToString();
+        DocRouter.Range range = router.getSearchRangeSingle(currPrefixStr, null, collection);
+        RangeCount rangeCount = new RangeCount(range, bucketCount);
+        bucketCount = 0;
+        RangeCount prev = counts.put(rangeCount.range, rangeCount);
+        if (prev != null) {
+          // we hit a hash collision, so add the buckets together.
+          rangeCount.count += prev.count;
+          numCollisions++;
+        }
+      }
+
+      // if the current term is null, we ran out of values
+      if (term == null) break;
+
+      // find the new prefix (if any)
+
+      // resize if needed
+      if (currPrefix.length < term.length) {
+        currPrefix.bytes = new byte[term.length + 10];
+      }
+
+      // Copy the bytes up to and including the separator, and set the length if the separator is found.
+      // If there was no separator, then length remains 0 and it's the indicator that we have no prefix bucket.
+      currPrefix.length = 0;
+      for (int i = 0; i < term.length; i++) {
+        byte b = term.bytes[i + term.offset];
+        currPrefix.bytes[i] = b;
+        if (b == sep) {
+          currPrefix.length = i + 1;
+          bucketCount++;
+          break;
+        }
+      }
+    }
+    log.info("Split histogram from idField {}: ms={}, numBuckets={} sumBuckets={} numPrefixes={} numCollisions={}", idField, timer.getTime(), counts.size(), sumBuckets, numPrefixes, numCollisions);
+    return counts.values();
+  }
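The loop above leans on one invariant: the terms enumeration is sorted, so all ids sharing a prefix arrive consecutively and each bucket can be accumulated and flushed in a single pass. A self-contained sketch of the same grouping over plain strings (data and names are illustrative; the real code additionally maps each prefix to a hash range):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: one-pass prefix bucketing of sorted ids, mirroring the termsEnum loop.
// Ids without a '!' separator contribute to no bucket, matching the method above.
List<String> sortedIds = List.of("a!1", "a!2", "b!1", "plain", "z!9");
Map<String, Integer> buckets = new LinkedHashMap<>();
String currPrefix = null;
int bucketCount = 0;
for (String id : sortedIds) {
  int sep = id.indexOf('!');
  String prefix = (sep < 0) ? null : id.substring(0, sep + 1);
  if (prefix != null && prefix.equals(currPrefix)) {
    bucketCount++;  // common-case fast path: same prefix as the previous id
    continue;
  }
  if (currPrefix != null) {
    buckets.merge(currPrefix, bucketCount, Integer::sum);  // flush the finished bucket (merge stands in for collision handling)
  }
  currPrefix = prefix;
  bucketCount = (prefix == null) ? 0 : 1;
}
if (currPrefix != null) {
  buckets.merge(currPrefix, bucketCount, Integer::sum);  // flush the final bucket
}
System.out.println(buckets);  // {a!=2, b!=1, z!=1}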
-  // returns the list of recommended splits, or null if there is not enough information
+  /*
+   * Returns the list of recommended splits, or null if there is not enough information
+   */
   static Collection<DocRouter.Range> getSplits(Collection<RangeCount> rawCounts, DocRouter.Range currentRange) throws Exception {
     int totalCount = 0;
     RangeCount biggest = null;  // keep track of the largest in case we need to split it out into its own shard
@@ -371,6 +479,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     if (counts.size() == 1) {
       // We have a single range, so we should split it.
+      // Currently, we only split a prefix/bucket when we have just one, but this could be changed/controlled
+      // in the future via an allowedSizeDifference parameter (i.e. if just separating prefix buckets results in
+      // too large of an imbalance, allow splitting within a prefix)

       // It may already be a partial range, so figure that out
       int lower = Math.max(last.range.min, currentRange.min);
@@ -392,6 +503,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     // We have at least two ranges, so we want to partition the ranges
     // and avoid splitting any individual range.
+    // The "middle" bucket we are going to find will be included with the lower range and excluded from the upper range.

     int targetCount = totalCount / 2;
     RangeCount middle = null;
@@ -413,10 +525,10 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       middle = prev;
     }

-    // if the middle range turned out to be the last one, pick the one before it instead
-    if (middle == last) {
-      middle = prev;
-    }
+    // The middle should never be the last, since that means that we won't actually do a split.
+    // Minimising the error (above) should already ensure this never happens.
+    assert middle != last;

     // Make sure to include the shard's current range in the new ranges so we don't create useless empty shards.
     DocRouter.Range lowerRange = new DocRouter.Range(currentRange.min, middle.range.max);

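The partitioning hunks above pick the "middle" bucket so the cumulative count on each side lands as close to half as possible, without ever letting the last bucket be the split point. A self-contained sketch of that balancing idea (bucket counts are illustrative; the real getSplits works on RangeCount objects):

// Sketch: choose the split index over range-sorted bucket counts that best
// halves the total; buckets [0..bestIdx] would form the lower sub-shard.
int[] counts = {10, 40, 5, 25, 20};  // per-bucket doc counts, sorted by range lower bound
int total = 0;
for (int c : counts) total += c;
int target = total / 2;

int running = 0;
int bestIdx = -1;
int bestErr = Integer.MAX_VALUE;
for (int i = 0; i < counts.length - 1; i++) {  // exclude the last bucket, as the assert above requires
  running += counts[i];
  int err = Math.abs(running - target);
  if (err < bestErr) {
    bestErr = err;
    bestIdx = i;
  }
}
System.out.println("split after bucket " + bestIdx);  // 10+40=50 vs target 50 -> bucket 1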
View File

@@ -53,8 +53,12 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
   public static void setupCluster() throws Exception {
     System.setProperty("managed.schema.mutable", "true");  // needed by cloud-managed config set

+    // cloud-managed has the copyField from id to id_prefix
+    // cloud-minimal does not, and thus the histogram should be driven from the "id" field directly
+    String configSetName = random().nextBoolean() ? "cloud-minimal" : "cloud-managed";
+
     configureCluster(1)
-        .addConfig("conf", configset("cloud-managed"))  // cloud-managed has the id copyfield to id_prefix
+        .addConfig("conf", configset(configSetName))  // cloud-managed has the id copyfield to id_prefix
         .configure();
   }
@@ -71,9 +75,9 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
     cluster.deleteAllCollections();
   }

-  static class Prefix implements Comparable<Prefix> {
-    String key;
-    DocRouter.Range range;
+  public static class Prefix implements Comparable<Prefix> {
+    public String key;
+    public DocRouter.Range range;

     @Override
     public int compareTo(Prefix o) {
@@ -87,7 +91,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
   }

   // find prefixes (shard keys) matching certain criteria
-  public List<Prefix> findPrefixes(int numToFind, int lowerBound, int upperBound) {
+  public static List<Prefix> findPrefixes(int numToFind, int lowerBound, int upperBound) {
     CompositeIdRouter router = new CompositeIdRouter();
     ArrayList<Prefix> prefixes = new ArrayList<>();
@@ -112,7 +116,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
   }

   // remove duplicate prefixes from the sorted prefix list
-  List<Prefix> removeDups(List<Prefix> prefixes) {
+  public static List<Prefix> removeDups(List<Prefix> prefixes) {
     ArrayList<Prefix> result = new ArrayList<>();
     Prefix last = null;
     for (Prefix prefix : prefixes) {
@@ -198,7 +202,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
     //
-    // now lets add enough documents to the first prefix to get it split out on it's own
+    // now let's add enough documents to the first prefix to get it split out on its own
     //
     for (int i = 0; i < uniquePrefixes.size(); i++) {
       client.add(getDoc(uniquePrefixes.get(0).key, "doc" + (i + 100)));

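findPrefixes above probes the router for shard keys whose hash ranges fall inside given bounds. A minimal sketch of that search, using the same getSearchRangeSingle call the histogram code relies on (prefix strings and bounds are illustrative):

import java.util.ArrayList;
import java.util.List;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocRouter;

// Sketch: keep generated prefixes whose full hash range fits in [lowerBound, upperBound].
CompositeIdRouter router = new CompositeIdRouter();
int lowerBound = 0, upperBound = 0x00ffffff;
List<String> matches = new ArrayList<>();
for (int i = 0; i < 1000 && matches.size() < 20; i++) {
  String prefix = "key" + i + "!";
  DocRouter.Range range = router.getSearchRangeSingle(prefix, null, null);  // null params/collection, as in the tests
  if (range.min >= lowerBound && range.max <= upperBound) {
    matches.add(prefix);
  }
}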
View File

@@ -18,11 +18,17 @@ package org.apache.solr.handler.admin;

 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;

 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.api.collections.SplitByPrefixTest;
+import org.apache.solr.cloud.api.collections.SplitByPrefixTest.Prefix;
+import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.request.SolrQueryRequest;
+import org.junit.BeforeClass;
 import org.junit.Test;

 // test low level splitByPrefix range recommendations.
@@ -30,6 +36,11 @@ import org.junit.Test;
 // See SplitByPrefixTest for cloud level tests of SPLITSHARD that use this by passing getRanges with the SPLIT command
 public class SplitHandlerTest extends SolrTestCaseJ4 {

+  @BeforeClass
+  public static void beforeTests() throws Exception {
+    System.setProperty("managed.schema.mutable", "true");  // needed by cloud-managed config set
+    initCore("solrconfig.xml", "schema_latest.xml");
+  }

   void verifyContiguous(Collection<DocRouter.Range> results, DocRouter.Range currentRange) {
     if (results == null) return;
@@ -215,4 +226,63 @@ public class SplitHandlerTest extends SolrTestCaseJ4 {
     verifyContiguous(results, curr);
   }

+  @Test
+  public void testHistogramBuilding() throws Exception {
+    List<Prefix> prefixes = SplitByPrefixTest.findPrefixes(20, 0, 0x00ffffff);
+    List<Prefix> uniquePrefixes = SplitByPrefixTest.removeDups(prefixes);
+    assertTrue(prefixes.size() > uniquePrefixes.size());  // make sure we have some duplicates to test hash collisions
+
+    String prefixField = "id_prefix_s";
+    String idField = "id";
+    DocRouter router = new CompositeIdRouter();
+
+    for (int i = 0; i < 100; i++) {
+      SolrQueryRequest req = req("myquery");
+      try {
+        // the first time through the loop we do this before adding docs to test an empty index
+        Collection<SplitOp.RangeCount> counts1 = SplitOp.getHashHistogram(req.getSearcher(), prefixField, router, null);
+        Collection<SplitOp.RangeCount> counts2 = SplitOp.getHashHistogramFromId(req.getSearcher(), idField, router, null);
+        assertTrue(eqCount(counts1, counts2));
+        if (i > 0) {
+          assertTrue(counts1.size() > 0);  // make sure we are testing something
+        }
+
+        // index a few random documents
+        int ndocs = random().nextInt(10) + 1;
+        for (int j = 0; j < ndocs; j++) {
+          String prefix = prefixes.get(random().nextInt(prefixes.size())).key;
+          if (random().nextBoolean()) {
+            prefix = prefix + Integer.toString(random().nextInt(3)) + "!";
+          }
+          String id = prefix + "doc" + i + "_" + j;
+          updateJ(jsonAdd(sdoc(idField, id, prefixField, prefix)), null);
+        }
+
+        assertU(commit());
+      } finally {
+        req.close();
+      }
+    }
+  }
+  private boolean eqCount(Collection<SplitOp.RangeCount> a, Collection<SplitOp.RangeCount> b) {
+    if (a.size() != b.size()) {
+      return false;  // guard: the pairwise comparison below assumes equal lengths
+    }
+    Iterator<SplitOp.RangeCount> it1 = a.iterator();
+    Iterator<SplitOp.RangeCount> it2 = b.iterator();
+    while (it1.hasNext()) {
+      SplitOp.RangeCount r1 = it1.next();
+      SplitOp.RangeCount r2 = it2.next();
+      if (!r1.range.equals(r2.range) || r1.count != r2.count) {
+        return false;
+      }
+    }
+    return true;
+  }
 }

View File

@@ -79,15 +79,14 @@ public class CompositeIdRouter extends HashBasedRouter {
       // search across whole range
       return fullRange();
     }
-    String id = shardKey;

     if (shardKey.indexOf(SEPARATOR) < 0) {
       // shardKey is a simple id, so don't do a range
-      int hash = Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
+      int hash = Hash.murmurhash3_x86_32(shardKey, 0, shardKey.length(), 0);
       return new Range(hash, hash);
     }

-    return new KeyParser(id).getRange();
+    return new KeyParser(shardKey).getRange();
   }

   @Override
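For reference, the range a composite prefix maps to comes from hashing only the prefix: under the default 16/16 bit allocation for two-part keys, the top 16 bits of the range are fixed by the prefix hash and the bottom 16 bits span fully. A minimal sketch using the same Hash utility the method above calls (the id value is illustrative, and the bit split assumes the default compositeId layout):

import org.apache.solr.common.util.Hash;

// Sketch: every doc whose id starts with "tenantA!" hashes into [min, max].
String id = "tenantA!doc42";
int sepIdx = id.indexOf('!');
int prefixHash = Hash.murmurhash3_x86_32(id, 0, sepIdx, 0);  // hash only the prefix
int min = prefixHash & 0xffff0000;  // low 16 bits zeroed
int max = prefixHash | 0x0000ffff;  // low 16 bits maxed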