LUCENE-8385: Fix computation of the allowed segment count in TieredMergePolicy.

Author: Adrien Grand
Date: 2018-07-09 15:21:10 +02:00
parent 963cceebff
commit 41ddac5b44
3 changed files with 139 additions and 17 deletions

TieredMergePolicy.java

@@ -371,21 +371,7 @@ public class TieredMergePolicy extends MergePolicy {
     assert totalMaxDoc >= 0;
     assert totalDelDocs >= 0;
 
-    // Compute max allowed segments in the index
-    long levelSize = Math.max(minSegmentBytes, floorSegmentBytes);
-    long bytesLeft = totIndexBytes;
-    double allowedSegCount = 0;
-    while (true) {
-      final double segCountLevel = bytesLeft / (double) levelSize;
-      if (segCountLevel < segsPerTier) {
-        allowedSegCount += Math.ceil(segCountLevel);
-        break;
-      }
-      allowedSegCount += segsPerTier;
-      bytesLeft -= segsPerTier * levelSize;
-      levelSize *= maxMergeAtOnce;
-    }
     // If we have too-large segments, grace them out of the maximum segment count
     // If we're above certain thresholds, we can merge very large segments.
     double totalDelPct = (double) totalDelDocs / (double) totalMaxDoc;
     //TODO: See LUCENE-8263
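
To see the tier-budget computation in isolation, here is a standalone sketch of the loop this commit moves (the wrapper class and the concrete numbers are illustrative assumptions, not part of the patch). It hard-codes TieredMergePolicy's defaults: a 2 MB floor segment size, 10 segments per tier, and maxMergeAtOnce of 10.

    // Sketch only: mirrors the loop above with TieredMergePolicy's default settings.
    public class TierBudgetSketch {
      public static void main(String[] args) {
        long minSegmentBytes = 2L << 20;   // assume the smallest merge-eligible segment is 2 MB
        long floorSegmentBytes = 2L << 20; // default floorSegmentMB = 2
        double segsPerTier = 10.0;         // default segmentsPerTier
        int maxMergeAtOnce = 10;           // default maxMergeAtOnce
        long totIndexBytes = 500L << 20;   // 500 MB of merge-eligible data

        long levelSize = Math.max(minSegmentBytes, floorSegmentBytes);
        long bytesLeft = totIndexBytes;
        double allowedSegCount = 0;
        while (true) {
          final double segCountLevel = bytesLeft / (double) levelSize;
          if (segCountLevel < segsPerTier) {
            allowedSegCount += Math.ceil(segCountLevel);
            break;
          }
          allowedSegCount += segsPerTier;
          bytesLeft -= segsPerTier * levelSize;
          levelSize *= maxMergeAtOnce;
        }
        // Tier 1: 10 segments of 2 MB (20 MB); tier 2: 10 of 20 MB (200 MB);
        // remaining 280 MB / 200 MB = 1.4, so ceil = 2. Total budget: 22 segments.
        System.out.println("allowedSegCount = " + allowedSegCount); // prints 22.0
      }
    }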
@@ -406,11 +392,27 @@ public class TieredMergePolicy extends MergePolicy {
       if (segSizeDocs.sizeInBytes > maxMergedSegmentBytes / 2 && (totalDelPct < targetAsPct || segDelPct < targetAsPct)) {
         iter.remove();
         tooBigCount++; // Just for reporting purposes.
+        totIndexBytes -= segSizeDocs.sizeInBytes;
       } else {
         mergingBytes += segSizeDocs.sizeInBytes;
       }
     }
 
+    // Compute max allowed segments in the index
+    long levelSize = Math.max(minSegmentBytes, floorSegmentBytes);
+    long bytesLeft = totIndexBytes;
+    double allowedSegCount = 0;
+    while (true) {
+      final double segCountLevel = bytesLeft / (double) levelSize;
+      if (segCountLevel < segsPerTier) {
+        allowedSegCount += Math.ceil(segCountLevel);
+        break;
+      }
+      allowedSegCount += segsPerTier;
+      bytesLeft -= segsPerTier * levelSize;
+      levelSize *= maxMergeAtOnce;
+    }
+
     if (verbose(mergeContext) && tooBigCount > 0) {
       message(" allowedSegmentCount=" + allowedSegCount + " vs count=" + infos.size() +
           " (eligible count=" + sortedInfos.size() + ") tooBigCount= " + tooBigCount, mergeContext);

TestTieredMergePolicy.java

@@ -25,9 +25,12 @@ import java.util.List;
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.index.MergePolicy.MergeSpecification;
+import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.Version;
 
 public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
@@ -493,4 +496,32 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     w.close();
     dir.close();
   }
+
+  public void testManyMaxSizeSegments() throws IOException {
+    TieredMergePolicy policy = new TieredMergePolicy();
+    policy.setMaxMergedSegmentMB(1024); // 1GB
+    SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
+    int i = 0;
+    for (int j = 0; j < 30; ++j) {
+      infos.add(makeSegmentCommitInfo("_" + i++, 1000, 0, 1024)); // max size
+    }
+    for (int j = 0; j < 8; ++j) {
+      infos.add(makeSegmentCommitInfo("_" + i++, 1000, 0, 102)); // 102MB flushes
+    }
+
+    // Only 8 segments on 1 tier in addition to the max-size segments, nothing to do
+    MergeSpecification mergeSpec = policy.findMerges(MergeTrigger.SEGMENT_FLUSH, infos, new MockMergeContext(SegmentCommitInfo::getDelCount));
+    assertNull(mergeSpec);
+
+    for (int j = 0; j < 5; ++j) {
+      infos.add(makeSegmentCommitInfo("_" + i++, 1000, 0, 102)); // 102MB flushes
+    }
+
+    // Now 13 segments on 1 tier in addition to the max-size segments, 10 of them should get merged in one merge
+    mergeSpec = policy.findMerges(MergeTrigger.SEGMENT_FLUSH, infos, new MockMergeContext(SegmentCommitInfo::getDelCount));
+    assertNotNull(mergeSpec);
+    assertEquals(1, mergeSpec.merges.size());
+    OneMerge merge = mergeSpec.merges.get(0);
+    assertEquals(10, merge.segments.size());
+  }
 }
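
The final assertion expects a 10-segment merge because TieredMergePolicy's default maxMergeAtOnce is 10: with 13 one-tier segments against an allowed count of 11, the policy packs as many of them as the cap permits into a single merge. As an illustrative variation (not part of this patch), lowering the cap on the same policy would shrink the expected merge width:

    TieredMergePolicy policy = new TieredMergePolicy();
    policy.setMaxMergedSegmentMB(1024); // as in the test
    policy.setMaxMergeAtOnce(5);        // merges would now contain at most 5 segments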

BaseMergePolicyTestCase.java

@@ -19,6 +19,10 @@ package org.apache.lucene.index;
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.NullInfoStream;
@@ -28,6 +32,7 @@ import org.apache.lucene.util.Version;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -156,5 +161,89 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
       return Collections.emptySet();
     }
   }
+
+  /**
+   * Make a new {@link SegmentCommitInfo} with the given {@code maxDoc},
+   * {@code numDeletedDocs} and {@code sizeMB}, which are usually the
+   * numbers that merge policies care about.
+   */
+  protected static SegmentCommitInfo makeSegmentCommitInfo(String name, int maxDoc, int numDeletedDocs, double sizeMB) {
+    if (name.startsWith("_") == false) {
+      throw new IllegalArgumentException("name must start with an _, got " + name);
+    }
+    byte[] id = new byte[StringHelper.ID_LENGTH];
+    random().nextBytes(id);
+    SegmentInfo info = new SegmentInfo(FAKE_DIRECTORY, Version.LATEST, Version.LATEST, name, maxDoc, false, TestUtil.getDefaultCodec(), Collections.emptyMap(), id, Collections.emptyMap(), null);
+    info.setFiles(Collections.singleton(name + "_size=" + Long.toString((long) (sizeMB * 1024 * 1024)) + ".fake"));
+    return new SegmentCommitInfo(info, numDeletedDocs, 0, 0, 0, 0);
+  }
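
makeSegmentCommitInfo smuggles the requested size through the file name: it registers a single fake file named "<segment>_size=<bytes>.fake", and the FAKE_DIRECTORY below recovers the byte count from that name in fileLength(). A minimal sketch of the roundtrip (the segment name and size are made up for illustration):

    // Encode 102MB the way makeSegmentCommitInfo does, decode it the way fileLength() does.
    String file = "_0" + "_size=" + Long.toString((long) (102. * 1024 * 1024)) + ".fake"; // "_0_size=106954752.fake"
    int startIndex = file.indexOf("_size=") + "_size=".length();
    int endIndex = file.length() - ".fake".length();
    long bytes = Long.parseLong(file.substring(startIndex, endIndex)); // 106954752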
+
+  /** A directory that computes the length of a file based on its name. */
+  private static final Directory FAKE_DIRECTORY = new Directory() {
+
+    @Override
+    public String[] listAll() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteFile(String name) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long fileLength(String name) throws IOException {
+      if (name.endsWith(".liv")) {
+        return 0L;
+      }
+      if (name.endsWith(".fake") == false) {
+        throw new IllegalArgumentException(name);
+      }
+      int startIndex = name.indexOf("_size=") + "_size=".length();
+      int endIndex = name.length() - ".fake".length();
+      return Long.parseLong(name.substring(startIndex, endIndex));
+    }
+
+    @Override
+    public IndexOutput createOutput(String name, IOContext context) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void sync(Collection<String> names) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void rename(String source, String dest) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void syncMetaData() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IndexInput openInput(String name, IOContext context) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Lock obtainLock(String name) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  };
 }
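
Design note on FAKE_DIRECTORY: it implements only fileLength() (with .liv files reported as zero-length, so live-docs files never add to a segment's size) and throws UnsupportedOperationException for everything else. That is sufficient because SegmentCommitInfo.sizeInBytes() sums Directory.fileLength() over the segment's files, which lets merge policies be unit-tested against purely synthetic segment metadata, with no IndexWriter and no real files on disk. A sketch of how a test observes the size (reusing the helper above; values are illustrative):

    SegmentCommitInfo sci = makeSegmentCommitInfo("_0", 1000, 0, 102);
    long bytes = sci.sizeInBytes(); // 106954752, answered by FAKE_DIRECTORY.fileLength()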