diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index bc20db67121..8532ce81f8e 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -260,6 +260,10 @@ New Features merge policy to try to have at least this number of segments on the highest tier. (Adrien Grand, Carlos Delgado) +* GITHUB#13517: Allow configuring the search concurrency on LogDocMergePolicy + and LogByteSizeMergePolicy via a new #setTargetConcurrency setter. + (Adrien Grand) + Improvements --------------------- diff --git a/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java index f7c5011d9c6..f2113d0d13b 100644 --- a/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java @@ -90,6 +90,12 @@ public abstract class LogMergePolicy extends MergePolicy { /** If true, we pro-rate a segment's size by the percentage of non-deleted documents. */ protected boolean calibrateSizeByDeletes = true; + /** + * Target search concurrency. This merge policy will avoid creating segments that have more than + * {@code maxDoc / targetSearchConcurrency} documents. + */ + protected int targetSearchConcurrency = 1; + /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */ public LogMergePolicy() { super(DEFAULT_NO_CFS_RATIO, MergePolicy.DEFAULT_MAX_CFS_SEGMENT_SIZE); @@ -131,6 +137,28 @@ public abstract class LogMergePolicy extends MergePolicy { return calibrateSizeByDeletes; } + /** + * Sets the target search concurrency. This prevents creating segments that are bigger than + * maxDoc/targetSearchConcurrency, which in turn makes the work parallelizable into + * targetSearchConcurrency slices of similar doc counts. + * + *

NOTE: Configuring a value greater than 1 will increase the number of segments in the + * index linearly with the value of {@code targetSearchConcurrency} and also increase write + * amplification. + */ + public void setTargetSearchConcurrency(int targetSearchConcurrency) { + if (targetSearchConcurrency < 1) { + throw new IllegalArgumentException( + "targetSearchConcurrency must be >= 1 (got " + targetSearchConcurrency + ")"); + } + this.targetSearchConcurrency = targetSearchConcurrency; + } + + /** Returns the target search concurrency. */ + public int getTargetSearchConcurrency() { + return targetSearchConcurrency; + } + /** * Return the number of documents in the provided {@link SegmentCommitInfo}, pro-rated by * percentage of non-deleted documents if {@link #setCalibrateSizeByDeletes} is set. @@ -484,8 +512,10 @@ public abstract class LogMergePolicy extends MergePolicy { final Set mergingSegments = mergeContext.getMergingSegments(); + int totalDocCount = 0; for (int i = 0; i < numSegments; i++) { final SegmentCommitInfo info = infos.info(i); + totalDocCount += sizeDocs(info, mergeContext); long size = size(info, mergeContext); // Floor tiny segments @@ -575,6 +605,9 @@ public abstract class LogMergePolicy extends MergePolicy { mergeContext); } + final int maxMergeDocs = + Math.min(this.maxMergeDocs, Math.ceilDiv(totalDocCount, targetSearchConcurrency)); + // Finally, record all merges that are viable at this level: int end = start + mergeFactor; while (end <= 1 + upto) { @@ -590,7 +623,9 @@ public abstract class LogMergePolicy extends MergePolicy { } long segmentSize = size(info, mergeContext); long segmentDocs = sizeDocs(info, mergeContext); - if (mergeSize + segmentSize > maxMergeSize || mergeDocs + segmentDocs > maxMergeDocs) { + if (mergeSize + segmentSize > maxMergeSize + || (mergeSize + segmentSize > minMergeSize + && mergeDocs + segmentDocs > maxMergeDocs)) { // This merge is full, stop adding more segments to it if (i == start) { // This segment alone is too large, return a singleton merge diff --git a/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java b/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java index 506a76f9989..99e1e8e1fc0 100644 --- a/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java +++ b/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java @@ -132,7 +132,7 @@ public class TestMergeSchedulerExternal extends LuceneTestCase { logMP.setMergeFactor(10); try { - for (int i = 0; i < 20; i++) { + for (int i = 0; i < 60; i++) { writer.addDocument(doc); } } catch ( diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java index 5f85b5d3774..e0b2c49d854 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java @@ -802,6 +802,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { iwc.setMaxBufferedDocs(2); LogMergePolicy lmp = newLogMergePolicy(); lmp.setMergeFactor(2); + lmp.setTargetSearchConcurrency(1); iwc.setMergePolicy(lmp); IndexWriter w = new IndexWriter(dir, iwc); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java index 0e7f7814f92..5757141b8f2 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java @@ -471,6 +471,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase { cms.setSuppressExceptions(); conf.setMergeScheduler(cms); ((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2); + ((LogMergePolicy) conf.getMergePolicy()).setTargetSearchConcurrency(1); TestPoint3 testPoint = new TestPoint3(); IndexWriter w = RandomIndexWriter.mockIndexWriter(random(), dir, conf, testPoint); testPoint.doFail = true; diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java index 1835a4ff309..c56feb62d84 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java @@ -900,8 +900,10 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase { } public void testSetDiagnostics() throws IOException { + LogMergePolicy logMp = newLogMergePolicy(4); + logMp.setTargetSearchConcurrency(1); MergePolicy myMergePolicy = - new FilterMergePolicy(newLogMergePolicy(4)) { + new FilterMergePolicy(logMp) { @Override public MergeSpecification findMerges( MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java index 51b502a122b..82aa548df80 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java @@ -999,6 +999,8 @@ public class TestIndexWriterReader extends LuceneTestCase { public void testSegmentWarmer() throws Exception { Directory dir = newDirectory(); final AtomicBoolean didWarm = new AtomicBoolean(); + LogMergePolicy mp = newLogMergePolicy(10); + mp.setTargetSearchConcurrency(1); IndexWriter w = new IndexWriter( dir, @@ -1012,7 +1014,7 @@ public class TestIndexWriterReader extends LuceneTestCase { assertEquals(20, count); didWarm.set(true); }) - .setMergePolicy(newLogMergePolicy(10))); + .setMergePolicy(mp)); Document doc = new Document(); doc.add(newStringField("foo", "bar", Field.Store.NO)); @@ -1045,6 +1047,8 @@ public class TestIndexWriterReader extends LuceneTestCase { return true; } }; + LogMergePolicy mp = newLogMergePolicy(10); + mp.setTargetSearchConcurrency(1); IndexWriter w = new IndexWriter( dir, @@ -1053,7 +1057,7 @@ public class TestIndexWriterReader extends LuceneTestCase { .setReaderPooling(true) .setInfoStream(infoStream) .setMergedSegmentWarmer(new SimpleMergedSegmentWarmer(infoStream)) - .setMergePolicy(newLogMergePolicy(10))); + .setMergePolicy(mp)); Document doc = new Document(); doc.add(newStringField("foo", "bar", Field.Store.NO)); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java index 0b00a20a17f..ea60f9b1e09 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java @@ -249,8 +249,10 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase { SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major); LogMergePolicy mp = mergePolicy(); + // Number of segments guaranteed to trigger a merge. + int numSegmentsForMerging = mp.getMergeFactor() + mp.getTargetSearchConcurrency(); - for (int i = 0; i < mp.getMergeFactor(); ++i) { + for (int i = 0; i < numSegmentsForMerging; ++i) { segmentInfos.add( makeSegmentCommitInfo( "_" + segNameGenerator.getAndIncrement(), @@ -266,6 +268,6 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase { segmentInfos = applyMerge(segmentInfos, merge, "_" + segNameGenerator.getAndIncrement(), stats); } - assertEquals(1, segmentInfos.size()); + assertTrue(segmentInfos.size() < numSegmentsForMerging); } } diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java index 0f9cab1eda4..3299d8ddca9 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java @@ -1053,6 +1053,7 @@ public abstract class LuceneTestCase extends Assert { public static LogMergePolicy newLogMergePolicy(Random r) { LogMergePolicy logmp = r.nextBoolean() ? new LogDocMergePolicy() : new LogByteSizeMergePolicy(); logmp.setCalibrateSizeByDeletes(r.nextBoolean()); + logmp.setTargetSearchConcurrency(TestUtil.nextInt(random(), 1, 16)); if (rarely(r)) { logmp.setMergeFactor(TestUtil.nextInt(r, 2, 9)); } else { @@ -1112,14 +1113,14 @@ public abstract class LuceneTestCase extends Assert { return logmp; } - public static MergePolicy newLogMergePolicy(boolean useCFS, int mergeFactor) { + public static LogMergePolicy newLogMergePolicy(boolean useCFS, int mergeFactor) { LogMergePolicy logmp = newLogMergePolicy(); logmp.setNoCFSRatio(useCFS ? 1.0 : 0.0); logmp.setMergeFactor(mergeFactor); return logmp; } - public static MergePolicy newLogMergePolicy(int mergeFactor) { + public static LogMergePolicy newLogMergePolicy(int mergeFactor) { LogMergePolicy logmp = newLogMergePolicy(); logmp.setMergeFactor(mergeFactor); return logmp;