LUCENE-8688: Forced merges merge more than necessary.

This commit is contained in:
Adrien Grand 2019-03-15 10:27:27 +01:00
parent ecf86ecab7
commit d89387553d
4 changed files with 161 additions and 6 deletions

View File

@ -29,6 +29,10 @@ Bug fixes
* LUCENE-8719: FixedShingleFilter can miss shingles at the end of a token stream if
there are multiple paths with different lengths. (Alan Woodward)
* LUCENE-8688: TieredMergePolicy#findForcedMerges now tries to create the
cheapest merges that allow the index to go down to `maxSegmentCount` segments
or fewer. (Armin Braun via Adrien Grand)
Improvements
* LUCENE-8673: Use radix partitioning when merging dimensional points instead

View File

@ -622,9 +622,8 @@ public class TieredMergePolicy extends MergePolicy {
};
}
@Override
// NOTE(review): this span is a unified-diff rendering — the two signature lines below are the
// removed and added versions of the same declaration (whitespace-only change after the comma).
public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentCommitInfo, Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
if (verbose(mergeContext)) {
message("findForcedMerges maxSegmentCount=" + maxSegmentCount + " infos=" + segString(mergeContext, infos) +
" segmentsToMerge=" + segmentsToMerge, mergeContext);
@ -639,6 +638,7 @@ public class TieredMergePolicy extends MergePolicy {
// Trim the list down, remove if we're respecting max segment size and it's not original. Presumably it's been merged before and
// is close enough to the max segment size we shouldn't add it in again.
Iterator<SegmentSizeAndDocs> iter = sortedSizeAndDocs.iterator();
// Tracks whether any eligible segment is already being merged; used below to decide whether the
// final merge must be deferred and which candidate merges are permitted meanwhile.
boolean forceMergeRunning = false;
while (iter.hasNext()) {
SegmentSizeAndDocs segSizeDocs = iter.next();
final Boolean isOriginal = segmentsToMerge.get(segSizeDocs.segInfo);
@ -646,6 +646,7 @@ public class TieredMergePolicy extends MergePolicy {
iter.remove();
} else {
if (merging.contains(segSizeDocs.segInfo)) {
forceMergeRunning = true;
iter.remove();
} else {
totalMergeBytes += segSizeDocs.sizeInBytes;
@ -707,6 +708,12 @@ public class TieredMergePolicy extends MergePolicy {
message("eligible=" + sortedSizeAndDocs, mergeContext);
}
final int startingSegmentCount = sortedSizeAndDocs.size();
// If a single round of at most maxMergeAtOnceExplicit-way merging can already reach the target
// count, this would be the final merge; don't start it while another merge is still running.
final boolean finalMerge = startingSegmentCount < maxSegmentCount + maxMergeAtOnceExplicit - 1;
if (finalMerge && forceMergeRunning) {
return null;
}
// This is the special case of merging down to one segment
if (sortedSizeAndDocs.size() < maxMergeAtOnceExplicit && maxSegmentCount == 1 && totalMergeBytes < maxMergeBytes) {
MergeSpecification spec = new MergeSpecification();
@ -718,10 +725,50 @@ public class TieredMergePolicy extends MergePolicy {
return spec;
}
MergeSpecification spec = doFindMerges(sortedSizeAndDocs, maxMergeBytes, maxMergeAtOnceExplicit,
maxSegmentCount, 0, MERGE_TYPE.FORCE_MERGE, mergeContext, false);
MergeSpecification spec = null;
return spec;
// Bin-pack from the smallest eligible segments upward (sortedSizeAndDocs is largest-first, so we
// walk the index backwards) until the resulting segment count reaches maxSegmentCount.
int index = startingSegmentCount - 1;
int resultingSegments = startingSegmentCount;
while (true) {
List<SegmentCommitInfo> candidate = new ArrayList<>();
long currentCandidateBytes = 0L;
int mergesAllowed = maxMergeAtOnceExplicit;
while (index >= 0 && resultingSegments > maxSegmentCount && mergesAllowed > 0) {
final SegmentCommitInfo current = sortedSizeAndDocs.get(index).segInfo;
final int initialCandidateSize = candidate.size();
final long currentSegmentSize = current.sizeInBytes();
// We either add to the bin because there's space or because it is the smallest possible bin since
// decrementing the index will move us to even larger segments.
if (currentCandidateBytes + currentSegmentSize <= maxMergeBytes || initialCandidateSize < 2) {
candidate.add(current);
--index;
currentCandidateBytes += currentSegmentSize;
--mergesAllowed;
if (initialCandidateSize > 0) {
// Any merge that handles two or more segments reduces the resulting number of segments
// by the number of segments handled - 1
--resultingSegments;
}
} else {
break;
}
}
final int candidateSize = candidate.size();
// While a force merge is running, only merges that cover the maximum allowed number of segments or that create a segment close to the
// maximum allowed segment size are permitted
// NOTE(review): `candidateSize` is a segment COUNT but is compared against `0.7 * maxMergeBytes`
// (a BYTE threshold). Given the comment above, this presumably should read
// `currentCandidateBytes > 0.7 * maxMergeBytes` — confirm against the upstream commit.
if (candidateSize > 1 && (forceMergeRunning == false || candidateSize == maxMergeAtOnceExplicit || candidateSize > 0.7 * maxMergeBytes)) {
final OneMerge merge = new OneMerge(candidate);
if (verbose(mergeContext)) {
message("add merge=" + segString(mergeContext, merge.segments), mergeContext);
}
if (spec == null) {
spec = new MergeSpecification();
}
spec.add(merge);
} else {
// No further admissible merge could be formed; return what we have (possibly null).
return spec;
}
}
}
@Override

View File

@ -19,8 +19,11 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
@ -360,6 +363,101 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
dir.close();
}
// LUCENE-8688 reports that force merges merged more segments than necessary to respect
// maxSegmentCount as a result of LUCENE-7976, so we ensure that only the minimum number of
// merges is produced here.
public void testForcedMergesUseLeastNumberOfMerges() throws Exception {
  final TieredMergePolicy policy = new TieredMergePolicy();
  final double singleSegmentSizeMb = 1.0D;
  final double maxSegmentSizeMb = 10 * singleSegmentSizeMb;
  policy.setMaxMergedSegmentMB(maxSegmentSizeMb);

  // 30 equally-sized segments: forcing down to expectedCount must hit the target exactly.
  SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
  for (int i = 0; i < 30; ++i) {
    infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, singleSegmentSizeMb, IndexWriter.SOURCE_MERGE));
  }
  final int expectedCount = random().nextInt(10) + 3;
  final MergeSpecification specification =
      policy.findForcedMerges(infos, expectedCount, segmentsToMerge(infos), new MockMergeContext(SegmentCommitInfo::getDelCount));
  assertMaxSize(specification, maxSegmentSizeMb);
  // Each merge of k segments removes k - 1 segments from the index.
  int segmentsMergedAway = 0;
  for (OneMerge merge : specification.merges) {
    segmentsMergedAway += merge.segments.size() - 1;
  }
  assertEquals(expectedCount, infos.size() - segmentsMergedAway);

  // Many tiny segments: the policy may stop above the target but must never merge below it.
  SegmentInfos manySegmentsInfos = new SegmentInfos(Version.LATEST.major);
  final int manySegmentsCount = atLeast(100);
  for (int i = 0; i < manySegmentsCount; ++i) {
    manySegmentsInfos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 0.1D, IndexWriter.SOURCE_MERGE));
  }
  final MergeSpecification specificationManySegments = policy.findForcedMerges(
      manySegmentsInfos, expectedCount, segmentsToMerge(manySegmentsInfos), new MockMergeContext(SegmentCommitInfo::getDelCount));
  assertMaxSize(specificationManySegments, maxSegmentSizeMb);
  int manySegmentsMergedAway = 0;
  for (OneMerge merge : specificationManySegments.merges) {
    manySegmentsMergedAway += merge.segments.size() - 1;
  }
  assertTrue(manySegmentsInfos.size() - manySegmentsMergedAway >= expectedCount);
}
// Make sure that TieredMergePolicy doesn't do the final merge while there are merges ongoing,
// but does do non-final merges while merges are ongoing.
public void testForcedMergeWithPending() throws Exception {
  final TieredMergePolicy tmp = new TieredMergePolicy();
  final double maxSegmentSize = 10.0D;
  tmp.setMaxMergedSegmentMB(maxSegmentSize);

  SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
  for (int j = 0; j < 30; ++j) {
    infos.add(makeSegmentCommitInfo("_" + j, 1000, 0, 1.0D, IndexWriter.SOURCE_MERGE));
  }
  final MockMergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
  // Simulate one in-progress merge.
  mergeContext.setMergingSegments(Collections.singleton(infos.asList().get(0)));
  final int expectedCount = random().nextInt(10) + 3;
  final MergeSpecification specification = tmp.findForcedMerges(infos, expectedCount, segmentsToMerge(infos), mergeContext);
  // Since we have fewer than 30 (the max merge count) segments more than the final size this would
  // have been the final merge, so we check that it was prevented.
  assertNull(specification);

  SegmentInfos manySegmentsInfos = new SegmentInfos(Version.LATEST.major);
  final int manySegmentsCount = atLeast(500);
  for (int j = 0; j < manySegmentsCount; ++j) {
    manySegmentsInfos.add(makeSegmentCommitInfo("_" + j, 1000, 0, 0.1D, IndexWriter.SOURCE_MERGE));
  }
  // We set one merge to be ongoing. Since we have more than 30 (the max merge count) times the
  // number of segments that we want to merge to, this is not the final merge and hence the
  // returned specification must not be null.
  mergeContext.setMergingSegments(Collections.singleton(manySegmentsInfos.asList().get(0)));
  final MergeSpecification specificationManySegments =
      tmp.findForcedMerges(manySegmentsInfos, expectedCount, segmentsToMerge(manySegmentsInfos), mergeContext);
  assertMaxSize(specificationManySegments, maxSegmentSize);
  for (OneMerge merge : specificationManySegments.merges) {
    // Fixed: JUnit's assertEquals takes the EXPECTED value first, then the ACTUAL — the original
    // had the two arguments swapped, which garbles the failure message.
    assertEquals("No merges of less than the max merge count are permitted while another merge is in progress",
        tmp.getMaxMergeAtOnceExplicit(), merge.segments.size());
  }
  final int resultingCountManySegments = manySegmentsInfos.size() + specificationManySegments.merges.size()
      - specificationManySegments.merges.stream().mapToInt(spec -> spec.segments.size()).sum();
  assertTrue(resultingCountManySegments >= expectedCount);
}
// Builds the segmentsToMerge map that findForcedMerges expects, flagging every
// segment of the given infos as an "original" (Boolean.TRUE) merge candidate.
private static Map<SegmentCommitInfo, Boolean> segmentsToMerge(SegmentInfos infos) {
  final Map<SegmentCommitInfo, Boolean> allOriginal = new HashMap<>();
  infos.forEach(info -> allOriginal.put(info, Boolean.TRUE));
  return allOriginal;
}
// Asserts that no merge in the specification produces a segment larger than 1.5x the
// configured maximum segment size (sizes summed in bytes, limit given in MB).
private static void assertMaxSize(MergeSpecification specification, double maxSegmentSizeMb) {
  final double limitBytes = 1024 * 1024 * maxSegmentSizeMb * 1.5;
  for (OneMerge merge : specification.merges) {
    long mergedBytes = 0L;
    for (SegmentCommitInfo info : merge.segments) {
      try {
        mergedBytes += info.sizeInBytes();
      } catch (IOException e) {
        // sizeInBytes is declared to throw; surface it as a test failure.
        throw new AssertionError(e);
      }
    }
    assertTrue(mergedBytes < limitBytes);
  }
}
// Having a segment with very few documents in it can happen because of the random nature of the
// docs added to the index. For instance, let's say it just happens that the last segment has 3 docs in it.
// It can easily be merged with a close-to-max sized segment during a forceMerge and still respect the max segment

View File

@ -162,6 +162,8 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
}
};
// Backing set for getMergingSegments(); empty by default, overridable per test via the setter below.
private Set<SegmentCommitInfo> mergingSegments = Collections.emptySet();
// Creates a mock merge context whose per-segment deletion counts come from the given function.
public MockMergeContext(ToIntFunction<SegmentCommitInfo> numDeletesFunc) {
this.numDeletesFunc = numDeletesFunc;
}
@ -183,7 +185,11 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
@Override
public Set<SegmentCommitInfo> getMergingSegments() {
// NOTE(review): diff rendering — the hard-coded empty set (removed line) is replaced by the field.
return Collections.emptySet();
return mergingSegments;
}
// Lets a test simulate merges that are already in progress.
public void setMergingSegments(Set<SegmentCommitInfo> mergingSegments) {
this.mergingSegments = mergingSegments;
}
}