LUCENE-10574: Avoid O(n^2) merging with LogMergePolicy (#936)

This commit is contained in:
Adrien Grand 2022-06-03 11:02:27 +02:00 committed by GitHub
parent 267a5fcc15
commit 3738beb038
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 198 additions and 60 deletions

View File

@ -96,11 +96,9 @@ public class LogByteSizeMergePolicy extends LogMergePolicy {
}
/**
 * Sets the minimum size for the lowest level segments. Any segments below this size will be
 * merged more aggressively in order to avoid having a long tail of small segments. Large values
 * of this parameter increase the merging cost during indexing if you flush small segments.
 */
public void setMinMergeMB(double mb) {
minMergeSize = (long) (mb * 1024 * 1024);

View File

@ -43,11 +43,9 @@ public class LogDocMergePolicy extends LogMergePolicy {
}
/**
 * Sets the minimum size for the lowest level segments. Any segments below this size will be
 * merged more aggressively in order to avoid having a long tail of small segments. Large values
 * of this parameter increase the merging cost during indexing if you flush small segments.
 */
public void setMinMergeDocs(int minMergeDocs) {
minMergeSize = minMergeDocs;

View File

@ -64,10 +64,7 @@ public abstract class LogMergePolicy extends MergePolicy {
/** How many segments to merge at a time. */
protected int mergeFactor = DEFAULT_MERGE_FACTOR;
/** Any segments whose size is smaller than this value will be merged more aggressively. */
protected long minMergeSize;
/** If the size of a segment exceeds this value then it will never be merged. */
@ -440,12 +437,10 @@ public abstract class LogMergePolicy extends MergePolicy {
private static class SegmentInfoAndLevel implements Comparable<SegmentInfoAndLevel> {
final SegmentCommitInfo info;
final long size;
final float level;
public SegmentInfoAndLevel(SegmentCommitInfo info, long size, float level) {
public SegmentInfoAndLevel(SegmentCommitInfo info, float level) {
this.info = info;
this.size = size;
this.level = level;
}
@ -488,7 +483,7 @@ public abstract class LogMergePolicy extends MergePolicy {
}
final SegmentInfoAndLevel infoLevel =
new SegmentInfoAndLevel(info, size, (float) Math.log(size) / norm);
new SegmentInfoAndLevel(info, (float) Math.log(size) / norm);
levels.add(infoLevel);
if (verbose(mergeContext)) {
@ -540,16 +535,21 @@ public abstract class LogMergePolicy extends MergePolicy {
// Now search backwards for the rightmost segment that
// falls into this level:
float levelBottom;
if (maxLevel <= levelFloor) {
// All remaining segments fall into the min level
levelBottom = -1.0F;
} else {
if (maxLevel > levelFloor) {
// With a merge factor of 10, this means that the biggest segment and the smallest segment
// that take part of a merge have a size difference of at most 5.6x.
levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN);
// Force a boundary at the level floor
if (levelBottom < levelFloor && maxLevel >= levelFloor) {
levelBottom = levelFloor;
}
} else {
// For segments below the floor size, we allow more unbalanced merges, but still somewhat
// balanced to avoid running into O(n^2) merging.
// With a merge factor of 10, this means that the biggest segment and the smallest segment
// that take part of a merge have a size difference of at most 31.6x.
levelBottom = (float) (maxLevel - 2 * LEVEL_LOG_SPAN);
}
int upto = numMergeableSegments - 1;
@ -570,12 +570,8 @@ public abstract class LogMergePolicy extends MergePolicy {
while (end <= 1 + upto) {
boolean anyTooLarge = false;
boolean anyMerging = false;
long mergeSize = 0;
long maxSegmentSize = 0;
for (int i = start; i < end; i++) {
final SegmentInfoAndLevel segLevel = levels.get(i);
mergeSize += segLevel.size;
maxSegmentSize = Math.max(maxSegmentSize, segLevel.size);
final SegmentCommitInfo info = segLevel.info;
anyTooLarge |=
(size(info, mergeContext) >= maxMergeSize
@ -589,31 +585,25 @@ public abstract class LogMergePolicy extends MergePolicy {
if (anyMerging) {
// skip
} else if (!anyTooLarge) {
if (mergeSize >= maxSegmentSize * 1.5) {
// Ignore any merge where the resulting segment is not at least 50% larger than the
// biggest input segment.
// Otherwise we could run into pathological O(N^2) merging where merges keep rewriting
// again and again the biggest input segment into a segment that is barely bigger.
if (spec == null) {
spec = new MergeSpecification();
}
final List<SegmentCommitInfo> mergeInfos = new ArrayList<>(end - start);
for (int i = start; i < end; i++) {
mergeInfos.add(levels.get(i).info);
assert infos.contains(levels.get(i).info);
}
if (verbose(mergeContext)) {
message(
" add merge="
+ segString(mergeContext, mergeInfos)
+ " start="
+ start
+ " end="
+ end,
mergeContext);
}
spec.add(new OneMerge(mergeInfos));
} // else skip
if (spec == null) {
spec = new MergeSpecification();
}
final List<SegmentCommitInfo> mergeInfos = new ArrayList<>(end - start);
for (int i = start; i < end; i++) {
mergeInfos.add(levels.get(i).info);
assert infos.contains(levels.get(i).info);
}
if (verbose(mergeContext)) {
message(
" add merge="
+ segString(mergeContext, mergeInfos)
+ " start="
+ start
+ " end="
+ end,
mergeContext);
}
spec.add(new OneMerge(mergeInfos));
} else if (verbose(mergeContext)) {
message(
" "

View File

@ -17,9 +17,12 @@
package org.apache.lucene.index;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.MergePolicy.MergeContext;
import org.apache.lucene.index.MergePolicy.MergeSpecification;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.tests.index.BaseMergePolicyTestCase;
import org.apache.lucene.util.Version;
public class TestLogMergePolicy extends BaseMergePolicyTestCase {
@ -35,7 +38,13 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase {
@Override
protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
  LogMergePolicy mp = (LogMergePolicy) policy;
  MergeContext mockMergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
  // Invariant under LogMergePolicy: every segment must be small enough that merging
  // mergeFactor segments of its size stays below the configured max merge size.
  for (SegmentCommitInfo info : infos) {
    assertTrue(mp.size(info, mockMergeContext) / mp.getMergeFactor() < mp.maxMergeSize);
  }
  // TODO: what else can we check?
}
@Override
@ -45,4 +54,137 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase {
assertEquals(lmp.getMergeFactor(), oneMerge.segments.size());
}
}
/** 11 segments of strictly increasing size: the 10 smallest merge together, the largest is left alone. */
public void testIncreasingSegmentSizes() throws IOException {
  LogDocMergePolicy policy = new LogDocMergePolicy();
  IOStats ioStats = new IOStats();
  AtomicLong names = new AtomicLong();
  MergeContext ctx = new MockMergeContext(SegmentCommitInfo::getDelCount);
  SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
  // Segment doc counts: 1000, 2000, ..., 11000.
  for (int docs = 1_000; docs <= 11_000; docs += 1_000) {
    infos.add(
        makeSegmentCommitInfo(
            "_" + names.getAndIncrement(), docs, 0, 0, IndexWriter.SOURCE_MERGE));
  }
  MergeSpecification spec = policy.findMerges(MergeTrigger.EXPLICIT, infos, ctx);
  assertNotNull(spec);
  for (OneMerge merge : spec.merges) {
    infos = applyMerge(infos, merge, "_" + names.getAndIncrement(), ioStats);
  }
  // 1000 + 2000 + ... + 10000 = 55_000 docs merged into one segment; 11_000 untouched.
  assertEquals(2, infos.size());
  assertEquals(55_000, infos.info(0).info.maxDoc());
  assertEquals(11_000, infos.info(1).info.maxDoc());
}
/** A single small segment sandwiched between big ones must not prevent its neighbors from merging. */
public void testOneSmallMiddleSegment() throws IOException {
  LogDocMergePolicy policy = new LogDocMergePolicy();
  IOStats ioStats = new IOStats();
  AtomicLong names = new AtomicLong();
  MergeContext ctx = new MockMergeContext(SegmentCommitInfo::getDelCount);
  SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
  // Five 10k-doc segments, one 100-doc segment in the middle, then five more 10k-doc segments.
  for (int i = 0; i < 11; ++i) {
    int maxDoc = (i == 5) ? 100 : 10_000;
    infos.add(
        makeSegmentCommitInfo(
            "_" + names.getAndIncrement(), maxDoc, 0, 0, IndexWriter.SOURCE_MERGE));
  }
  MergeSpecification spec = policy.findMerges(MergeTrigger.EXPLICIT, infos, ctx);
  assertNotNull(spec);
  for (OneMerge merge : spec.merges) {
    infos = applyMerge(infos, merge, "_" + names.getAndIncrement(), ioStats);
  }
  // The first 10 segments — small one included — end up merged; the last big one survives.
  assertEquals(2, infos.size());
  assertEquals(90_100, infos.info(0).info.maxDoc());
  assertEquals(10_000, infos.info(1).info.maxDoc());
}
/** A run of tiny segments between two big ones must not prevent merging. */
public void testManySmallMiddleSegment() throws IOException {
  LogDocMergePolicy policy = new LogDocMergePolicy();
  IOStats ioStats = new IOStats();
  AtomicLong names = new AtomicLong();
  MergeContext ctx = new MockMergeContext(SegmentCommitInfo::getDelCount);
  SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
  // One 10k-doc segment, nine 100-doc segments, then another 10k-doc segment.
  for (int i = 0; i < 11; ++i) {
    int maxDoc = (i == 0 || i == 10) ? 10_000 : 100;
    infos.add(
        makeSegmentCommitInfo(
            "_" + names.getAndIncrement(), maxDoc, 0, 0, IndexWriter.SOURCE_MERGE));
  }
  MergeSpecification spec = policy.findMerges(MergeTrigger.EXPLICIT, infos, ctx);
  assertNotNull(spec);
  for (OneMerge merge : spec.merges) {
    infos = applyMerge(infos, merge, "_" + names.getAndIncrement(), ioStats);
  }
  // The first big segment and the nine tiny ones merge together; the last big one survives.
  assertEquals(2, infos.size());
  assertEquals(10_900, infos.info(0).info.maxDoc());
  assertEquals(10_000, infos.info(1).info.maxDoc());
}
/** Below the floor size, merges that are too unbalanced (100 docs vs 1 doc) must be rejected. */
public void testRejectUnbalancedMerges() throws IOException {
  LogDocMergePolicy policy = new LogDocMergePolicy();
  policy.setMinMergeDocs(10_000);
  IOStats ioStats = new IOStats();
  AtomicLong names = new AtomicLong();
  MergeContext ctx = new MockMergeContext(SegmentCommitInfo::getDelCount);
  SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
  // One 100-doc segment followed by nine 1-doc segments, all below the floor size.
  infos.add(
      makeSegmentCommitInfo(
          "_" + names.getAndIncrement(), 100, 0, 0, IndexWriter.SOURCE_MERGE));
  for (int i = 0; i < 9; ++i) {
    infos.add(
        makeSegmentCommitInfo(
            "_" + names.getAndIncrement(), 1, 0, 0, IndexWriter.SOURCE_FLUSH));
  }
  // Merging the 100-doc segment with nine 1-doc segments would be too unbalanced,
  // even though everything is below the floor size: no merge is proposed.
  assertNull(policy.findMerges(MergeTrigger.EXPLICIT, infos, ctx));
  // Adding a tenth 1-doc segment allows a balanced merge of the ten 1-doc segments.
  infos.add(
      makeSegmentCommitInfo(
          "_" + names.getAndIncrement(), 1, 0, 0, IndexWriter.SOURCE_FLUSH));
  MergeSpecification spec = policy.findMerges(MergeTrigger.EXPLICIT, infos, ctx);
  assertNotNull(spec);
  for (OneMerge merge : spec.merges) {
    infos = applyMerge(infos, merge, "_" + names.getAndIncrement(), ioStats);
  }
  assertEquals(2, infos.size());
  assertEquals(100, infos.info(0).info.maxDoc());
  assertEquals(10, infos.info(1).info.maxDoc());
}
}

View File

@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@ -328,21 +328,31 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
/**
 * Applies {@code merge} to {@code infos} and returns the resulting {@link SegmentInfos}: the
 * merged-away segments are replaced by a single new segment named {@code mergedSegmentName},
 * inserted at the position of the first merged-away segment so segment order is preserved. The
 * merged segment's doc count and size account for live docs only (deletes are reclaimed). The
 * bytes written by the merge are accumulated into {@code stats}.
 */
protected static SegmentInfos applyMerge(
    SegmentInfos infos, OneMerge merge, String mergedSegmentName, IOStats stats)
    throws IOException {
  int newMaxDoc = 0;
  double newSize = 0;
  for (SegmentCommitInfo sci : merge.segments) {
    // Deleted docs do not survive a merge: scale byte size and doc count down to live docs.
    int numLiveDocs = sci.info.maxDoc() - sci.getDelCount();
    newSize += (double) sci.sizeInBytes() * numLiveDocs / sci.info.maxDoc() / 1024 / 1024;
    newMaxDoc += numLiveDocs;
  }
  SegmentCommitInfo mergedInfo =
      makeSegmentCommitInfo(mergedSegmentName, newMaxDoc, 0, newSize, IndexWriter.SOURCE_MERGE);
  Set<SegmentCommitInfo> mergedAway = new HashSet<>(merge.segments);
  boolean mergedSegmentAdded = false;
  SegmentInfos newInfos = new SegmentInfos(Version.LATEST.major);
  for (int i = 0; i < infos.size(); ++i) {
    SegmentCommitInfo info = infos.info(i);
    if (mergedAway.contains(info)) {
      // Insert the merged segment where the first merged-away segment used to be, so that
      // the relative order of the remaining segments is preserved.
      if (mergedSegmentAdded == false) {
        newInfos.add(mergedInfo);
        mergedSegmentAdded = true;
      }
    } else {
      newInfos.add(info);
    }
  }
  stats.mergeBytesWritten += newSize * 1024 * 1024;
  return newInfos;
}