LUCENE-10073: Reduce merging overhead of NRT by using a greater mergeFactor on tiny segments. (#266)

Closes #11111
Adrien Grand 2024-12-03 17:47:36 +01:00
parent 290847a80e
commit 02db61ffb3
3 changed files with 77 additions and 25 deletions

lucene/CHANGES.txt

@@ -30,6 +30,10 @@ Improvements
   system property to increase the set of Java versions that Panama Vectorization will
   provide optimized implementations for. (Chris Hegarty)
 
+* GITHUB#266: TieredMergePolicy now allows merging up to maxMergeAtOnce
+  segments for merges below the floor segment size, even if maxMergeAtOnce is
+  bigger than segsPerTier. (Adrien Grand)
+
 Optimizations
 ---------------------
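
For readers coming from the changelog, here is a minimal configuration sketch of the knobs this entry talks about. The setter calls are the existing TieredMergePolicy API; the wrapper class, method name, and values are hypothetical and only illustrate the new behavior.

    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.TieredMergePolicy;

    public class TmpConfigSketch {
      // Hypothetical helper: wires a TieredMergePolicy whose tiny-segment
      // merges can now be wider than segsPerTier.
      public static IndexWriterConfig newConfig() {
        TieredMergePolicy tmp = new TieredMergePolicy();
        tmp.setMaxMergeAtOnce(30); // hard cap on segments per merge
        tmp.setSegmentsPerTier(10); // above the floor, merges take min(30, 10) = 10 segments
        tmp.setFloorSegmentMB(16); // below the floor, merges may now take up to 30 segments
        return new IndexWriterConfig().setMergePolicy(tmp);
      }
    }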

lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java

@@ -85,7 +85,7 @@ public class TieredMergePolicy extends MergePolicy {
   public static final double DEFAULT_NO_CFS_RATIO = 0.1;
 
   // User-specified maxMergeAtOnce. In practice we always take the min of its
-  // value and segsPerTier to avoid suboptimal merging.
+  // value and segsPerTier for segments above the floor size to avoid suboptimal merging.
   private int maxMergeAtOnce = 10;
   private long maxMergedSegmentBytes = 5 * 1024 * 1024 * 1024L;

@@ -100,7 +100,13 @@ public class TieredMergePolicy extends MergePolicy {
     super(DEFAULT_NO_CFS_RATIO, MergePolicy.DEFAULT_MAX_CFS_SEGMENT_SIZE);
   }
 
-  /** Maximum number of segments to be merged at a time during "normal" merging. Default is 10. */
+  /**
+   * Maximum number of segments to be merged at a time during "normal" merging. Default is 10.
+   *
+   * <p><b>NOTE</b>: Merges above the {@link #setFloorSegmentMB(double) floor segment size} also
+   * bound the number of merged segments by {@link #setSegmentsPerTier(double) the number of
+   * segments per tier}.
+   */
   public TieredMergePolicy setMaxMergeAtOnce(int v) {
     if (v < 2) {
       throw new IllegalArgumentException("maxMergeAtOnce must be > 1 (got " + v + ")");
@@ -557,46 +563,46 @@ public class TieredMergePolicy extends MergePolicy {
     for (int startIdx = 0; startIdx < sortedEligible.size(); startIdx++) {
-      long totAfterMergeBytes = 0;
 
       final List<SegmentCommitInfo> candidate = new ArrayList<>();
       boolean hitTooLarge = false;
       long bytesThisMerge = 0;
       long docCountThisMerge = 0;
       for (int idx = startIdx;
           idx < sortedEligible.size()
-              && candidate.size() < mergeFactor
+              && candidate.size() < maxMergeAtOnce
+              // We allow merging more than mergeFactor segments together if the merged segment
+              // would be less than the floor segment size. This is important because segments
+              // below the floor segment size are more aggressively merged by this policy, so we
+              // need to grow them as quickly as possible.
+              && (candidate.size() < mergeFactor || bytesThisMerge < floorSegmentBytes)
               && bytesThisMerge < maxMergedSegmentBytes
               && (bytesThisMerge < floorSegmentBytes || docCountThisMerge <= allowedDocCount);
           idx++) {
         final SegmentSizeAndDocs segSizeDocs = sortedEligible.get(idx);
         final long segBytes = segSizeDocs.sizeInBytes;
         int segDocCount = segSizeDocs.maxDoc - segSizeDocs.delCount;
-        if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes
-            || (totAfterMergeBytes > floorSegmentBytes
+        if (bytesThisMerge + segBytes > maxMergedSegmentBytes
+            || (bytesThisMerge > floorSegmentBytes
                 && docCountThisMerge + segDocCount > allowedDocCount)) {
           // Only set hitTooLarge when reaching the maximum byte size, as this will create
           // segments of the maximum size which will no longer be eligible for merging for a long
           // time (until they accumulate enough deletes).
-          hitTooLarge |= totAfterMergeBytes + segBytes > maxMergedSegmentBytes;
-          if (candidate.size() == 0) {
-            // We should never have something coming in that _cannot_ be merged, so handle
-            // singleton merges
-            candidate.add(segSizeDocs.segInfo);
-            bytesThisMerge += segBytes;
-          }
-          // NOTE: we continue, so that we can try
-          // "packing" smaller segments into this merge
-          // to see if we can get closer to the max
-          // size; this in general is not perfect since
-          // this is really "bin packing" and we'd have
-          // to try different permutations.
-          continue;
+          hitTooLarge |= bytesThisMerge + segBytes > maxMergedSegmentBytes;
+          // We should never have something coming in that _cannot_ be merged, so handle
+          // singleton merges
+          if (candidate.size() > 0) {
+            // NOTE: we continue, so that we can try
+            // "packing" smaller segments into this merge
+            // to see if we can get closer to the max
+            // size; this in general is not perfect since
+            // this is really "bin packing" and we'd have
+            // to try different permutations.
+            continue;
+          }
         }
         candidate.add(segSizeDocs.segInfo);
         bytesThisMerge += segBytes;
         docCountThisMerge += segDocCount;
-        totAfterMergeBytes += segBytes;
       }
 
       // We should never see an empty candidate: we iterated over maxMergeAtOnce
@@ -645,7 +651,7 @@ public class TieredMergePolicy extends MergePolicy {
                 + " tooLarge="
                 + hitTooLarge
                 + " size="
-                + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes / 1024. / 1024.),
+                + String.format(Locale.ROOT, "%.3f MB", bytesThisMerge / 1024. / 1024.),
             mergeContext);
       }

@@ -654,7 +660,7 @@ public class TieredMergePolicy extends MergePolicy {
         best = candidate;
         bestScore = score;
         bestTooLarge = hitTooLarge;
-        bestMergeBytes = totAfterMergeBytes;
+        bestMergeBytes = bytesThisMerge;
       }
     }
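
To make the loop change easier to follow outside the diff, here is a standalone simplification of the candidate-selection condition. Names and structure are reduced from the real method (deletes, doc-count limits, and scoring are omitted), so treat it as a sketch rather than the actual TieredMergePolicy code.

    import java.util.ArrayList;
    import java.util.List;

    public class CandidateSketch {
      // Simplified sketch of the patched selection loop: a candidate merge keeps
      // growing past mergeFactor as long as it is still below the floor size, but
      // never past maxMergeAtOnce or the max merged segment size.
      static List<Long> pickCandidate(
          List<Long> sortedSegmentBytes, // sorted biggest-first, like sortedEligible
          int maxMergeAtOnce,
          int mergeFactor, // min(maxMergeAtOnce, segsPerTier)
          long floorSegmentBytes,
          long maxMergedSegmentBytes) {
        List<Long> candidate = new ArrayList<>();
        long bytesThisMerge = 0;
        for (int idx = 0;
            idx < sortedSegmentBytes.size()
                && candidate.size() < maxMergeAtOnce
                && (candidate.size() < mergeFactor || bytesThisMerge < floorSegmentBytes)
                && bytesThisMerge < maxMergedSegmentBytes;
            idx++) {
          long segBytes = sortedSegmentBytes.get(idx);
          if (bytesThisMerge + segBytes > maxMergedSegmentBytes && !candidate.isEmpty()) {
            // keep scanning: a later (smaller) segment may still fit ("bin packing")
            continue;
          }
          candidate.add(segBytes);
          bytesThisMerge += segBytes;
        }
        return candidate;
      }
    }

With mergeFactor=10, maxMergeAtOnce=30 and a 15MB floor, fifty 1MB segments yield a 15-segment candidate: the loop only stops once the candidate is both at least mergeFactor wide and no longer below the floor.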

lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java

@@ -163,9 +163,8 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
   @Override
   protected void assertMerge(MergePolicy policy, MergeSpecification merges) {
     TieredMergePolicy tmp = (TieredMergePolicy) policy;
-    final int mergeFactor = (int) Math.min(tmp.getMaxMergeAtOnce(), tmp.getSegmentsPerTier());
     for (OneMerge merge : merges.merges) {
-      assertTrue(merge.segments.size() <= mergeFactor);
+      assertTrue(merge.segments.size() <= tmp.getMaxMergeAtOnce());
     }
   }
@@ -943,6 +942,49 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     doTestSimulateUpdates(mergePolicy, numDocs, 2500);
   }
 
+  public void testMergeSizeIsLessThanFloorSize() throws IOException {
+    MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
+
+    SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
+    // 50 1MB segments
+    for (int i = 0; i < 50; ++i) {
+      infos.add(makeSegmentCommitInfo("_0", 1_000_000, 0, 1, IndexWriter.SOURCE_FLUSH));
+    }
+
+    TieredMergePolicy mergePolicy = new TieredMergePolicy();
+    mergePolicy.setMaxMergeAtOnce(30);
+    mergePolicy.setFloorSegmentMB(0.1);
+
+    // Segments are above the floor segment size, we get 4 merges of mergeFactor=10 segments each
+    MergeSpecification mergeSpec =
+        mergePolicy.findMerges(MergeTrigger.FULL_FLUSH, infos, mergeContext);
+    assertNotNull(mergeSpec);
+    assertEquals(4, mergeSpec.merges.size());
+    for (OneMerge oneMerge : mergeSpec.merges) {
+      assertEquals(mergePolicy.getSegmentsPerTier(), oneMerge.segments.size(), 0d);
+    }
+
+    // Segments are below the floor segment size and it takes 15 segments to go above the floor
+    // segment size. We get 3 merges of 15 segments each.
+    mergePolicy.setFloorSegmentMB(15);
+    mergeSpec = mergePolicy.findMerges(MergeTrigger.FULL_FLUSH, infos, mergeContext);
+    assertNotNull(mergeSpec);
+    assertEquals(3, mergeSpec.merges.size());
+    for (OneMerge oneMerge : mergeSpec.merges) {
+      assertEquals(15, oneMerge.segments.size());
+    }
+
+    // Segments are below the floor segment size and we'd need to merge more than maxMergeAtOnce
+    // segments to go above the minimum segment size. We get 1 merge of maxMergeAtOnce=30 segments
+    // and 1 merge of 50-30=20 segments.
+    mergePolicy.setFloorSegmentMB(60);
+    mergeSpec = mergePolicy.findMerges(MergeTrigger.FULL_FLUSH, infos, mergeContext);
+    assertNotNull(mergeSpec);
+    assertEquals(2, mergeSpec.merges.size());
+    assertEquals(30, mergeSpec.merges.get(0).segments.size());
+    assertEquals(20, mergeSpec.merges.get(1).segments.size());
+  }
+
   public void testFullFlushMerges() throws IOException {
     AtomicLong segNameGenerator = new AtomicLong();
     IOStats stats = new IOStats();
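
The merge widths asserted in testMergeSizeIsLessThanFloorSize follow from the loop's stopping rule; the helper below is hypothetical and only reproduces that arithmetic for equally-sized input segments.

    public class MergeWidthSketch {
      // Width of one merge over equally-sized segments under the new rule:
      // grow until the merge is no longer below the floor (ceil(floor / segBytes)),
      // never narrower than mergeFactor, never wider than maxMergeAtOnce.
      static int expectedMergeWidth(long segBytes, long floorBytes, int mergeFactor, int maxMergeAtOnce) {
        long untilFloor = (floorBytes + segBytes - 1) / segBytes;
        return (int) Math.min(Math.max(mergeFactor, untilFloor), maxMergeAtOnce);
      }

      public static void main(String[] args) {
        long mb = 1024 * 1024;
        // the three cases from the test (1MB segments, mergeFactor=10, maxMergeAtOnce=30):
        System.out.println(expectedMergeWidth(mb, mb / 10, 10, 30)); // 10 -> 4 merges of 10 segments
        System.out.println(expectedMergeWidth(mb, 15 * mb, 10, 30)); // 15 -> 3 merges of 15
        System.out.println(expectedMergeWidth(mb, 60 * mb, 10, 30)); // 30 -> one merge of 30, one of the remaining 20
      }
    }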