LUCENE-10599: Improve LogMergePolicy's handling of maxMergeSize. (#935)

With this change, segments are more likely to be considered for merging until
they reach the max merge size. Before this change, LogMergePolicy would exclude
an entire window of `mergeFactor` segments from merging if this window had a
too large segment and other segments were on the same tier.
This commit is contained in:
Adrien Grand 2022-06-09 17:43:19 +02:00 committed by GitHub
parent 54c67db10d
commit b1eddec821
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 17 deletions

View File

@ -44,7 +44,9 @@ Optimizations
Bug Fixes Bug Fixes
--------------------- ---------------------
(No changes)
* LUCENE-10599: LogMergePolicy is more likely to keep merging segments until
they reach the maximum merge size. (Adrien Grand)
Other Other
--------------------- ---------------------

View File

@ -579,23 +579,41 @@ public abstract class LogMergePolicy extends MergePolicy {
// Finally, record all merges that are viable at this level: // Finally, record all merges that are viable at this level:
int end = start + mergeFactor; int end = start + mergeFactor;
while (end <= 1 + upto) { while (end <= 1 + upto) {
boolean anyTooLarge = false;
boolean anyMerging = false; boolean anyMerging = false;
long mergeSize = 0;
long mergeDocs = 0;
for (int i = start; i < end; i++) { for (int i = start; i < end; i++) {
final SegmentInfoAndLevel segLevel = levels.get(i); final SegmentInfoAndLevel segLevel = levels.get(i);
final SegmentCommitInfo info = segLevel.info; final SegmentCommitInfo info = segLevel.info;
anyTooLarge |=
(size(info, mergeContext) >= maxMergeSize
|| sizeDocs(info, mergeContext) >= maxMergeDocs);
if (mergingSegments.contains(info)) { if (mergingSegments.contains(info)) {
anyMerging = true; anyMerging = true;
break; break;
} }
long segmentSize = size(info, mergeContext);
long segmentDocs = sizeDocs(info, mergeContext);
if (mergeSize + segmentSize > maxMergeSize || mergeDocs + segmentDocs > maxMergeDocs) {
// This merge is full, stop adding more segments to it
if (i == start) {
// This segment alone is too large, return a singleton merge
if (verbose(mergeContext)) {
message(
" " + i + " is larger than the max merge size/docs; ignoring", mergeContext);
}
end = i + 1;
} else {
// Previous segments are under the max merge size, return them
end = i;
}
break;
}
mergeSize += segmentSize;
mergeDocs += segmentDocs;
} }
if (anyMerging) { if (anyMerging || end - start <= 1) {
// skip // skip: there is an ongoing merge at the current level or the computed merge has a single
} else if (!anyTooLarge) { // segment and this merge policy doesn't do singleton merges
} else {
if (spec == null) { if (spec == null) {
spec = new MergeSpecification(); spec = new MergeSpecification();
} }
@ -615,14 +633,6 @@ public abstract class LogMergePolicy extends MergePolicy {
mergeContext); mergeContext);
} }
spec.add(new OneMerge(mergeInfos)); spec.add(new OneMerge(mergeInfos));
} else if (verbose(mergeContext)) {
message(
" "
+ start
+ " to "
+ end
+ ": contains segment over maxMergeSize or maxMergeDocs; skipping",
mergeContext);
} }
start = end; start = end;

View File

@ -51,7 +51,7 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase {
protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException { protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException {
LogMergePolicy lmp = (LogMergePolicy) policy; LogMergePolicy lmp = (LogMergePolicy) policy;
for (OneMerge oneMerge : merge.merges) { for (OneMerge oneMerge : merge.merges) {
assertEquals(lmp.getMergeFactor(), oneMerge.segments.size()); assertTrue(oneMerge.segments.size() <= lmp.getMergeFactor());
} }
} }
@ -188,6 +188,60 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase {
assertEquals(10, segmentInfos.info(1).info.maxDoc()); assertEquals(10, segmentInfos.info(1).info.maxDoc());
} }
public void testPackLargeSegments() throws IOException {
LogDocMergePolicy mergePolicy = new LogDocMergePolicy();
IOStats stats = new IOStats();
mergePolicy.setMaxMergeDocs(10_000);
AtomicLong segNameGenerator = new AtomicLong();
MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
// 10 segments below the max segment size, but larger than maxMergeSize/mergeFactor
for (int i = 0; i < 10; ++i) {
segmentInfos.add(
makeSegmentCommitInfo(
"_" + segNameGenerator.getAndIncrement(), 3_000, 0, 0, IndexWriter.SOURCE_MERGE));
}
MergeSpecification spec =
mergePolicy.findMerges(MergeTrigger.EXPLICIT, segmentInfos, mergeContext);
assertNotNull(spec);
for (OneMerge oneMerge : spec.merges) {
segmentInfos =
applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
}
// LogMP packed 3 3k segments together
assertEquals(9_000, segmentInfos.info(0).info.maxDoc());
}
public void testIgnoreLargeSegments() throws IOException {
LogDocMergePolicy mergePolicy = new LogDocMergePolicy();
IOStats stats = new IOStats();
mergePolicy.setMaxMergeDocs(10_000);
AtomicLong segNameGenerator = new AtomicLong();
MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
// 1 segment that reached the maximum segment size
segmentInfos.add(
makeSegmentCommitInfo(
"_" + segNameGenerator.getAndIncrement(), 11_000, 0, 0, IndexWriter.SOURCE_MERGE));
// and 10 segments below the max segment size, but within the same level
for (int i = 0; i < 10; ++i) {
segmentInfos.add(
makeSegmentCommitInfo(
"_" + segNameGenerator.getAndIncrement(), 2_000, 0, 0, IndexWriter.SOURCE_MERGE));
}
// LogMergePolicy used to have a bug that would make it exclude the first mergeFactor segments
// from merging if any of them was above the maximum merged size
MergeSpecification spec =
mergePolicy.findMerges(MergeTrigger.EXPLICIT, segmentInfos, mergeContext);
assertNotNull(spec);
for (OneMerge oneMerge : spec.merges) {
segmentInfos =
applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
}
assertEquals(11_000, segmentInfos.info(0).info.maxDoc());
assertEquals(10_000, segmentInfos.info(1).info.maxDoc());
}
public void testFullFlushMerges() throws IOException { public void testFullFlushMerges() throws IOException {
AtomicLong segNameGenerator = new AtomicLong(); AtomicLong segNameGenerator = new AtomicLong();
IOStats stats = new IOStats(); IOStats stats = new IOStats();