From b1bb11b79dc50ff9babcf660c695f0af30557a57 Mon Sep 17 00:00:00 2001
From: Adrien Grand
Date: Tue, 10 Jul 2018 08:53:41 +0200
Subject: [PATCH] LUCENE-8391: More tests for merge policies.

---
 .../lucene/index/TestLogMergePolicy.java      |  17 ++
 .../lucene/index/TestNoMergePolicy.java       |  25 +++
 .../lucene/index/TestTieredMergePolicy.java   |  71 +++++-
 .../index/TestUpgradeIndexMergePolicy.java    |  13 ++
 .../lucene/index/BaseMergePolicyTestCase.java | 207 +++++++++++++++++-
 5 files changed, 318 insertions(+), 15 deletions(-)

diff --git a/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java
index 9dde87a027c..2ee0b2f8a56 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java
@@ -16,6 +16,10 @@
  */
 package org.apache.lucene.index;
 
+import java.io.IOException;
+
+import org.apache.lucene.index.MergePolicy.MergeSpecification;
+import org.apache.lucene.index.MergePolicy.OneMerge;
 
 public class TestLogMergePolicy extends BaseMergePolicyTestCase {
 
@@ -27,4 +31,17 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase {
     LogByteSizeMergePolicy mp = new LogByteSizeMergePolicy();
     assertTrue(mp.getMaxMergeMBForForcedMerge() > 0.0);
   }
+
+  @Override
+  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
+    // TODO
+  }
+
+  @Override
+  protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException {
+    LogMergePolicy lmp = (LogMergePolicy) policy;
+    for (OneMerge oneMerge : merge.merges) {
+      assertEquals(lmp.getMergeFactor(), oneMerge.segments.size());
+    }
+  }
 }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestNoMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestNoMergePolicy.java
index 415a263e273..79be9b226aa 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestNoMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestNoMergePolicy.java
@@ -17,11 +17,13 @@
 
 package org.apache.lucene.index;
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.Arrays;
 
+import org.apache.lucene.index.MergePolicy.MergeSpecification;
 import org.junit.Test;
 
 public class TestNoMergePolicy extends BaseMergePolicyTestCase {
@@ -66,4 +68,27 @@ public class TestNoMergePolicy extends BaseMergePolicyTestCase {
     }
   }
 
+  @Override
+  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
+    for (SegmentCommitInfo info : infos) {
+      assertEquals(IndexWriter.SOURCE_FLUSH, info.info.getAttribute(IndexWriter.SOURCE));
+    }
+  }
+
+  @Override
+  protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException {
+    fail(); // should never happen
+  }
+
+  @Override
+  public void testSimulateAppendOnly() throws IOException {
+    // Reduce numbers as this merge policy doesn't work well with lots of data
+    doTestSimulateAppendOnly(mergePolicy(), 1_000_000, 10_000);
+  }
+
+  @Override
+  public void testSimulateUpdates() throws IOException {
+    // Reduce numbers as this merge policy doesn't work well with lots of data
+    doTestSimulateUpdates(mergePolicy(), 100_000, 1000);
+  }
 }
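
Note: the assertSegmentInfos override above relies on the source tag that this
patch's makeSegmentCommitInfo helper (see the BaseMergePolicyTestCase changes
below) stores as a SegmentInfo attribute under the IndexWriter.SOURCE key.
Since NoMergePolicy never merges anything, every simulated segment must still
be tagged as a flush. A minimal illustrative check, assuming a
SegmentCommitInfo named sci that was produced by that helper:

    // The simulation tags each fake segment with IndexWriter.SOURCE_FLUSH or
    // IndexWriter.SOURCE_MERGE, so a segment's origin can be asserted later.
    String source = sci.info.getAttribute(IndexWriter.SOURCE);
    assertEquals(IndexWriter.SOURCE_FLUSH, source);
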
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
index f31d9885853..01432c52c0b 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
@@ -34,10 +34,57 @@ import org.apache.lucene.util.Version;
 
 public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
 
-  public MergePolicy mergePolicy() {
+  @Override
+  public TieredMergePolicy mergePolicy() {
     return newTieredMergePolicy();
   }
 
+  @Override
+  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
+    TieredMergePolicy tmp = (TieredMergePolicy) policy;
+
+    final long maxMergedSegmentBytes = (long) (tmp.getMaxMergedSegmentMB() * 1024 * 1024);
+
+    long minSegmentBytes = Long.MAX_VALUE;
+    long totalBytes = 0;
+    for (SegmentCommitInfo sci : infos) {
+      long byteSize = sci.sizeInBytes();
+      double liveRatio = 1 - (double) sci.getDelCount() / sci.info.maxDoc();
+      long weightedByteSize = (long) Math.round(liveRatio * byteSize);
+      totalBytes += weightedByteSize;
+      minSegmentBytes = Math.min(minSegmentBytes, weightedByteSize);
+    }
+
+    long levelSize = Math.max(minSegmentBytes, (long) (tmp.getFloorSegmentMB() * 1024 * 1024));
+    long bytesLeft = totalBytes;
+    double allowedSegCount = 0;
+    // below we make the assumption that segments that reached the max segment
+    // size divided by 2 don't need merging anymore
+    int mergeFactor = (int) Math.min(tmp.getSegmentsPerTier(), tmp.getMaxMergeAtOnce());
+    while (true) {
+      final double segCountLevel = bytesLeft / (double) levelSize;
+      if (segCountLevel < tmp.getSegmentsPerTier() || levelSize > maxMergedSegmentBytes / 2) {
+        allowedSegCount += Math.ceil(segCountLevel);
+        break;
+      }
+      allowedSegCount += tmp.getSegmentsPerTier();
+      bytesLeft -= tmp.getSegmentsPerTier() * levelSize;
+      levelSize = Math.min(levelSize * mergeFactor, maxMergedSegmentBytes / 2);
+    }
+    allowedSegCount = Math.max(allowedSegCount, tmp.getSegmentsPerTier());
+
+    int numSegments = infos.asList().size();
+    assertTrue("numSegments=" + numSegments + ", allowed=" + allowedSegCount, numSegments <= allowedSegCount);
+  }
+
+  @Override
+  protected void assertMerge(MergePolicy policy, MergeSpecification merges) {
+    TieredMergePolicy tmp = (TieredMergePolicy) policy;
+    for (OneMerge merge : merges.merges) {
+      assertTrue(merge.segments.size() <= tmp.getMaxMergeAtOnce());
+    }
+  }
+
   public void testForceMergeDeletes() throws Exception {
     Directory dir = newDirectory();
     IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
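
To make the allowed-segment-count budget above concrete, here is an
illustrative trace with made-up numbers: a 2MB floor, 10 segments per tier, a
merge factor of 10 and a 100MB max merged segment size, over 230MB of index.
The loop budgets 10 segments of 2MB for the first tier (20MB), 10 segments of
20MB for the next (200MB), and since the remaining 10MB no longer fills a tier
at the 50MB level, Math.ceil(10 / 50.0) adds one more, for an allowed count of
21. A minimal sketch of the same computation with all inputs hard-coded:

    long levelSize = 2, bytesLeft = 230, maxMergedSegmentBytes = 100; // MB
    double segsPerTier = 10, allowed = 0;
    int mergeFactor = 10;
    while (true) {
      double segCountLevel = bytesLeft / (double) levelSize;
      if (segCountLevel < segsPerTier || levelSize > maxMergedSegmentBytes / 2) {
        allowed += Math.ceil(segCountLevel); // partial last tier
        break;
      }
      allowed += segsPerTier;                // one full tier at this level
      bytesLeft -= segsPerTier * levelSize;
      levelSize = Math.min(levelSize * mergeFactor, maxMergedSegmentBytes / 2);
    }
    // allowed == 21.0 for these inputs
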
@@ -503,10 +550,10 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
     int i = 0;
     for (int j = 0; j < 30; ++j) {
-      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 1024)); // max size
+      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 1024, IndexWriter.SOURCE_MERGE)); // max size
     }
     for (int j = 0; j < 8; ++j) {
-      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 102)); // 102MB flushes
+      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 102, IndexWriter.SOURCE_FLUSH)); // 102MB flushes
     }
 
     // Only 8 segments on 1 tier in addition to the max-size segments, nothing to do
     assertNull(mergeSpec);
 
     for (int j = 0; j < 5; ++j) {
-      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 102)); // 102MB flushes
+      infos.add(makeSegmentCommitInfo("_" + i, 1000, 0, 102, IndexWriter.SOURCE_FLUSH)); // 102MB flushes
     }
 
     // Now 13 segments on 1 tier in addition to the max-size segments, 10 of them should get merged in one merge
@@ -524,4 +571,20 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     OneMerge merge = mergeSpec.merges.get(0);
     assertEquals(10, merge.segments.size());
   }
+
+  @Override
+  public void testSimulateAppendOnly() throws IOException {
+    TieredMergePolicy mergePolicy = mergePolicy();
+    // Avoid low values of the max merged segment size which prevent this merge policy from scaling well
+    mergePolicy.setMaxMergedSegmentMB(TestUtil.nextInt(random(), 1024, 10 * 1024));
+    doTestSimulateAppendOnly(mergePolicy, 100_000_000, 10_000);
+  }
+
+  @Override
+  public void testSimulateUpdates() throws IOException {
+    TieredMergePolicy mergePolicy = mergePolicy();
+    // Avoid low values of the max merged segment size which prevent this merge policy from scaling well
+    mergePolicy.setMaxMergedSegmentMB(TestUtil.nextInt(random(), 1024, 10 * 1024));
+    doTestSimulateUpdates(mergePolicy, 10_000_000, 2500);
+  }
 }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java
index 0ab13b42def..a2c66006477 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java
@@ -16,6 +16,9 @@
  */
 package org.apache.lucene.index;
 
+import java.io.IOException;
+
+import org.apache.lucene.index.MergePolicy.MergeSpecification;
 
 public class TestUpgradeIndexMergePolicy extends BaseMergePolicyTestCase {
 
@@ -23,4 +26,14 @@
     return new UpgradeIndexMergePolicy(newMergePolicy(random()));
   }
 
+  @Override
+  protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {
+    // no-op
+  }
+
+  @Override
+  protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException {
+    // no-op
+  }
+
 }
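
The no-op assertions here are deliberate: this test wraps a randomly chosen
delegate policy, so there is no stable invariant to check on the resulting
segments or merges. For reference, an illustrative construction with a
concrete delegate:

    // UpgradeIndexMergePolicy decorates another merge policy; outside of the
    // forced merges used for index upgrades, it forwards merge decisions to
    // the wrapped policy (a TieredMergePolicy here, chosen for the example).
    MergePolicy policy = new UpgradeIndexMergePolicy(new TieredMergePolicy());
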
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
index 6f14a2f465b..8ec9248c065 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
@@ -16,8 +16,23 @@
  */
 package org.apache.lucene.index;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.ToIntFunction;
+
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.MergePolicy.MergeContext;
+import org.apache.lucene.index.MergePolicy.MergeSpecification;
+import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -30,14 +45,6 @@ import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.TestUtil;
 import org.apache.lucene.util.Version;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.ToIntFunction;
-
 /**
  * Base test case for {@link MergePolicy}.
  */
@@ -46,6 +53,18 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
   /** Create a new {@link MergePolicy} instance. */
   protected abstract MergePolicy mergePolicy();
 
+  /**
+   * Assert that the given segment infos match expectations of the merge
+   * policy, assuming that segments have only been flushed or merged with
+   * this merge policy.
+   */
+  protected abstract void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException;
+
+  /**
+   * Assert that the given merge matches expectations of the merge policy.
+   */
+  protected abstract void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException;
+
   public void testForceMergeNotNeeded() throws IOException {
     try (Directory dir = newDirectory()) {
       final AtomicBoolean mayMerge = new AtomicBoolean(true);
@@ -135,7 +154,13 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
    */
   public static final class MockMergeContext implements MergePolicy.MergeContext {
     private final ToIntFunction<SegmentCommitInfo> numDeletesFunc;
-    private final InfoStream infoStream = new NullInfoStream();
+    private final InfoStream infoStream = new NullInfoStream() {
+      @Override
+      public boolean isEnabled(String component) {
+        // otherwise tests that simulate merging may bottleneck on generating messages
+        return false;
+      }
+    };
 
     public MockMergeContext(ToIntFunction<SegmentCommitInfo> numDeletesFunc) {
       this.numDeletesFunc = numDeletesFunc;
@@ -167,13 +192,15 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
    * {@code numDeletedDocs} and {@code sizeInBytes}, which are usually the
    * numbers that merge policies care about.
    */
-  protected static SegmentCommitInfo makeSegmentCommitInfo(String name, int maxDoc, int numDeletedDocs, double sizeMB) {
+  protected static SegmentCommitInfo makeSegmentCommitInfo(String name, int maxDoc, int numDeletedDocs, double sizeMB, String source) {
     if (name.startsWith("_") == false) {
       throw new IllegalArgumentException("name must start with an _, got " + name);
     }
     byte[] id = new byte[StringHelper.ID_LENGTH];
     random().nextBytes(id);
-    SegmentInfo info = new SegmentInfo(FAKE_DIRECTORY, Version.LATEST, Version.LATEST, name, maxDoc, false, TestUtil.getDefaultCodec(), Collections.emptyMap(), id, Collections.emptyMap(), null);
+    SegmentInfo info = new SegmentInfo(FAKE_DIRECTORY, Version.LATEST, Version.LATEST,
+        name, maxDoc, false, TestUtil.getDefaultCodec(), Collections.emptyMap(), id,
+        Collections.singletonMap(IndexWriter.SOURCE, source), null);
     info.setFiles(Collections.singleton(name + "_size=" + Long.toString((long) (sizeMB * 1024 * 1024)) + ".fake"));
     return new SegmentCommitInfo(info, numDeletedDocs, 0, 0, 0, 0);
   }
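
A detail worth noting: the helper encodes the simulated size in the name of a
single fake file ("<name>_size=<bytes>.fake"), paired with the fake Directory
used by these tests, whose fileLength presumably parses the number back out so
that SegmentCommitInfo.sizeInBytes() reports the simulated size without doing
any real I/O. An illustrative decoder for that naming scheme (the method name
parseFakeSize is made up for this note):

    // Recover the simulated size in bytes from a fake file name such as
    // "_0_size=1048576.fake", following the "<name>_size=<bytes>.fake"
    // convention used by makeSegmentCommitInfo above.
    static long parseFakeSize(String fileName) {
      int start = fileName.indexOf("_size=") + "_size=".length();
      int end = fileName.indexOf(".fake", start);
      return Long.parseLong(fileName.substring(start, end));
    }
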
@@ -246,4 +273,162 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
 
   };
 
+  /**
+   * Apply a merge to a {@link SegmentInfos} instance, accumulating the number
+   * of written bytes into {@code stats}.
+   */
+  protected static SegmentInfos applyMerge(SegmentInfos infos, OneMerge merge, String mergedSegmentName, IOStats stats) throws IOException {
+    LinkedHashSet<SegmentCommitInfo> scis = new LinkedHashSet<>(infos.asList());
+    int newMaxDoc = 0;
+    double newSize = 0;
+    for (SegmentCommitInfo sci : merge.segments) {
+      int numLiveDocs = sci.info.maxDoc() - sci.getDelCount();
+      newSize += (double) sci.sizeInBytes() * numLiveDocs / sci.info.maxDoc() / 1024 / 1024;
+      newMaxDoc += numLiveDocs;
+      boolean removed = scis.remove(sci);
+      assertTrue(removed);
+    }
+    SegmentInfos newInfos = new SegmentInfos(Version.LATEST.major);
+    newInfos.addAll(scis);
+    // Now add the merged segment
+    newInfos.add(makeSegmentCommitInfo(mergedSegmentName, newMaxDoc, 0, newSize, IndexWriter.SOURCE_MERGE));
+    stats.mergeBytesWritten += newSize * 1024 * 1024;
+    return newInfos;
+  }
+
+  /**
+   * Apply {@code numDeletes} uniformly across all segments of {@code infos}.
+   */
+  protected static SegmentInfos applyDeletes(SegmentInfos infos, int numDeletes) {
+    List<SegmentCommitInfo> infoList = infos.asList();
+    int totalNumDocs = infoList.stream()
+        .mapToInt(s -> s.info.maxDoc() - s.getDelCount())
+        .sum();
+    if (numDeletes > totalNumDocs) {
+      throw new IllegalArgumentException("More deletes than documents");
+    }
+    double w = (double) numDeletes / totalNumDocs;
+    List<SegmentCommitInfo> newInfoList = new ArrayList<>();
+    for (int i = 0; i < infoList.size(); ++i) {
+      assert numDeletes >= 0;
+      SegmentCommitInfo sci = infoList.get(i);
+      int segDeletes;
+      if (i == infoList.size() - 1) {
+        segDeletes = numDeletes;
+      } else {
+        segDeletes = Math.min(numDeletes, (int) Math.ceil(w * (sci.info.maxDoc() - sci.getDelCount())));
+      }
+      int newDelCount = sci.getDelCount() + segDeletes;
+      assert newDelCount <= sci.info.maxDoc();
+      if (newDelCount < sci.info.maxDoc()) { // drop fully deleted segments
+        SegmentCommitInfo newInfo = new SegmentCommitInfo(sci.info, sci.getDelCount() + segDeletes, 0, sci.getDelGen() + 1, sci.getFieldInfosGen(), sci.getDocValuesGen());
+        newInfoList.add(newInfo);
+      }
+      numDeletes -= segDeletes;
+    }
+    assert numDeletes == 0;
+    SegmentInfos newInfos = new SegmentInfos(Version.LATEST.major);
+    newInfos.addAll(newInfoList);
+    return newInfos;
+  }
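+
+  // Illustrative trace of the distribution above, not part of the test: with
+  // three segments of 100 live docs each and numDeletes = 30, the weight is
+  // w = 30 / 300 = 0.1, so each of the first two segments receives
+  // Math.ceil(0.1 * 100) = 10 deletes and the last segment absorbs the
+  // remaining 10, which keeps deletes roughly uniform across segments.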
+
+  /**
+   * Simulate an append-only use-case, i.e. there are no deletes.
+   */
+  public void testSimulateAppendOnly() throws IOException {
+    doTestSimulateAppendOnly(mergePolicy(), 100_000_000, 10_000);
+  }
+
+  /**
+   * Simulate an append-only use-case, i.e. there are no deletes.
+   * {@code totalDocs} documents exist in the index in the end, and flushes
+   * contribute at most {@code maxDocsPerFlush} documents.
+   */
+  protected void doTestSimulateAppendOnly(MergePolicy mergePolicy, int totalDocs, int maxDocsPerFlush) throws IOException {
+    IOStats stats = new IOStats();
+    AtomicLong segNameGenerator = new AtomicLong();
+    MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
+    SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
+    final double avgDocSizeMB = 5. / 1024; // 5kB
+    for (int numDocs = 0; numDocs < totalDocs; ) {
+      int flushDocCount = TestUtil.nextInt(random(), 1, maxDocsPerFlush);
+      numDocs += flushDocCount;
+      double flushSizeMB = flushDocCount * avgDocSizeMB;
+      stats.flushBytesWritten += flushSizeMB * 1024 * 1024;
+      segmentInfos.add(makeSegmentCommitInfo("_" + segNameGenerator.getAndIncrement(), flushDocCount, 0, flushSizeMB, IndexWriter.SOURCE_FLUSH));
+
+      MergeSpecification merges = mergePolicy.findMerges(MergeTrigger.SEGMENT_FLUSH, segmentInfos, mergeContext);
+      while (merges != null) {
+        assertMerge(mergePolicy, merges);
+        for (OneMerge oneMerge : merges.merges) {
+          segmentInfos = applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
+        }
+        merges = mergePolicy.findMerges(MergeTrigger.MERGE_FINISHED, segmentInfos, mergeContext);
+      }
+      assertSegmentInfos(mergePolicy, segmentInfos);
+    }
+
+    if (VERBOSE) {
+      System.out.println("Write amplification for append-only: " + (double) (stats.flushBytesWritten + stats.mergeBytesWritten) / stats.flushBytesWritten);
+    }
+  }
+
+  /**
+   * Simulate an update use-case where documents are uniformly updated across segments.
+   */
+  public void testSimulateUpdates() throws IOException {
+    doTestSimulateUpdates(mergePolicy(), 10_000_000, 2500);
+  }
+
+  /**
+   * Simulate an update use-case where documents are uniformly updated across segments.
+   * {@code totalDocs} documents exist in the index in the end, and flushes
+   * contribute at most {@code maxDocsPerFlush} documents.
+   */
+  protected void doTestSimulateUpdates(MergePolicy mergePolicy, int totalDocs, int maxDocsPerFlush) throws IOException {
+    IOStats stats = new IOStats();
+    AtomicLong segNameGenerator = new AtomicLong();
+    MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
+    SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
+    final double avgDocSizeMB = 5. / 1024; // 5kB
+    for (int numDocs = 0; numDocs < totalDocs; ) {
+      int flushDocCount = TestUtil.nextInt(random(), 1, maxDocsPerFlush);
+      // how many of these documents are actually updates
+      int delCount = (int) (flushDocCount * 0.9 * numDocs / totalDocs);
+      numDocs += flushDocCount - delCount;
+      segmentInfos = applyDeletes(segmentInfos, delCount);
+      double flushSize = flushDocCount * avgDocSizeMB;
+      stats.flushBytesWritten += flushSize * 1024 * 1024;
+      segmentInfos.add(makeSegmentCommitInfo("_" + segNameGenerator.getAndIncrement(), flushDocCount, 0, flushSize, IndexWriter.SOURCE_FLUSH));
+      MergeSpecification merges = mergePolicy.findMerges(MergeTrigger.SEGMENT_FLUSH, segmentInfos, mergeContext);
+      while (merges != null) {
+        assertMerge(mergePolicy, merges);
+        for (OneMerge oneMerge : merges.merges) {
+          segmentInfos = applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
+        }
+        merges = mergePolicy.findMerges(MergeTrigger.MERGE_FINISHED, segmentInfos, mergeContext);
+      }
+      assertSegmentInfos(mergePolicy, segmentInfos);
+    }
+
+    if (VERBOSE) {
+      System.out.println("Write amplification for update: " + (double) (stats.flushBytesWritten + stats.mergeBytesWritten) / stats.flushBytesWritten);
+      int totalDelCount = segmentInfos.asList().stream()
+          .mapToInt(SegmentCommitInfo::getDelCount)
+          .sum();
+      int totalMaxDoc = segmentInfos.asList().stream()
+          .map(s -> s.info)
+          .mapToInt(SegmentInfo::maxDoc)
+          .sum();
+      System.out.println("Final live ratio: " + (1 - (double) totalDelCount / totalMaxDoc));
+    }
+  }
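+
+  // Illustrative reading of the metric printed above, not part of the test:
+  // if flushes wrote 1GB and merges rewrote another 3GB, the write
+  // amplification is (1GB + 3GB) / 1GB = 4, i.e. every flushed byte was
+  // written four times overall. Lower values mean the merge policy rewrites
+  // data less often.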
*/ + public static class IOStats { + /** Bytes written through flushes. */ + long flushBytesWritten; + /** Bytes written through merges. */ + long mergeBytesWritten; + } }