mirror of https://github.com/apache/lucene.git
Add target search concurrency to TieredMergePolicy (#13430)
parent 99488b2245
commit 22ca695ef5
lucene/CHANGES.txt
@@ -254,7 +254,11 @@ API Changes
 New Features
 ---------------------
 (No changes)

+* GITHUB#13430: Allow configuring the search concurrency via
+  TieredMergePolicy#setTargetSearchConcurrency. This in-turn instructs the
+  merge policy to try to have at least this number of segments on the highest
+  tier. (Adrien Grand, Carlos Delgado)
+
 Improvements
 ---------------------
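For context, a minimal usage sketch of the new option (not part of this commit; the analyzer, index path, and concurrency value are arbitrary illustrations of how the setter added below is meant to be wired into an IndexWriter):

import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.FSDirectory;

public class TargetSearchConcurrencyExample {
  public static void main(String[] args) throws Exception {
    TieredMergePolicy mergePolicy = new TieredMergePolicy();
    // Ask the merge policy to keep at least 4 segments on the highest tier, so that
    // searches can be parallelized into roughly 4 slices of similar doc counts.
    mergePolicy.setTargetSearchConcurrency(4);

    IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
    config.setMergePolicy(mergePolicy);

    try (IndexWriter writer = new IndexWriter(FSDirectory.open(Paths.get("/tmp/example-index")), config)) {
      // add documents as usual; natural merges now respect the target search concurrency
    }
  }
}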
lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
@@ -93,6 +93,7 @@ public class TieredMergePolicy extends MergePolicy {
   private double segsPerTier = 10.0;
   private double forceMergeDeletesPctAllowed = 10.0;
   private double deletesPctAllowed = 20.0;
+  private int targetSearchConcurrency = 1;

   /** Sole constructor, setting all settings to their defaults. */
   public TieredMergePolicy() {
@@ -257,6 +258,26 @@ public class TieredMergePolicy extends MergePolicy {
     return segsPerTier;
   }

+  /**
+   * Sets the target search concurrency. This prevents creating segments that are bigger than
+   * maxDoc/targetSearchConcurrency, which in turn makes the work parallelizable into
+   * targetSearchConcurrency slices of similar doc counts. It also makes merging less aggressive, as
+   * higher values result in indices that do less merging and have more segments
+   */
+  public TieredMergePolicy setTargetSearchConcurrency(int targetSearchConcurrency) {
+    if (targetSearchConcurrency < 1) {
+      throw new IllegalArgumentException(
+          "targetSearchConcurrency must be >= 1 (got " + targetSearchConcurrency + ")");
+    }
+    this.targetSearchConcurrency = targetSearchConcurrency;
+    return this;
+  }
+
+  /** Returns the target search concurrency. */
+  public int getTargetSearchConcurrency() {
+    return targetSearchConcurrency;
+  }
+
   private static class SegmentSizeAndDocs {
     private final SegmentCommitInfo segInfo;
     // Size of the segment in bytes, pro-rated by the number of live documents.
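A quick illustration of the setter's contract above (values arbitrary, not from the commit): the setter is fluent and rejects values below 1.

// Illustration only:
TieredMergePolicy tmp = new TieredMergePolicy().setTargetSearchConcurrency(8); // fluent, returns this
int concurrency = tmp.getTargetSearchConcurrency(); // 8
// tmp.setTargetSearchConcurrency(0) would throw:
// IllegalArgumentException: targetSearchConcurrency must be >= 1 (got 0)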
@@ -371,31 +392,40 @@ public class TieredMergePolicy extends MergePolicy {
     // If we have too-large segments, grace them out of the maximum segment count
     // If we're above certain thresholds of deleted docs, we can merge very large segments.
     int tooBigCount = 0;
+    // We relax merging for the bigger segments for concurrency reasons, as we want to have several
+    // segments on the highest tier without over-merging on the lower tiers.
+    int concurrencyCount = 0;
     iter = sortedInfos.iterator();

+    double allowedSegCount = 0;
+
     // remove large segments from consideration under two conditions.
     // 1> Overall percent deleted docs relatively small and this segment is larger than 50%
     // maxSegSize
     // 2> overall percent deleted docs large and this segment is large and has few deleted docs

     while (iter.hasNext()) {
       SegmentSizeAndDocs segSizeDocs = iter.next();
       double segDelPct = 100 * (double) segSizeDocs.delCount / (double) segSizeDocs.maxDoc;
       if (segSizeDocs.sizeInBytes > maxMergedSegmentBytes / 2
           && (totalDelPct <= deletesPctAllowed || segDelPct <= deletesPctAllowed)) {
         iter.remove();
-        tooBigCount++; // Just for reporting purposes.
+        tooBigCount++;
         totIndexBytes -= segSizeDocs.sizeInBytes;
         allowedDelCount -= segSizeDocs.delCount;
+      } else if (concurrencyCount + tooBigCount < targetSearchConcurrency - 1) {
+        // Make sure we count a whole segment for the first targetSearchConcurrency-1 segments to
+        // avoid over merging on the lower levels.
+        concurrencyCount++;
+        allowedSegCount++;
+        totIndexBytes -= segSizeDocs.sizeInBytes;
       }
     }
     allowedDelCount = Math.max(0, allowedDelCount);

     final int mergeFactor = (int) Math.min(maxMergeAtOnce, segsPerTier);
-    // Compute max allowed segments in the index
+    // Compute max allowed segments for the remainder of the index
     long levelSize = Math.max(minSegmentBytes, floorSegmentBytes);
     long bytesLeft = totIndexBytes;
-    double allowedSegCount = 0;
     while (true) {
       final double segCountLevel = bytesLeft / (double) levelSize;
       if (segCountLevel < segsPerTier || levelSize == maxMergedSegmentBytes) {
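To make the budgeting above easier to follow, here is a simplified, hypothetical sketch (plain Java, not the actual Lucene method; the delete-percentage conditions are omitted) of how too-big segments are graced out and how each of the first targetSearchConcurrency-1 remaining segments is granted a whole slot before the per-tier budget is computed from the leftover bytes:

// Hypothetical helper, for illustration only; names mirror the diff but this is not Lucene code.
final class TopTierBudgetSketch {
  // sizesLargestFirst: per-segment byte sizes, sorted from largest to smallest,
  // matching the sort order TieredMergePolicy uses before this loop.
  static double initialAllowedSegCount(
      long[] sizesLargestFirst, long maxMergedSegmentBytes, int targetSearchConcurrency) {
    long totIndexBytes = 0;
    for (long size : sizesLargestFirst) {
      totIndexBytes += size;
    }
    double allowedSegCount = 0;
    int tooBigCount = 0;
    int concurrencyCount = 0;
    for (long size : sizesLargestFirst) {
      if (size > maxMergedSegmentBytes / 2) {
        // Graced out of the budget entirely (the real code also checks delete percentages).
        tooBigCount++;
        totIndexBytes -= size;
      } else if (concurrencyCount + tooBigCount < targetSearchConcurrency - 1) {
        // Grant a whole-segment allowance on the highest tier so lower tiers are not over-merged.
        concurrencyCount++;
        allowedSegCount++;
        totIndexBytes -= size;
      }
    }
    // The remaining totIndexBytes is then spread over tiers of segsPerTier segments each
    // (the while (true) loop in the diff), starting from the floor segment size.
    return allowedSegCount;
  }
}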
@@ -408,7 +438,8 @@ public class TieredMergePolicy extends MergePolicy {
     }
     // allowedSegCount may occasionally be less than segsPerTier
     // if segment sizes are below the floor size
-    allowedSegCount = Math.max(allowedSegCount, segsPerTier);
+    allowedSegCount = Math.max(allowedSegCount, Math.max(segsPerTier, targetSearchConcurrency));
+    int allowedDocCount = getMaxAllowedDocs(totalMaxDoc, totalDelDocs);

     if (verbose(mergeContext) && tooBigCount > 0) {
       message(
@@ -419,7 +450,11 @@ public class TieredMergePolicy extends MergePolicy {
               + " (eligible count="
               + sortedInfos.size()
               + ") tooBigCount= "
-              + tooBigCount,
+              + tooBigCount
+              + " allowedDocCount="
+              + allowedDocCount
+              + " vs doc count="
+              + infos.totalMaxDoc(),
           mergeContext);
     }
     return doFindMerges(
@@ -428,6 +463,7 @@ public class TieredMergePolicy extends MergePolicy {
         mergeFactor,
         (int) allowedSegCount,
         allowedDelCount,
+        allowedDocCount,
         MERGE_TYPE.NATURAL,
         mergeContext,
         mergingBytes >= maxMergedSegmentBytes);
@@ -439,6 +475,7 @@ public class TieredMergePolicy extends MergePolicy {
       final int mergeFactor,
       final int allowedSegCount,
       final int allowedDelCount,
+      final int allowedDocCount,
       final MERGE_TYPE mergeType,
       MergeContext mergeContext,
       boolean maxMergeIsRunning)
@@ -522,16 +559,23 @@ public class TieredMergePolicy extends MergePolicy {
         final List<SegmentCommitInfo> candidate = new ArrayList<>();
         boolean hitTooLarge = false;
         long bytesThisMerge = 0;
+        long docCountThisMerge = 0;
         for (int idx = startIdx;
             idx < sortedEligible.size()
                 && candidate.size() < mergeFactor
-                && bytesThisMerge < maxMergedSegmentBytes;
+                && bytesThisMerge < maxMergedSegmentBytes
+                && (bytesThisMerge < floorSegmentBytes || docCountThisMerge <= allowedDocCount);
             idx++) {
           final SegmentSizeAndDocs segSizeDocs = sortedEligible.get(idx);
           final long segBytes = segSizeDocs.sizeInBytes;

-          if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
-            hitTooLarge = true;
+          int segDocCount = segSizeDocs.maxDoc - segSizeDocs.delCount;
+          if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes
+              || (totAfterMergeBytes > floorSegmentBytes
+                  && docCountThisMerge + segDocCount > allowedDocCount)) {
+            // Only set hitTooLarge when reaching the maximum byte size, as this will create
+            // segments of the maximum size which will no longer be eligible for merging for a long
+            // time (until they accumulate enough deletes).
+            hitTooLarge |= totAfterMergeBytes + segBytes > maxMergedSegmentBytes;
             if (candidate.size() == 0) {
               // We should never have something coming in that _cannot_ be merged, so handle
               // singleton merges
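Restating the new candidate-selection guard above as a standalone predicate may help; this is an illustrative rewrite, not code from the commit (parameter names mirror the diff):

// Illustrative only: would adding the next segment push the current candidate merge over budget?
static boolean overBudget(
    long totAfterMergeBytes,
    long segBytes,
    long docCountThisMerge,
    int segDocCount,
    long maxMergedSegmentBytes,
    long floorSegmentBytes,
    int allowedDocCount) {
  boolean overByteBudget = totAfterMergeBytes + segBytes > maxMergedSegmentBytes;
  // The doc-count cap only applies once the merge is already above the floor size,
  // so tiny segments can still be merged together regardless of their doc counts.
  boolean overDocBudget =
      totAfterMergeBytes > floorSegmentBytes && docCountThisMerge + segDocCount > allowedDocCount;
  return overByteBudget || overDocBudget;
}

Only the byte-budget case marks hitTooLarge, since only max-sized segments stay ineligible for merging for a long time.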
@@ -548,6 +592,7 @@ public class TieredMergePolicy extends MergePolicy {
           }
           candidate.add(segSizeDocs.segInfo);
           bytesThisMerge += segBytes;
+          docCountThisMerge += segDocCount;
           totAfterMergeBytes += segBytes;
         }

@@ -916,14 +961,13 @@ public class TieredMergePolicy extends MergePolicy {
     final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();

     boolean haveWork = false;
+    int totalDelCount = 0;
     for (SegmentCommitInfo info : infos) {
       int delCount = mergeContext.numDeletesToMerge(info);
       assert assertDelCount(delCount, info);
+      totalDelCount += delCount;
       double pctDeletes = 100. * ((double) delCount) / info.info.maxDoc();
-      if (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info)) {
-        haveWork = true;
-        break;
-      }
+      haveWork = haveWork || (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info));
     }

     if (haveWork == false) {
@@ -950,11 +994,16 @@ public class TieredMergePolicy extends MergePolicy {
         Integer.MAX_VALUE,
         Integer.MAX_VALUE,
         0,
+        getMaxAllowedDocs(infos.totalMaxDoc(), totalDelCount),
         MERGE_TYPE.FORCE_MERGE_DELETES,
         mergeContext,
         false);
   }

+  int getMaxAllowedDocs(int totalMaxDoc, int totalDelDocs) {
+    return Math.ceilDiv(totalMaxDoc - totalDelDocs, targetSearchConcurrency);
+  }
+
   private long floorSize(long bytes) {
     return Math.max(floorSegmentBytes, bytes);
   }
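As a worked example of getMaxAllowedDocs above (numbers are arbitrary): with 1,000,000 live docs (totalMaxDoc - totalDelDocs) and a target search concurrency of 3, each merged segment is capped at ceil(1,000,000 / 3) = 333,334 docs, so at least 3 segments of roughly equal doc count remain.

// Illustration, not from the commit:
int allowedDocCount = Math.ceilDiv(1_000_000, 3); // 333_334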
@@ -969,7 +1018,8 @@ public class TieredMergePolicy extends MergePolicy {
     sb.append("segmentsPerTier=").append(segsPerTier).append(", ");
     sb.append("maxCFSSegmentSizeMB=").append(getMaxCFSSegmentSizeMB()).append(", ");
     sb.append("noCFSRatio=").append(noCFSRatio).append(", ");
-    sb.append("deletesPctAllowed=").append(deletesPctAllowed);
+    sb.append("deletesPctAllowed=").append(deletesPctAllowed).append(", ");
+    sb.append("targetSearchConcurrency=").append(targetSearchConcurrency);
     return sb.toString();
   }
 }
lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
@@ -65,6 +65,7 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
       segmentSizes.add(weightedByteSize);
       minSegmentBytes = Math.min(minSegmentBytes, weightedByteSize);
     }
+    Collections.sort(segmentSizes);

     final double delPercentage = 100.0 * totalDelCount / totalMaxDoc;
     assertTrue(
@@ -77,12 +78,26 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     long levelSizeBytes = Math.max(minSegmentBytes, (long) (tmp.getFloorSegmentMB() * 1024 * 1024));
     long bytesLeft = totalBytes;
     double allowedSegCount = 0;
+    List<Long> biggestSegments = segmentSizes;
+    if (biggestSegments.size() > tmp.getTargetSearchConcurrency() - 1) {
+      biggestSegments =
+          biggestSegments.subList(
+              biggestSegments.size() - tmp.getTargetSearchConcurrency() + 1,
+              biggestSegments.size());
+    }
+    // Allow whole segments for the targetSearchConcurrency-1 biggest segments
+    for (long size : biggestSegments) {
+      bytesLeft -= size;
+      allowedSegCount++;
+    }
+
     // below we make the assumption that segments that reached the max segment
     // size divided by 2 don't need merging anymore
     int mergeFactor = (int) Math.min(tmp.getSegmentsPerTier(), tmp.getMaxMergeAtOnce());
     while (true) {
       final double segCountLevel = bytesLeft / (double) levelSizeBytes;
-      if (segCountLevel < tmp.getSegmentsPerTier() || levelSizeBytes >= maxMergedSegmentBytes / 2) {
+      if (segCountLevel <= tmp.getSegmentsPerTier()
+          || levelSizeBytes >= maxMergedSegmentBytes / 2) {
         allowedSegCount += Math.ceil(segCountLevel);
         break;
       }
@@ -94,7 +109,6 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {

     // It's ok to be over the allowed segment count if none of the most balanced merges are balanced
     // enough
-    Collections.sort(segmentSizes);
     boolean hasBalancedMerges = false;
     for (int i = 0; i < segmentSizes.size() - mergeFactor; ++i) {
       long maxMergeSegmentSize = segmentSizes.get(i + mergeFactor - 1);
@@ -111,11 +125,24 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
       }
     }

+    // There can be more segments if we can't merge docs because they are balanced between
+    // segments. At least the 2 smallest segments should be mergeable.
+    // should be 2 segments to merge
+    int maxDocsPerSegment = tmp.getMaxAllowedDocs(infos.totalMaxDoc(), totalDelCount);
+    List<Integer> segmentDocs =
+        infos.asList().stream()
+            .map(info -> info.info.maxDoc() - info.getDelCount())
+            .sorted()
+            .toList();
+    boolean eligibleDocsMerge =
+        segmentDocs.size() >= 2 && segmentDocs.get(0) + segmentDocs.get(1) < maxDocsPerSegment;
+
     int numSegments = infos.asList().size();
     assertTrue(
         String.format(
             Locale.ROOT,
-            "mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g",
+            "mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g targetNumSegments=%d",
             mergeFactor,
             minSegmentBytes,
             maxMergedSegmentBytes,
@@ -125,8 +152,9 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
             allowedSegCount,
             totalBytes,
             delPercentage,
-            tmp.getDeletesPctAllowed()),
-        numSegments <= allowedSegCount || hasBalancedMerges == false);
+            tmp.getDeletesPctAllowed(),
+            tmp.getTargetSearchConcurrency()),
+        numSegments <= allowedSegCount || hasBalancedMerges == false || eligibleDocsMerge == false);
   }

   @Override
@@ -208,6 +236,7 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {

     int segmentCount = w.getSegmentCount();
     int targetCount = TestUtil.nextInt(random(), 1, segmentCount);
+
     if (VERBOSE) {
       System.out.println(
           "TEST: merge to " + targetCount + " segs (current count=" + segmentCount + ")");
lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java
@@ -1095,6 +1095,12 @@ public abstract class LuceneTestCase extends Assert {
     } else {
       tmp.setSegmentsPerTier(TestUtil.nextInt(r, 10, 50));
     }
+    if (rarely(r)) {
+      tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 10, 50));
+    } else {
+      tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 2, 20));
+    }
+
     configureRandom(r, tmp);
     tmp.setDeletesPctAllowed(20 + random().nextDouble() * 30);
     return tmp;