From 22ca695ef5cef722b9f962c89c6da52c9b3883b0 Mon Sep 17 00:00:00 2001
From: Carlos Delgado <6339205+carlosdelest@users.noreply.github.com>
Date: Wed, 17 Jul 2024 13:54:19 +0200
Subject: [PATCH] Add target search concurrency to TieredMergePolicy (#13430)

---
 lucene/CHANGES.txt                            |  6 +-
 .../lucene/index/TieredMergePolicy.java       | 80 +++++++++++++++----
 .../lucene/index/TestTieredMergePolicy.java   | 39 +++++++--
 .../lucene/tests/util/LuceneTestCase.java     |  6 ++
 4 files changed, 110 insertions(+), 21 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 14a6d0eb773..f361fdcf002 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -254,7 +254,11 @@ API Changes
 
 New Features
 ---------------------
-(No changes)
+
+* GITHUB#13430: Allow configuring the search concurrency via
+  TieredMergePolicy#setTargetSearchConcurrency. This in turn instructs the
+  merge policy to try to have at least this number of segments on the highest
+  tier. (Adrien Grand, Carlos Delgado)
 
 Improvements
 ---------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
index 208aa297287..702df660c44 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
@@ -93,6 +93,7 @@ public class TieredMergePolicy extends MergePolicy {
   private double segsPerTier = 10.0;
   private double forceMergeDeletesPctAllowed = 10.0;
   private double deletesPctAllowed = 20.0;
+  private int targetSearchConcurrency = 1;
 
   /** Sole constructor, setting all settings to their defaults. */
   public TieredMergePolicy() {
@@ -257,6 +258,26 @@ public class TieredMergePolicy extends MergePolicy {
     return segsPerTier;
   }
 
+  /**
+   * Sets the target search concurrency. This prevents creating segments that are bigger than
+   * maxDoc/targetSearchConcurrency, which in turn makes the work parallelizable into
+   * targetSearchConcurrency slices of similar doc counts. It also makes merging less aggressive,
+   * as higher values result in indices that do less merging and have more segments.
+   */
+  public TieredMergePolicy setTargetSearchConcurrency(int targetSearchConcurrency) {
+    if (targetSearchConcurrency < 1) {
+      throw new IllegalArgumentException(
+          "targetSearchConcurrency must be >= 1 (got " + targetSearchConcurrency + ")");
+    }
+    this.targetSearchConcurrency = targetSearchConcurrency;
+    return this;
+  }
+
+  /** Returns the target search concurrency. */
+  public int getTargetSearchConcurrency() {
+    return targetSearchConcurrency;
+  }
+
   private static class SegmentSizeAndDocs {
     private final SegmentCommitInfo segInfo;
     /// Size of the segment in bytes, pro-rated by the number of live documents.
@@ -371,31 +392,40 @@ public class TieredMergePolicy extends MergePolicy {
     // If we have too-large segments, grace them out of the maximum segment count
     // If we're above certain thresholds of deleted docs, we can merge very large segments.
     int tooBigCount = 0;
+    // We relax merging for the bigger segments for concurrency reasons, as we want to have several
+    // segments on the highest tier without over-merging on the lower tiers.
+    int concurrencyCount = 0;
     iter = sortedInfos.iterator();
+    double allowedSegCount = 0;
+
     // remove large segments from consideration under two conditions.
     // 1> Overall percent deleted docs relatively small and this segment is larger than 50%
     // maxSegSize
     // 2> overall percent deleted docs large and this segment is large and has few deleted docs
-
     while (iter.hasNext()) {
       SegmentSizeAndDocs segSizeDocs = iter.next();
       double segDelPct = 100 * (double) segSizeDocs.delCount / (double) segSizeDocs.maxDoc;
       if (segSizeDocs.sizeInBytes > maxMergedSegmentBytes / 2
           && (totalDelPct <= deletesPctAllowed || segDelPct <= deletesPctAllowed)) {
         iter.remove();
-        tooBigCount++; // Just for reporting purposes.
+        tooBigCount++;
         totIndexBytes -= segSizeDocs.sizeInBytes;
         allowedDelCount -= segSizeDocs.delCount;
+      } else if (concurrencyCount + tooBigCount < targetSearchConcurrency - 1) {
+        // Make sure we count a whole segment for the first targetSearchConcurrency-1 segments to
+        // avoid over merging on the lower levels.
+        concurrencyCount++;
+        allowedSegCount++;
+        totIndexBytes -= segSizeDocs.sizeInBytes;
       }
     }
     allowedDelCount = Math.max(0, allowedDelCount);
 
     final int mergeFactor = (int) Math.min(maxMergeAtOnce, segsPerTier);
-    // Compute max allowed segments in the index
+    // Compute max allowed segments for the remainder of the index
     long levelSize = Math.max(minSegmentBytes, floorSegmentBytes);
     long bytesLeft = totIndexBytes;
-    double allowedSegCount = 0;
     while (true) {
       final double segCountLevel = bytesLeft / (double) levelSize;
       if (segCountLevel < segsPerTier || levelSize == maxMergedSegmentBytes) {
@@ -408,7 +438,8 @@ public class TieredMergePolicy extends MergePolicy {
     }
     // allowedSegCount may occasionally be less than segsPerTier
     // if segment sizes are below the floor size
-    allowedSegCount = Math.max(allowedSegCount, segsPerTier);
+    allowedSegCount = Math.max(allowedSegCount, Math.max(segsPerTier, targetSearchConcurrency));
+    int allowedDocCount = getMaxAllowedDocs(totalMaxDoc, totalDelDocs);
 
     if (verbose(mergeContext) && tooBigCount > 0) {
       message(
@@ -419,7 +450,11 @@ public class TieredMergePolicy extends MergePolicy {
               + " (eligible count="
               + sortedInfos.size()
               + ") tooBigCount= "
-              + tooBigCount,
+              + tooBigCount
+              + " allowedDocCount="
+              + allowedDocCount
+              + " vs doc count="
+              + infos.totalMaxDoc(),
           mergeContext);
     }
     return doFindMerges(
@@ -428,6 +463,7 @@ public class TieredMergePolicy extends MergePolicy {
         mergeFactor,
         (int) allowedSegCount,
         allowedDelCount,
+        allowedDocCount,
         MERGE_TYPE.NATURAL,
         mergeContext,
         mergingBytes >= maxMergedSegmentBytes);
@@ -439,6 +475,7 @@ public class TieredMergePolicy extends MergePolicy {
       final int mergeFactor,
       final int allowedSegCount,
       final int allowedDelCount,
+      final int allowedDocCount,
       final MERGE_TYPE mergeType,
       MergeContext mergeContext,
       boolean maxMergeIsRunning)
@@ -522,16 +559,23 @@ public class TieredMergePolicy extends MergePolicy {
         final List<SegmentCommitInfo> candidate = new ArrayList<>();
         boolean hitTooLarge = false;
         long bytesThisMerge = 0;
+        long docCountThisMerge = 0;
         for (int idx = startIdx;
             idx < sortedEligible.size()
                 && candidate.size() < mergeFactor
-                && bytesThisMerge < maxMergedSegmentBytes;
+                && bytesThisMerge < maxMergedSegmentBytes
+                && (bytesThisMerge < floorSegmentBytes || docCountThisMerge <= allowedDocCount);
             idx++) {
           final SegmentSizeAndDocs segSizeDocs = sortedEligible.get(idx);
           final long segBytes = segSizeDocs.sizeInBytes;
-
-          if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
-            hitTooLarge = true;
+          int segDocCount = segSizeDocs.maxDoc - segSizeDocs.delCount;
+          if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes
+              || (totAfterMergeBytes > floorSegmentBytes
+                  && docCountThisMerge + segDocCount > allowedDocCount)) {
+            // Only set hitTooLarge when reaching the maximum byte size, as this will create
+            // segments of the maximum size which will no longer be eligible for merging for a long
+            // time (until they accumulate enough deletes).
+            hitTooLarge |= totAfterMergeBytes + segBytes > maxMergedSegmentBytes;
             if (candidate.size() == 0) {
               // We should never have something coming in that _cannot_ be merged, so handle
               // singleton merges
@@ -548,6 +592,7 @@ public class TieredMergePolicy extends MergePolicy {
           }
           candidate.add(segSizeDocs.segInfo);
           bytesThisMerge += segBytes;
+          docCountThisMerge += segDocCount;
           totAfterMergeBytes += segBytes;
         }
 
@@ -916,14 +961,13 @@ public class TieredMergePolicy extends MergePolicy {
     final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
 
     boolean haveWork = false;
+    int totalDelCount = 0;
     for (SegmentCommitInfo info : infos) {
       int delCount = mergeContext.numDeletesToMerge(info);
       assert assertDelCount(delCount, info);
+      totalDelCount += delCount;
       double pctDeletes = 100. * ((double) delCount) / info.info.maxDoc();
-      if (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info)) {
-        haveWork = true;
-        break;
-      }
+      haveWork = haveWork || (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info));
     }
 
     if (haveWork == false) {
@@ -950,11 +994,16 @@ public class TieredMergePolicy extends MergePolicy {
         Integer.MAX_VALUE,
         Integer.MAX_VALUE,
         0,
+        getMaxAllowedDocs(infos.totalMaxDoc(), totalDelCount),
         MERGE_TYPE.FORCE_MERGE_DELETES,
         mergeContext,
         false);
   }
 
+  int getMaxAllowedDocs(int totalMaxDoc, int totalDelDocs) {
+    return Math.ceilDiv(totalMaxDoc - totalDelDocs, targetSearchConcurrency);
+  }
+
   private long floorSize(long bytes) {
     return Math.max(floorSegmentBytes, bytes);
   }
@@ -969,7 +1018,8 @@ public class TieredMergePolicy extends MergePolicy {
     sb.append("segmentsPerTier=").append(segsPerTier).append(", ");
     sb.append("maxCFSSegmentSizeMB=").append(getMaxCFSSegmentSizeMB()).append(", ");
     sb.append("noCFSRatio=").append(noCFSRatio).append(", ");
-    sb.append("deletesPctAllowed=").append(deletesPctAllowed);
+    sb.append("deletesPctAllowed=").append(deletesPctAllowed).append(", ");
+    sb.append("targetSearchConcurrency=").append(targetSearchConcurrency);
     return sb.toString();
   }
 }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
index 5db9b13eb95..eb24d964702 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
@@ -65,6 +65,7 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
       segmentSizes.add(weightedByteSize);
       minSegmentBytes = Math.min(minSegmentBytes, weightedByteSize);
     }
+    Collections.sort(segmentSizes);
 
     final double delPercentage = 100.0 * totalDelCount / totalMaxDoc;
     assertTrue(
@@ -77,12 +78,26 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     long levelSizeBytes = Math.max(minSegmentBytes, (long) (tmp.getFloorSegmentMB() * 1024 * 1024));
     long bytesLeft = totalBytes;
     double allowedSegCount = 0;
+    List<Long> biggestSegments = segmentSizes;
+    if (biggestSegments.size() > tmp.getTargetSearchConcurrency() - 1) {
+      biggestSegments =
+          biggestSegments.subList(
+              biggestSegments.size() - tmp.getTargetSearchConcurrency() + 1,
+              biggestSegments.size());
+    }
+    // Allow whole segments for the targetSearchConcurrency-1 biggest segments
+    for (long size : biggestSegments) {
+      bytesLeft -= size;
+      allowedSegCount++;
+    }
+
     // below we make the assumption that segments that reached the max segment
     // size divided by 2 don't need merging anymore
     int mergeFactor = (int) Math.min(tmp.getSegmentsPerTier(), tmp.getMaxMergeAtOnce());
     while (true) {
       final double segCountLevel = bytesLeft / (double) levelSizeBytes;
-      if (segCountLevel < tmp.getSegmentsPerTier() || levelSizeBytes >= maxMergedSegmentBytes / 2) {
+      if (segCountLevel <= tmp.getSegmentsPerTier()
+          || levelSizeBytes >= maxMergedSegmentBytes / 2) {
         allowedSegCount += Math.ceil(segCountLevel);
         break;
       }
@@ -94,7 +109,6 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
 
     // It's ok to be over the allowed segment count if none of the most balanced merges are balanced
     // enough
-    Collections.sort(segmentSizes);
     boolean hasBalancedMerges = false;
     for (int i = 0; i < segmentSizes.size() - mergeFactor; ++i) {
       long maxMergeSegmentSize = segmentSizes.get(i + mergeFactor - 1);
@@ -111,11 +125,24 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
       }
     }
 
+    // There can be more segments than the allowed count if we can't merge docs because they are
+    // balanced between segments.
+    // In that case, at least the 2 smallest segments should still be small enough to be merged
+    // together.
+    int maxDocsPerSegment = tmp.getMaxAllowedDocs(infos.totalMaxDoc(), totalDelCount);
+    List<Integer> segmentDocs =
+        infos.asList().stream()
+            .map(info -> info.info.maxDoc() - info.getDelCount())
+            .sorted()
+            .toList();
+    boolean eligibleDocsMerge =
+        segmentDocs.size() >= 2 && segmentDocs.get(0) + segmentDocs.get(1) < maxDocsPerSegment;
+
     int numSegments = infos.asList().size();
     assertTrue(
         String.format(
             Locale.ROOT,
-            "mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g",
+            "mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g targetNumSegments=%d",
             mergeFactor,
             minSegmentBytes,
             maxMergedSegmentBytes,
@@ -125,8 +152,9 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
             allowedSegCount,
             totalBytes,
             delPercentage,
-            tmp.getDeletesPctAllowed()),
-        numSegments <= allowedSegCount || hasBalancedMerges == false);
+            tmp.getDeletesPctAllowed(),
+            tmp.getTargetSearchConcurrency()),
+        numSegments <= allowedSegCount || hasBalancedMerges == false || eligibleDocsMerge == false);
   }
 
   @Override
@@ -208,6 +236,7 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
 
     int segmentCount = w.getSegmentCount();
     int targetCount = TestUtil.nextInt(random(), 1, segmentCount);
+
     if (VERBOSE) {
       System.out.println(
           "TEST: merge to " + targetCount + " segs (current count=" + segmentCount + ")");
diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java
index 91942f660f9..0f9cab1eda4 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java
@@ -1095,6 +1095,12 @@ public abstract class LuceneTestCase extends Assert {
     } else {
       tmp.setSegmentsPerTier(TestUtil.nextInt(r, 10, 50));
     }
+    if (rarely(r)) {
+      tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 10, 50));
+    } else {
+      tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 2, 20));
+    }
+
     configureRandom(r, tmp);
     tmp.setDeletesPctAllowed(20 + random().nextDouble() * 30);
     return tmp;
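
Usage sketch (illustrative, not part of the patch): the snippet below shows how the
new knob might be wired into an IndexWriter. Only setTargetSearchConcurrency and
getMaxAllowedDocs come from this change; the Directory, analyzer, and the example
class/variable names are assumptions chosen for illustration, on a Lucene version
that includes this patch.

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.TieredMergePolicy;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;

    public class TargetSearchConcurrencyExample {
      public static void main(String[] args) throws Exception {
        TieredMergePolicy tmp = new TieredMergePolicy();
        // Ask the policy to keep at least 4 segments on the highest tier, so a
        // searcher can split the index into ~4 slices of similar doc counts.
        tmp.setTargetSearchConcurrency(4);

        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(tmp);
        try (Directory dir = new ByteBuffersDirectory();
            IndexWriter writer = new IndexWriter(dir, iwc)) {
          // Index documents as usual. Natural merges now avoid producing a
          // segment with more live docs than ceil(liveDocs / 4), mirroring
          // getMaxAllowedDocs() in the patch.
        }
      }
    }

For instance, with 1,000,000 live documents and targetSearchConcurrency=4,
getMaxAllowedDocs returns Math.ceilDiv(1_000_000, 4) = 250,000, so a natural merge
will not produce a segment holding more than 250,000 live documents.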