From df7b1f618df28dda67423af3d5b64393cd36eb22 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 3 Dec 2024 17:47:36 +0100 Subject: [PATCH] LUCENE-10073: Reduce merging overhead of NRT by using a greater mergeFactor on tiny segments. (#266) Closes #11111 --- lucene/CHANGES.txt | 7 ++- .../lucene/index/TieredMergePolicy.java | 54 ++++++++++--------- .../lucene/index/TestTieredMergePolicy.java | 47 ++++++++++++++-- 3 files changed, 80 insertions(+), 28 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 6738d63b111..f4717e88537 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -15,7 +15,8 @@ New Features Improvements --------------------- -(No changes) + +* GITHUB#266: TieredMergePolicy's maxMergeAtOnce default value was changed from 10 to 30. (Adrien Grand) Optimizations --------------------- @@ -56,6 +57,10 @@ Improvements system property to increase the set of Java versions that Panama Vectorization will provide optimized implementations for. (Chris Hegarty) +* GITHUB#266: TieredMergePolicy now allows merging up to maxMergeAtOnce + segments for merges below the floor segment size, even if maxMergeAtOnce is + bigger than segsPerTier. (Adrien Grand) + Optimizations --------------------- diff --git a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java index 2fb0c0783a2..678d4e2c4c9 100644 --- a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java @@ -85,8 +85,8 @@ public class TieredMergePolicy extends MergePolicy { public static final double DEFAULT_NO_CFS_RATIO = 0.1; // User-specified maxMergeAtOnce. In practice we always take the min of its - // value and segsPerTier to avoid suboptimal merging. - private int maxMergeAtOnce = 10; + // value and segsPerTier for segments above the floor size to avoid suboptimal merging. + private int maxMergeAtOnce = 30; private long maxMergedSegmentBytes = 5 * 1024 * 1024 * 1024L; private long floorSegmentBytes = 2 * 1024 * 1024L; @@ -100,7 +100,13 @@ public class TieredMergePolicy extends MergePolicy { super(DEFAULT_NO_CFS_RATIO, MergePolicy.DEFAULT_MAX_CFS_SEGMENT_SIZE); } - /** Maximum number of segments to be merged at a time during "normal" merging. Default is 10. */ + /** + * Maximum number of segments to be merged at a time during "normal" merging. Default is 30. + * + *

NOTE: Merges above the {@link #setFloorSegmentMB(double) floor segment size} also + * bound the number of merged segments by {@link #setSegmentsPerTier(double) the number of + * segments per tier}. + */ public TieredMergePolicy setMaxMergeAtOnce(int v) { if (v < 2) { throw new IllegalArgumentException("maxMergeAtOnce must be > 1 (got " + v + ")"); @@ -557,46 +563,46 @@ public class TieredMergePolicy extends MergePolicy { for (int startIdx = 0; startIdx < sortedEligible.size(); startIdx++) { - long totAfterMergeBytes = 0; - final List candidate = new ArrayList<>(); boolean hitTooLarge = false; long bytesThisMerge = 0; long docCountThisMerge = 0; for (int idx = startIdx; idx < sortedEligible.size() - && candidate.size() < mergeFactor + && candidate.size() < maxMergeAtOnce + // We allow merging more than mergeFactor segments together if the merged segment + // would be less than the floor segment size. This is important because segments + // below the floor segment size are more aggressively merged by this policy, so we + // need to grow them as quickly as possible. + && (candidate.size() < mergeFactor || bytesThisMerge < floorSegmentBytes) && bytesThisMerge < maxMergedSegmentBytes && (bytesThisMerge < floorSegmentBytes || docCountThisMerge <= allowedDocCount); idx++) { final SegmentSizeAndDocs segSizeDocs = sortedEligible.get(idx); final long segBytes = segSizeDocs.sizeInBytes; int segDocCount = segSizeDocs.maxDoc - segSizeDocs.delCount; - if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes - || (totAfterMergeBytes > floorSegmentBytes + if (bytesThisMerge + segBytes > maxMergedSegmentBytes + || (bytesThisMerge > floorSegmentBytes && docCountThisMerge + segDocCount > allowedDocCount)) { // Only set hitTooLarge when reaching the maximum byte size, as this will create // segments of the maximum size which will no longer be eligible for merging for a long // time (until they accumulate enough deletes). - hitTooLarge |= totAfterMergeBytes + segBytes > maxMergedSegmentBytes; - if (candidate.size() == 0) { - // We should never have something coming in that _cannot_ be merged, so handle - // singleton merges - candidate.add(segSizeDocs.segInfo); - bytesThisMerge += segBytes; + hitTooLarge |= bytesThisMerge + segBytes > maxMergedSegmentBytes; + // We should never have something coming in that _cannot_ be merged, so handle + // singleton merges + if (candidate.size() > 0) { + // NOTE: we continue, so that we can try + // "packing" smaller segments into this merge + // to see if we can get closer to the max + // size; this in general is not perfect since + // this is really "bin packing" and we'd have + // to try different permutations. + continue; } - // NOTE: we continue, so that we can try - // "packing" smaller segments into this merge - // to see if we can get closer to the max - // size; this in general is not perfect since - // this is really "bin packing" and we'd have - // to try different permutations. - continue; } candidate.add(segSizeDocs.segInfo); bytesThisMerge += segBytes; docCountThisMerge += segDocCount; - totAfterMergeBytes += segBytes; } // We should never see an empty candidate: we iterated over maxMergeAtOnce @@ -645,7 +651,7 @@ public class TieredMergePolicy extends MergePolicy { + " tooLarge=" + hitTooLarge + " size=" - + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes / 1024. / 1024.), + + String.format(Locale.ROOT, "%.3f MB", bytesThisMerge / 1024. / 1024.), mergeContext); } @@ -654,7 +660,7 @@ public class TieredMergePolicy extends MergePolicy { best = candidate; bestScore = score; bestTooLarge = hitTooLarge; - bestMergeBytes = totAfterMergeBytes; + bestMergeBytes = bytesThisMerge; } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java index a2d678a3ec0..58e33cb7648 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java @@ -163,9 +163,8 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase { @Override protected void assertMerge(MergePolicy policy, MergeSpecification merges) { TieredMergePolicy tmp = (TieredMergePolicy) policy; - final int mergeFactor = (int) Math.min(tmp.getMaxMergeAtOnce(), tmp.getSegmentsPerTier()); for (OneMerge merge : merges.merges) { - assertTrue(merge.segments.size() <= mergeFactor); + assertTrue(merge.segments.size() <= tmp.getMaxMergeAtOnce()); } } @@ -943,6 +942,48 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase { doTestSimulateUpdates(mergePolicy, numDocs, 2500); } + public void testMergeSizeIsLessThanFloorSize() throws IOException { + MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount); + + SegmentInfos infos = new SegmentInfos(Version.LATEST.major); + // 50 1MB segments + for (int i = 0; i < 50; ++i) { + infos.add(makeSegmentCommitInfo("_0", 1_000_000, 0, 1, IndexWriter.SOURCE_FLUSH)); + } + + TieredMergePolicy mergePolicy = new TieredMergePolicy(); + mergePolicy.setFloorSegmentMB(0.1); + + // Segments are above the floor segment size, we get 4 merges of mergeFactor=10 segments each + MergeSpecification mergeSpec = + mergePolicy.findMerges(MergeTrigger.FULL_FLUSH, infos, mergeContext); + assertNotNull(mergeSpec); + assertEquals(4, mergeSpec.merges.size()); + for (OneMerge oneMerge : mergeSpec.merges) { + assertEquals(mergePolicy.getSegmentsPerTier(), oneMerge.segments.size(), 0d); + } + + // Segments are below the floor segment size and it takes 15 segments to go above the floor + // segment size. We get 3 merges of 15 segments each + mergePolicy.setFloorSegmentMB(15); + mergeSpec = mergePolicy.findMerges(MergeTrigger.FULL_FLUSH, infos, mergeContext); + assertNotNull(mergeSpec); + assertEquals(3, mergeSpec.merges.size()); + for (OneMerge oneMerge : mergeSpec.merges) { + assertEquals(15, oneMerge.segments.size()); + } + + // Segments are below the floor segment size and we'd need to merge more than maxMergeAtOnce + // segments to go above the minimum segment size. We get 1 merge of maxMergeAtOnce=30 segments + // and 1 merge of 50-30=20 segments. + mergePolicy.setFloorSegmentMB(60); + mergeSpec = mergePolicy.findMerges(MergeTrigger.FULL_FLUSH, infos, mergeContext); + assertNotNull(mergeSpec); + assertEquals(2, mergeSpec.merges.size()); + assertEquals(30, mergeSpec.merges.get(0).segments.size()); + assertEquals(20, mergeSpec.merges.get(1).segments.size()); + } + public void testFullFlushMerges() throws IOException { AtomicLong segNameGenerator = new AtomicLong(); IOStats stats = new IOStats(); @@ -951,7 +992,7 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase { TieredMergePolicy mp = new TieredMergePolicy(); - for (int i = 0; i < 11; ++i) { + for (int i = 0; i < 31; ++i) { segmentInfos.add( makeSegmentCommitInfo( "_" + segNameGenerator.getAndIncrement(),