mirror of https://github.com/apache/lucene.git
SOLR-13399: ability to use id field for compositeId histogram
This commit is contained in:
parent
8233981e7f
commit
d8f99a9986
|
@ -45,8 +45,8 @@ New Features
|
|||
of two fields. (Gus Heck)
|
||||
|
||||
* SOLR-13399: SPLITSHARD implements a new splitByPrefix option that takes into account the actual document distribution
|
||||
when using compositeIds. The id prefix should be indexed into the "id_prefix" field for this feature to work.
|
||||
(yonik)
|
||||
when using compositeIds. Document distribution is calculated using the "id_prefix" field (if it exists) containing
|
||||
just the compositeId prefixes, or directly from the indexed "id" field otherwise. (yonik)
|
||||
|
||||
* SOLR-13565: Node level runtime libs loaded from remote urls (noble)
|
||||
|
||||
|
|
|
@ -212,16 +212,14 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, true)) {
|
||||
t = timings.sub("getRanges");
|
||||
|
||||
log.info("Requesting split ranges from replica " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
|
||||
+ collectionName + " on " + parentShardLeader);
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
|
||||
params.set(CoreAdminParams.GET_RANGES, "true");
|
||||
params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
|
||||
params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
|
||||
int numSubShards = message.getInt(NUM_SUB_SHARDS, DEFAULT_NUM_SUB_SHARDS);
|
||||
params.set(NUM_SUB_SHARDS, Integer.toString(numSubShards));
|
||||
// Only 2 is currently supported
|
||||
// int numSubShards = message.getInt(NUM_SUB_SHARDS, DEFAULT_NUM_SUB_SHARDS);
|
||||
// params.set(NUM_SUB_SHARDS, Integer.toString(numSubShards));
|
||||
|
||||
{
|
||||
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
|
||||
|
@ -236,7 +234,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
NamedList shardRsp = (NamedList)successes.getVal(0);
|
||||
String splits = (String)shardRsp.get(CoreAdminParams.RANGES);
|
||||
if (splits != null) {
|
||||
log.info("Resulting split range to be used is " + splits);
|
||||
log.info("Resulting split ranges to be used: " + splits + " slice=" + slice + " leader=" + parentShardLeader);
|
||||
// change the message to use the recommended split ranges
|
||||
message = message.plus(CoreAdminParams.RANGES, splits);
|
||||
}
|
||||
|
|
|
@ -187,11 +187,11 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// This is called when splitByPrefix is used.
|
||||
// The overseer called us to get recommended splits taking into
|
||||
// account actual document distribution over the hash space.
|
||||
/**
|
||||
* This is called when splitByPrefix is used.
|
||||
* The overseer called us to get recommended splits taking into
|
||||
* account actual document distribution over the hash space.
|
||||
*/
|
||||
private void handleGetRanges(CoreAdminHandler.CallInfo it, String coreName) throws Exception {
|
||||
|
||||
SolrCore parentCore = it.handler.coreContainer.getCore(coreName);
|
||||
|
@ -205,7 +205,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
if (!it.handler.coreContainer.isZooKeeperAware()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Shard splitByPrefix requires SolrCloud mode.");
|
||||
} else {
|
||||
String routeFieldName = "id";
|
||||
SolrIndexSearcher searcher = searcherHolder.get();
|
||||
|
||||
String routeFieldName = null;
|
||||
String prefixField = "id_prefix";
|
||||
|
||||
ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
|
||||
|
@ -221,8 +223,19 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
Map routerProps = (Map) routerObj;
|
||||
routeFieldName = (String) routerProps.get("field");
|
||||
}
|
||||
if (routeFieldName == null) {
|
||||
routeFieldName = searcher.getSchema().getUniqueKeyField().getName();
|
||||
}
|
||||
|
||||
Collection<RangeCount> counts = getHashHistogram(searcher, prefixField, router, collection);
|
||||
|
||||
if (counts.size() == 0) {
|
||||
// How to determine if we should look at the id field to figure out the prefix buckets?
|
||||
// There may legitimately be no indexed terms in id_prefix if no ids have a prefix yet.
|
||||
// For now, avoid using splitByPrefix unless you are actually using prefixes.
|
||||
counts = getHashHistogramFromId(searcher, searcher.getSchema().getUniqueKeyField().getName(), router, collection);
|
||||
}
|
||||
|
||||
Collection<RangeCount> counts = getHashHistogram(searcherHolder.get(), prefixField, router, collection);
|
||||
Collection<DocRouter.Range> splits = getSplits(counts, currentRange);
|
||||
String splitString = toSplitString(splits);
|
||||
|
||||
|
@ -290,7 +303,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
}
|
||||
|
||||
|
||||
// Returns a list of range counts sorted by the range lower bound
|
||||
/*
|
||||
* Returns a list of range counts sorted by the range lower bound
|
||||
*/
|
||||
static Collection<RangeCount> getHashHistogram(SolrIndexSearcher searcher, String prefixField, DocRouter router, DocCollection collection) throws IOException {
|
||||
RTimer timer = new RTimer();
|
||||
TreeMap<DocRouter.Range,RangeCount> counts = new TreeMap<>();
|
||||
|
@ -306,9 +321,8 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
long sumBuckets = 0;
|
||||
|
||||
TermsEnum termsEnum = terms.iterator();
|
||||
for (;;) {
|
||||
BytesRef term = termsEnum.next();
|
||||
if (term == null) break;
|
||||
BytesRef term;
|
||||
while ((term = termsEnum.next()) != null) {
|
||||
numPrefixes++;
|
||||
|
||||
String termStr = term.utf8ToString();
|
||||
|
@ -340,8 +354,102 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
return counts.values();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of range counts sorted by the range lower bound, using the indexed "id" field (i.e. the terms are full IDs, not just prefixes)
|
||||
*/
|
||||
static Collection<RangeCount> getHashHistogramFromId(SolrIndexSearcher searcher, String idField, DocRouter router, DocCollection collection) throws IOException {
|
||||
RTimer timer = new RTimer();
|
||||
|
||||
// returns the list of recommended splits, or null if there is not enough information
|
||||
TreeMap<DocRouter.Range, RangeCount> counts = new TreeMap<>();
|
||||
|
||||
Terms terms = MultiTerms.getTerms(searcher.getIndexReader(), idField);
|
||||
if (terms == null) {
|
||||
return counts.values();
|
||||
}
|
||||
|
||||
int numPrefixes = 0;
|
||||
int numCollisions = 0;
|
||||
long sumBuckets = 0;
|
||||
|
||||
|
||||
byte sep = (byte) CompositeIdRouter.SEPARATOR.charAt(0);
|
||||
TermsEnum termsEnum = terms.iterator();
|
||||
BytesRef currPrefix = new BytesRef(); // prefix of the previous "id" term
|
||||
int bucketCount = 0; // count of the number of docs in the current bucket
|
||||
|
||||
// We're going to iterate over all terms, so do the minimum amount of work per term.
|
||||
// Terms are sorted, so all terms sharing a prefix will be grouped together. The extra work
|
||||
// is really just limited to stepping over all the terms in the id field.
|
||||
for (;;) {
|
||||
BytesRef term = termsEnum.next();
|
||||
|
||||
// compare to current prefix bucket and see if this new term shares the same prefix
|
||||
if (term != null && term.length >= currPrefix.length && currPrefix.length > 0) {
|
||||
int i = 0;
|
||||
for (; i < currPrefix.length; i++) {
|
||||
if (currPrefix.bytes[i] != term.bytes[term.offset + i]) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (i == currPrefix.length) {
|
||||
// prefix was the same (common-case fast path)
|
||||
// int count = termsEnum.docFreq();
|
||||
bucketCount++; // use 1 since we are dealing with unique ids
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// At this point the prefix did not match, so if we had a bucket we were working on, record it.
|
||||
if (currPrefix.length > 0) {
|
||||
numPrefixes++;
|
||||
sumBuckets += bucketCount;
|
||||
String currPrefixStr = currPrefix.utf8ToString();
|
||||
DocRouter.Range range = router.getSearchRangeSingle(currPrefixStr, null, collection);
|
||||
|
||||
RangeCount rangeCount = new RangeCount(range, bucketCount);
|
||||
bucketCount = 0;
|
||||
|
||||
RangeCount prev = counts.put(rangeCount.range, rangeCount);
|
||||
if (prev != null) {
|
||||
// we hit a hash collision, so add the buckets together.
|
||||
rangeCount.count += prev.count;
|
||||
numCollisions++;
|
||||
}
|
||||
}
|
||||
|
||||
// if the current term is null, we ran out of values
|
||||
if (term == null) break;
|
||||
|
||||
// find the new prefix (if any)
|
||||
|
||||
// resize if needed
|
||||
if (currPrefix.length < term.length) {
|
||||
currPrefix.bytes = new byte[term.length+10];
|
||||
}
|
||||
|
||||
// Copy the bytes up to and including the separator, and set the length if the separator is found.
|
||||
// If there was no separator, then length remains 0 and it's the indicator that we have no prefix bucket
|
||||
currPrefix.length = 0;
|
||||
for (int i=0; i<term.length; i++) {
|
||||
byte b = term.bytes[i + term.offset];
|
||||
currPrefix.bytes[i] = b;
|
||||
if (b == sep) {
|
||||
currPrefix.length = i + 1;
|
||||
bucketCount++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Split histogram from idField {}: ms={}, numBuckets={} sumBuckets={} numPrefixes={}numCollisions={}", idField, timer.getTime(), counts.size(), sumBuckets, numPrefixes, numCollisions);
|
||||
|
||||
return counts.values();
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns the list of recommended splits, or null if there is not enough information
|
||||
*/
|
||||
static Collection<DocRouter.Range> getSplits(Collection<RangeCount> rawCounts, DocRouter.Range currentRange) throws Exception {
|
||||
int totalCount = 0;
|
||||
RangeCount biggest = null; // keep track of the largest in case we need to split it out into it's own shard
|
||||
|
@ -371,6 +479,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
|
||||
if (counts.size() == 1) {
|
||||
// We have a single range, so we should split it.
|
||||
// Currently, we only split a prefix/bucket when we have just one, but this could be changed/controlled
|
||||
// in the future via a allowedSizeDifference parameter (i.e. if just separating prefix buckets results in
|
||||
// too large of an imbalanced, allow splitting within a prefix)
|
||||
|
||||
// It may already be a partial range, so figure that out
|
||||
int lower = Math.max(last.range.min, currentRange.min);
|
||||
|
@ -392,6 +503,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
|
||||
// We have at least two ranges, so we want to partition the ranges
|
||||
// and avoid splitting any individual range.
|
||||
// The "middle" bucket we are going to find will be included with the lower range and excluded from the upper range.
|
||||
|
||||
int targetCount = totalCount / 2;
|
||||
RangeCount middle = null;
|
||||
|
@ -413,10 +525,10 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
middle = prev;
|
||||
}
|
||||
|
||||
// if the middle range turned out to be the last one, pick the one before it instead
|
||||
if (middle == last) {
|
||||
middle = prev;
|
||||
}
|
||||
// The middle should never be the last, since that means that we won't actually do a split.
|
||||
// Minimising the error (above) should already ensure this never happens.
|
||||
assert middle != last;
|
||||
|
||||
|
||||
// Make sure to include the shard's current range in the new ranges so we don't create useless empty shards.
|
||||
DocRouter.Range lowerRange = new DocRouter.Range(currentRange.min, middle.range.max);
|
||||
|
|
|
@ -53,8 +53,12 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
|
|||
public static void setupCluster() throws Exception {
|
||||
System.setProperty("managed.schema.mutable", "true"); // needed by cloud-managed config set
|
||||
|
||||
// clould-managed has the copyField from ID to id_prefix
|
||||
// cloud-minimal does not and thus histogram should be driven from the "id" field directly
|
||||
String configSetName = random().nextBoolean() ? "cloud-minimal" : "cloud-managed";
|
||||
|
||||
configureCluster(1)
|
||||
.addConfig("conf", configset("cloud-managed")) // cloud-managed has the id copyfield to id_prefix
|
||||
.addConfig("conf", configset(configSetName)) // cloud-managed has the id copyfield to id_prefix
|
||||
.configure();
|
||||
}
|
||||
|
||||
|
@ -71,9 +75,9 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
|
|||
cluster.deleteAllCollections();
|
||||
}
|
||||
|
||||
static class Prefix implements Comparable<Prefix> {
|
||||
String key;
|
||||
DocRouter.Range range;
|
||||
public static class Prefix implements Comparable<Prefix> {
|
||||
public String key;
|
||||
public DocRouter.Range range;
|
||||
|
||||
@Override
|
||||
public int compareTo(Prefix o) {
|
||||
|
@ -87,7 +91,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
|
|||
}
|
||||
|
||||
// find prefixes (shard keys) matching certain criteria
|
||||
public List<Prefix> findPrefixes(int numToFind, int lowerBound, int upperBound) {
|
||||
public static List<Prefix> findPrefixes(int numToFind, int lowerBound, int upperBound) {
|
||||
CompositeIdRouter router = new CompositeIdRouter();
|
||||
|
||||
ArrayList<Prefix> prefixes = new ArrayList<>();
|
||||
|
@ -112,7 +116,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
|
|||
}
|
||||
|
||||
// remove duplicate prefixes from the sorted prefix list
|
||||
List<Prefix> removeDups(List<Prefix> prefixes) {
|
||||
public static List<Prefix> removeDups(List<Prefix> prefixes) {
|
||||
ArrayList<Prefix> result = new ArrayList<>();
|
||||
Prefix last = null;
|
||||
for (Prefix prefix : prefixes) {
|
||||
|
@ -198,7 +202,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
|
|||
|
||||
|
||||
//
|
||||
// now lets add enough documents to the first prefix to get it split out on it's own
|
||||
// now lets add enough documents to the first prefix to get it split out on its own
|
||||
//
|
||||
for (int i=0; i<uniquePrefixes.size(); i++) {
|
||||
client.add( getDoc(uniquePrefixes.get(0).key, "doc"+(i+100)));
|
||||
|
|
|
@ -18,11 +18,17 @@ package org.apache.solr.handler.admin;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.cloud.api.collections.SplitByPrefixTest;
|
||||
import org.apache.solr.cloud.api.collections.SplitByPrefixTest.Prefix;
|
||||
import org.apache.solr.common.cloud.CompositeIdRouter;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
// test low level splitByPrefix range recommendations.
|
||||
|
@ -30,6 +36,11 @@ import org.junit.Test;
|
|||
// See SplitByPrefixTest for cloud level tests of SPLITSHARD that use this by passing getRanges with the SPLIT command
|
||||
public class SplitHandlerTest extends SolrTestCaseJ4 {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeTests() throws Exception {
|
||||
System.setProperty("managed.schema.mutable", "true"); // needed by cloud-managed config set
|
||||
initCore("solrconfig.xml","schema_latest.xml");
|
||||
}
|
||||
|
||||
void verifyContiguous(Collection<DocRouter.Range> results, DocRouter.Range currentRange) {
|
||||
if (results == null) return;
|
||||
|
@ -215,4 +226,63 @@ public class SplitHandlerTest extends SolrTestCaseJ4 {
|
|||
verifyContiguous(results, curr);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHistoramBuilding() throws Exception {
|
||||
List<Prefix> prefixes = SplitByPrefixTest.findPrefixes(20, 0, 0x00ffffff);
|
||||
List<Prefix> uniquePrefixes = SplitByPrefixTest.removeDups(prefixes);
|
||||
assertTrue(prefixes.size() > uniquePrefixes.size()); // make sure we have some duplicates to test hash collisions
|
||||
|
||||
String prefixField = "id_prefix_s";
|
||||
String idField = "id";
|
||||
DocRouter router = new CompositeIdRouter();
|
||||
|
||||
|
||||
for (int i=0; i<100; i++) {
|
||||
SolrQueryRequest req = req("myquery");
|
||||
try {
|
||||
// the first time through the loop we do this before adding docs to test an empty index
|
||||
Collection<SplitOp.RangeCount> counts1 = SplitOp.getHashHistogram(req.getSearcher(), prefixField, router, null);
|
||||
Collection<SplitOp.RangeCount> counts2 = SplitOp.getHashHistogramFromId(req.getSearcher(), idField, router, null);
|
||||
assertTrue(eqCount(counts1, counts2));
|
||||
|
||||
if (i>0) {
|
||||
assertTrue(counts1.size() > 0); // make sure we are testing something
|
||||
}
|
||||
|
||||
|
||||
// index a few random documents
|
||||
int ndocs = random().nextInt(10) + 1;
|
||||
for (int j=0; j<ndocs; j++) {
|
||||
String prefix = prefixes.get( random().nextInt(prefixes.size()) ).key;
|
||||
if (random().nextBoolean()) {
|
||||
prefix = prefix + Integer.toString(random().nextInt(3)) + "!";
|
||||
}
|
||||
String id = prefix + "doc" + i + "_" + j;
|
||||
updateJ(jsonAdd(sdoc(idField, id, prefixField, prefix)), null);
|
||||
}
|
||||
|
||||
assertU(commit());
|
||||
|
||||
|
||||
} finally {
|
||||
req.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private boolean eqCount(Collection<SplitOp.RangeCount> a, Collection<SplitOp.RangeCount> b) {
|
||||
Iterator<SplitOp.RangeCount> it1 = a.iterator();
|
||||
Iterator<SplitOp.RangeCount> it2 = b.iterator();
|
||||
while (it1.hasNext()) {
|
||||
SplitOp.RangeCount r1 = it1.next();
|
||||
SplitOp.RangeCount r2 = it2.next();
|
||||
if (!r1.range.equals(r2.range) || r1.count != r2.count) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -79,15 +79,14 @@ public class CompositeIdRouter extends HashBasedRouter {
|
|||
// search across whole range
|
||||
return fullRange();
|
||||
}
|
||||
String id = shardKey;
|
||||
|
||||
if (shardKey.indexOf(SEPARATOR) < 0) {
|
||||
// shardKey is a simple id, so don't do a range
|
||||
int hash = Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
|
||||
int hash = Hash.murmurhash3_x86_32(shardKey, 0, shardKey.length(), 0);
|
||||
return new Range(hash, hash);
|
||||
}
|
||||
|
||||
return new KeyParser(id).getRange();
|
||||
return new KeyParser(shardKey).getRange();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue