mirror of https://github.com/apache/lucene.git
LUCENE-8391: More tests for merge policies.
This commit is contained in:
parent a864ef8231
commit b1bb11b79d
TestLogMergePolicy.java
@@ -16,6 +16,10 @@
  */
 package org.apache.lucene.index;
 
+import java.io.IOException;
+
+import org.apache.lucene.index.MergePolicy.MergeSpecification;
+import org.apache.lucene.index.MergePolicy.OneMerge;
 
 public class TestLogMergePolicy extends BaseMergePolicyTestCase {
 
@@ -27,4 +31,17 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase {
     LogByteSizeMergePolicy mp = new LogByteSizeMergePolicy();
     assertTrue(mp.getMaxMergeMBForForcedMerge() > 0.0);
   }
+
+  @Override
+  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
+    // TODO
+  }
+
+  @Override
+  protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException {
+    LogMergePolicy lmp = (LogMergePolicy) policy;
+    for (OneMerge oneMerge : merge.merges) {
+      assertEquals(lmp.getMergeFactor(), oneMerge.segments.size());
+    }
+  }
 }
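Note: the new assertMerge encodes the invariant that LogMergePolicy always packs exactly mergeFactor segments into each merge it proposes. A minimal sketch of exercising that invariant against synthetic segments, reusing the helpers this patch adds to BaseMergePolicyTestCase (the trigger choice and the default mergeFactor of 10 are assumptions here, not part of the patch):

// Sketch: 20 equal-sized flushed segments should yield merges of exactly
// mergeFactor segments each.
SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
for (int i = 0; i < 20; ++i) {
  infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 1.0, IndexWriter.SOURCE_FLUSH));
}
LogByteSizeMergePolicy mp = new LogByteSizeMergePolicy();
MergeSpecification spec = mp.findMerges(MergeTrigger.SEGMENT_FLUSH, infos,
    new MockMergeContext(SegmentCommitInfo::getDelCount));
if (spec != null) {
  for (OneMerge merge : spec.merges) {
    assertEquals(mp.getMergeFactor(), merge.segments.size());
  }
}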
TestNoMergePolicy.java
@@ -17,11 +17,13 @@
 package org.apache.lucene.index;
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.Arrays;
 
+import org.apache.lucene.index.MergePolicy.MergeSpecification;
 import org.junit.Test;
 
 public class TestNoMergePolicy extends BaseMergePolicyTestCase {
 
@@ -66,4 +68,27 @@ public class TestNoMergePolicy extends BaseMergePolicyTestCase {
     }
   }
+
+  @Override
+  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
+    for (SegmentCommitInfo info : infos) {
+      assertEquals(IndexWriter.SOURCE_FLUSH, info.info.getAttribute(IndexWriter.SOURCE));
+    }
+  }
+
+  @Override
+  protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException {
+    fail(); // should never happen
+  }
+
+  @Override
+  public void testSimulateAppendOnly() throws IOException {
+    // Reduce numbers as this merge policy doesn't work well with lots of data
+    doTestSimulateAppendOnly(mergePolicy(), 1_000_000, 10_000);
+  }
+
+  @Override
+  public void testSimulateUpdates() throws IOException {
+    doTestSimulateUpdates(mergePolicy(), 100_000, 1000);
+  }
 }
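Note: since NoMergePolicy never selects merges, every segment keeps the source=flush attribute that makeSegmentCommitInfo now records, which is exactly what assertSegmentInfos checks here. A hedged sketch of the same invariant through the public API (the writer setup is illustrative, not from this patch):

// With NoMergePolicy, the segment count equals the flush count and no
// segment ever carries a "merge" source attribute.
IndexWriterConfig config = newIndexWriterConfig(new MockAnalyzer(random()))
    .setMergePolicy(NoMergePolicy.INSTANCE);
try (Directory dir = newDirectory();
     IndexWriter w = new IndexWriter(dir, config)) {
  for (int i = 0; i < 10; ++i) {
    w.addDocument(new Document());
    w.flush(); // one segment per flush; NoMergePolicy never merges them away
  }
  w.commit();
  assertEquals(10, SegmentInfos.readLatestCommit(dir).size());
}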
TestTieredMergePolicy.java
@@ -34,10 +34,57 @@ import org.apache.lucene.util.Version;
 
 public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
 
-  public MergePolicy mergePolicy() {
+  @Override
+  public TieredMergePolicy mergePolicy() {
     return newTieredMergePolicy();
   }
 
+  @Override
+  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
+    TieredMergePolicy tmp = (TieredMergePolicy) policy;
+
+    final long maxMergedSegmentBytes = (long) (tmp.getMaxMergedSegmentMB() * 1024 * 1024);
+
+    long minSegmentBytes = Long.MAX_VALUE;
+    long totalBytes = 0;
+    for (SegmentCommitInfo sci : infos) {
+      long byteSize = sci.sizeInBytes();
+      double liveRatio = 1 - (double) sci.getDelCount() / sci.info.maxDoc();
+      long weightedByteSize = (long) Math.round(liveRatio * byteSize);
+      totalBytes += weightedByteSize;
+      minSegmentBytes = Math.min(minSegmentBytes, weightedByteSize);
+    }
+
+    long levelSize = Math.max(minSegmentBytes, (long) (tmp.getFloorSegmentMB() * 1024 * 1024));
+    long bytesLeft = totalBytes;
+    double allowedSegCount = 0;
+    // below we make the assumption that segments that reached the max segment
+    // size divided by 2 don't need merging anymore
+    int mergeFactor = (int) Math.min(tmp.getSegmentsPerTier(), tmp.getMaxMergeAtOnce());
+    while (true) {
+      final double segCountLevel = bytesLeft / (double) levelSize;
+      if (segCountLevel < tmp.getSegmentsPerTier() || levelSize > maxMergedSegmentBytes / 2) {
+        allowedSegCount += Math.ceil(segCountLevel);
+        break;
+      }
+      allowedSegCount += tmp.getSegmentsPerTier();
+      bytesLeft -= tmp.getSegmentsPerTier() * levelSize;
+      levelSize = Math.min(levelSize * mergeFactor, maxMergedSegmentBytes / 2);
+    }
+    allowedSegCount = Math.max(allowedSegCount, tmp.getSegmentsPerTier());
+
+    int numSegments = infos.asList().size();
+    assertTrue("numSegments=" + numSegments + ", allowed=" + allowedSegCount, numSegments <= allowedSegCount);
+  }
+
+  @Override
+  protected void assertMerge(MergePolicy policy, MergeSpecification merges) {
+    TieredMergePolicy tmp = (TieredMergePolicy) policy;
+    for (OneMerge merge : merges.merges) {
+      assertTrue(merge.segments.size() <= tmp.getMaxMergeAtOnce());
+    }
+  }
+
   public void testForceMergeDeletes() throws Exception {
     Directory dir = newDirectory();
     IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
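Note: the loop above computes a budget by walking geometric size levels, allowing a full tier of segments per level until either a level holds fewer than segmentsPerTier segments or its size exceeds half the max merged segment size. A worked sketch with assumed values (10 segments per tier, 2MB floor, 5GB max merged segment; these mirror common TieredMergePolicy defaults but are assumptions here):

// Worked example of the allowed-segment-count bound for ~10GB of live data:
long maxMergedSegmentBytes = 5L * 1024 << 20;   // 5GB
long levelSize = 2L << 20;                      // 2MB floor
long bytesLeft = 10L << 30;                     // 10GB of live data
double allowed = 0;
while (bytesLeft / (double) levelSize >= 10 && levelSize <= maxMergedSegmentBytes / 2) {
  allowed += 10;                                // a full tier of 10 segments
  bytesLeft -= 10 * levelSize;
  levelSize = Math.min(levelSize * 10, maxMergedSegmentBytes / 2);
}
allowed += Math.ceil(bytesLeft / (double) levelSize); // partial top tier
// Levels grow 2MB -> 20MB -> 200MB -> 2000MB; three full tiers (30 segments)
// plus ceil(~8GB / 2000MB) = 5 more, so about 35 segments are allowed.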
@@ -503,10 +550,10 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
     int i = 0;
     for (int j = 0; j < 30; ++j) {
-      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 1024)); // max size
+      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 1024, IndexWriter.SOURCE_MERGE)); // max size
     }
     for (int j = 0; j < 8; ++j) {
-      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 102)); // 102MB flushes
+      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 102, IndexWriter.SOURCE_FLUSH)); // 102MB flushes
     }
 
     // Only 8 segments on 1 tier in addition to the max-size segments, nothing to do
@@ -514,7 +561,7 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     assertNull(mergeSpec);
 
     for (int j = 0; j < 5; ++j) {
-      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 102)); // 102MB flushes
+      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 102, IndexWriter.SOURCE_FLUSH)); // 102MB flushes
     }
 
     // Now 13 segments on 1 tier in addition to the max-size segments, 10 of them should get merged in one merge
@@ -524,4 +571,20 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     OneMerge merge = mergeSpec.merges.get(0);
     assertEquals(10, merge.segments.size());
   }
+
+  @Override
+  public void testSimulateAppendOnly() throws IOException {
+    TieredMergePolicy mergePolicy = mergePolicy();
+    // Avoid low values of the max merged segment size which prevent this merge policy from scaling well
+    mergePolicy.setMaxMergedSegmentMB(TestUtil.nextInt(random(), 1024, 10 * 1024));
+    doTestSimulateAppendOnly(mergePolicy, 100_000_000, 10_000);
+  }
+
+  @Override
+  public void testSimulateUpdates() throws IOException {
+    TieredMergePolicy mergePolicy = mergePolicy();
+    // Avoid low values of the max merged segment size which prevent this merge policy from scaling well
+    mergePolicy.setMaxMergedSegmentMB(TestUtil.nextInt(random(), 1024, 10 * 1024));
+    doTestSimulateUpdates(mergePolicy, 10_000_000, 2500);
+  }
 }
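Note: both simulation overrides feed the base class helpers, which report write amplification when tests run with VERBOSE. As a quick illustration of that metric (the numbers are invented for the example):

// Every byte is written once at flush time and once more for each merge
// it later participates in, so:
long flushBytesWritten = 500L << 20;    // 500MB written by flushes
long mergeBytesWritten = 1_500L << 20;  // 1.5GB rewritten by merges
double writeAmplification =
    (double) (flushBytesWritten + mergeBytesWritten) / flushBytesWritten;
// (500 + 1500) / 500 = 4.0: each flushed byte was rewritten three more times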
TestUpgradeIndexMergePolicy.java
@@ -16,6 +16,9 @@
  */
 package org.apache.lucene.index;
 
+import java.io.IOException;
+
+import org.apache.lucene.index.MergePolicy.MergeSpecification;
 
 public class TestUpgradeIndexMergePolicy extends BaseMergePolicyTestCase {
 
@@ -23,4 +26,14 @@ public class TestUpgradeIndexMergePolicy extends BaseMergePolicyTestCase {
     return new UpgradeIndexMergePolicy(newMergePolicy(random()));
   }
+
+  @Override
+  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
+    // no-op
+  }
+
+  @Override
+  protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException {
+    // no-op
+  }
 
 }
BaseMergePolicyTestCase.java
@@ -16,8 +16,23 @@
  */
 package org.apache.lucene.index;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.ToIntFunction;
+
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.MergePolicy.MergeContext;
+import org.apache.lucene.index.MergePolicy.MergeSpecification;
+import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -30,14 +45,6 @@ import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.TestUtil;
 import org.apache.lucene.util.Version;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.ToIntFunction;
-
 /**
  * Base test case for {@link MergePolicy}.
  */
@@ -46,6 +53,18 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
   /** Create a new {@link MergePolicy} instance. */
   protected abstract MergePolicy mergePolicy();
 
+  /**
+   * Assert that the given segment infos match expectations of the merge
+   * policy, assuming segments that have only been either flushed or merged with
+   * this merge policy.
+   */
+  protected abstract void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException;
+
+  /**
+   * Assert that the given merge matches expectations of the merge policy.
+   */
+  protected abstract void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException;
+
   public void testForceMergeNotNeeded() throws IOException {
     try (Directory dir = newDirectory()) {
       final AtomicBoolean mayMerge = new AtomicBoolean(true);
@@ -135,7 +154,13 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
    */
   public static final class MockMergeContext implements MergePolicy.MergeContext {
     private final ToIntFunction<SegmentCommitInfo> numDeletesFunc;
-    private final InfoStream infoStream = new NullInfoStream();
+    private final InfoStream infoStream = new NullInfoStream() {
+      @Override
+      public boolean isEnabled(String component) {
+        // otherwise tests that simulate merging may bottleneck on generating messages
+        return false;
+      }
+    };
 
     public MockMergeContext(ToIntFunction<SegmentCommitInfo> numDeletesFunc) {
       this.numDeletesFunc = numDeletesFunc;
@@ -167,13 +192,15 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
    * {@code numDeletedDocs} and {@code sizeInBytes}, which are usually the
    * numbers that merge policies care about.
    */
-  protected static SegmentCommitInfo makeSegmentCommitInfo(String name, int maxDoc, int numDeletedDocs, double sizeMB) {
+  protected static SegmentCommitInfo makeSegmentCommitInfo(String name, int maxDoc, int numDeletedDocs, double sizeMB, String source) {
     if (name.startsWith("_") == false) {
       throw new IllegalArgumentException("name must start with an _, got " + name);
     }
     byte[] id = new byte[StringHelper.ID_LENGTH];
     random().nextBytes(id);
-    SegmentInfo info = new SegmentInfo(FAKE_DIRECTORY, Version.LATEST, Version.LATEST, name, maxDoc, false, TestUtil.getDefaultCodec(), Collections.emptyMap(), id, Collections.emptyMap(), null);
+    SegmentInfo info = new SegmentInfo(FAKE_DIRECTORY, Version.LATEST, Version.LATEST,
+        name, maxDoc, false, TestUtil.getDefaultCodec(), Collections.emptyMap(), id,
+        Collections.singletonMap(IndexWriter.SOURCE, source), null);
     info.setFiles(Collections.singleton(name + "_size=" + Long.toString((long) (sizeMB * 1024 * 1024)) + ".fake"));
     return new SegmentCommitInfo(info, numDeletedDocs, 0, 0, 0, 0);
   }
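Note: the "_size=...fake" file registered above is how these synthetic segments report a size without any real data: the byte count is embedded in the file name so the fake directory can answer fileLength (and thus sizeInBytes) from it. A sketch of the decoding side, assuming that is how FAKE_DIRECTORY works (the helper name here is hypothetical):

// Hypothetical decoder matching the "<name>_size=<bytes>.fake" convention:
static long fakeFileLength(String fileName) {
  int start = fileName.indexOf("_size=") + "_size=".length();
  int end = fileName.length() - ".fake".length();
  return Long.parseLong(fileName.substring(start, end));
}
// fakeFileLength("_0_size=1048576.fake") == 1048576  (1MB)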
@@ -246,4 +273,162 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
 
   };
+
+  /**
+   * Apply a merge to a {@link SegmentInfos} instance, accumulating the number
+   * of written bytes into {@code stats}.
+   */
+  protected static SegmentInfos applyMerge(SegmentInfos infos, OneMerge merge, String mergedSegmentName, IOStats stats) throws IOException {
+    LinkedHashSet<SegmentCommitInfo> scis = new LinkedHashSet<>(infos.asList());
+    int newMaxDoc = 0;
+    double newSize = 0;
+    for (SegmentCommitInfo sci : merge.segments) {
+      int numLiveDocs = sci.info.maxDoc() - sci.getDelCount();
+      newSize += (double) sci.sizeInBytes() * numLiveDocs / sci.info.maxDoc() / 1024 / 1024;
+      newMaxDoc += numLiveDocs;
+      boolean removed = scis.remove(sci);
+      assertTrue(removed);
+    }
+    SegmentInfos newInfos = new SegmentInfos(Version.LATEST.major);
+    newInfos.addAll(scis);
+    // Now add the merged segment
+    newInfos.add(makeSegmentCommitInfo(mergedSegmentName, newMaxDoc, 0, newSize, IndexWriter.SOURCE_MERGE));
+    stats.mergeBytesWritten += newSize * 1024 * 1024;
+    return newInfos;
+  }
+
+  /**
+   * Apply {@code numDeletes} uniformly across all segments of {@code infos}.
+   */
+  protected static SegmentInfos applyDeletes(SegmentInfos infos, int numDeletes) {
+    List<SegmentCommitInfo> infoList = infos.asList();
+    int totalNumDocs = infoList.stream()
+        .mapToInt(s -> s.info.maxDoc() - s.getDelCount())
+        .sum();
+    if (numDeletes > totalNumDocs) {
+      throw new IllegalArgumentException("More deletes than documents");
+    }
+    double w = (double) numDeletes / totalNumDocs;
+    List<SegmentCommitInfo> newInfoList = new ArrayList<>();
+    for (int i = 0; i < infoList.size(); ++i) {
+      assert numDeletes >= 0;
+      SegmentCommitInfo sci = infoList.get(i);
+      int segDeletes;
+      if (i == infoList.size() - 1) {
+        segDeletes = numDeletes;
+      } else {
+        segDeletes = Math.min(numDeletes, (int) Math.ceil(w * (sci.info.maxDoc() - sci.getDelCount())));
+      }
+      int newDelCount = sci.getDelCount() + segDeletes;
+      assert newDelCount <= sci.info.maxDoc();
+      if (newDelCount < sci.info.maxDoc()) { // drop fully deleted segments
+        SegmentCommitInfo newInfo = new SegmentCommitInfo(sci.info, sci.getDelCount() + segDeletes, 0, sci.getDelGen() + 1, sci.getFieldInfosGen(), sci.getDocValuesGen());
+        newInfoList.add(newInfo);
+      }
+      numDeletes -= segDeletes;
+    }
+    assert numDeletes == 0;
+    SegmentInfos newInfos = new SegmentInfos(Version.LATEST.major);
+    newInfos.addAll(newInfoList);
+    return newInfos;
+  }
+
+  /**
+   * Simulate an append-only use-case, ie. there are no deletes.
+   */
+  public void testSimulateAppendOnly() throws IOException {
+    doTestSimulateAppendOnly(mergePolicy(), 100_000_000, 10_000);
+  }
+
+  /**
+   * Simulate an append-only use-case, ie. there are no deletes.
+   * {@code totalDocs} exist in the index in the end, and flushes contribute at most
+   * {@code maxDocsPerFlush} documents.
+   */
+  protected void doTestSimulateAppendOnly(MergePolicy mergePolicy, int totalDocs, int maxDocsPerFlush) throws IOException {
+    IOStats stats = new IOStats();
+    AtomicLong segNameGenerator = new AtomicLong();
+    MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
+    SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
+    final double avgDocSizeMB = 5. / 1024; // 5kB
+    for (int numDocs = 0; numDocs < totalDocs; ) {
+      int flushDocCount = TestUtil.nextInt(random(), 1, maxDocsPerFlush);
+      numDocs += flushDocCount;
+      double flushSizeMB = flushDocCount * avgDocSizeMB;
+      stats.flushBytesWritten += flushSizeMB * 1024 * 1024;
+      segmentInfos.add(makeSegmentCommitInfo("_" + segNameGenerator.getAndIncrement(), flushDocCount, 0, flushSizeMB, IndexWriter.SOURCE_FLUSH));
+
+      MergeSpecification merges = mergePolicy.findMerges(MergeTrigger.SEGMENT_FLUSH, segmentInfos, mergeContext);
+      while (merges != null) {
+        assertMerge(mergePolicy, merges);
+        for (OneMerge oneMerge : merges.merges) {
+          segmentInfos = applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
+        }
+        merges = mergePolicy.findMerges(MergeTrigger.MERGE_FINISHED, segmentInfos, mergeContext);
+      }
+      assertSegmentInfos(mergePolicy, segmentInfos);
+    }
+
+    if (VERBOSE) {
+      System.out.println("Write amplification for append-only: " + (double) (stats.flushBytesWritten + stats.mergeBytesWritten) / stats.flushBytesWritten);
+    }
+  }
+
+  /**
+   * Simulate an update use-case where documents are uniformly updated across segments.
+   */
+  public void testSimulateUpdates() throws IOException {
+    doTestSimulateUpdates(mergePolicy(), 10_000_000, 2500);
+  }
+
+  /**
+   * Simulate an update use-case where documents are uniformly updated across segments.
+   * {@code totalDocs} exist in the index in the end, and flushes contribute at most
+   * {@code maxDocsPerFlush} documents.
+   */
+  protected void doTestSimulateUpdates(MergePolicy mergePolicy, int totalDocs, int maxDocsPerFlush) throws IOException {
+    IOStats stats = new IOStats();
+    AtomicLong segNameGenerator = new AtomicLong();
+    MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
+    SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
+    final double avgDocSizeMB = 5. / 1024; // 5kB
+    for (int numDocs = 0; numDocs < totalDocs; ) {
+      int flushDocCount = TestUtil.nextInt(random(), 1, maxDocsPerFlush);
+      // how many of these documents are actually updates
+      int delCount = (int) (flushDocCount * 0.9 * numDocs / totalDocs);
+      numDocs += flushDocCount - delCount;
+      segmentInfos = applyDeletes(segmentInfos, delCount);
+      double flushSize = flushDocCount * avgDocSizeMB;
+      stats.flushBytesWritten += flushSize * 1024 * 1024;
+      segmentInfos.add(makeSegmentCommitInfo("_" + segNameGenerator.getAndIncrement(), flushDocCount, 0, flushSize, IndexWriter.SOURCE_FLUSH));
+      MergeSpecification merges = mergePolicy.findMerges(MergeTrigger.SEGMENT_FLUSH, segmentInfos, mergeContext);
+      while (merges != null) {
+        assertMerge(mergePolicy, merges);
+        for (OneMerge oneMerge : merges.merges) {
+          segmentInfos = applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
+        }
+        merges = mergePolicy.findMerges(MergeTrigger.MERGE_FINISHED, segmentInfos, mergeContext);
+      }
+      assertSegmentInfos(mergePolicy, segmentInfos);
+    }
+
+    if (VERBOSE) {
+      System.out.println("Write amplification for update: " + (double) (stats.flushBytesWritten + stats.mergeBytesWritten) / stats.flushBytesWritten);
+      int totalDelCount = segmentInfos.asList().stream()
+          .mapToInt(SegmentCommitInfo::getDelCount)
+          .sum();
+      int totalMaxDoc = segmentInfos.asList().stream()
+          .map(s -> s.info)
+          .mapToInt(SegmentInfo::maxDoc)
+          .sum();
+      System.out.println("Final live ratio: " + (1 - (double) totalDelCount / totalMaxDoc));
+    }
+  }
+
+  /** Statistics about bytes written to storage. */
+  public static class IOStats {
+    /** Bytes written through flushes. */
+    long flushBytesWritten;
+    /** Bytes written through merges. */
+    long mergeBytesWritten;
+  }
 }
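Note: taken together, the new hooks turn BaseMergePolicyTestCase into a simulation harness: a subclass only supplies a policy plus its invariants, and the inherited testSimulateAppendOnly/testSimulateUpdates drive millions of synthetic documents through it. A hypothetical subclass (the policy name is made up, not part of this patch) would look like:

import java.io.IOException;

import org.apache.lucene.index.MergePolicy.MergeSpecification;

public class TestMyMergePolicy extends BaseMergePolicyTestCase {

  @Override
  public MergePolicy mergePolicy() {
    return new MyMergePolicy(); // assumption: some custom MergePolicy
  }

  @Override
  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
    // steady-state invariants, e.g. an upper bound on the segment count
  }

  @Override
  protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException {
    // per-merge invariants, e.g. an upper bound on segments per merge
  }
}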