Add a `targetSearchConcurrency` parameter to `LogMergePolicy`. (#13517)

This adds the same `targetSearchConcurrency` parameter to `LogMergePolicy`
that #13430 is adding to `TieredMergePolicy`.
Adrien Grand 2024-07-18 11:28:35 +02:00 committed by GitHub
parent fff997f801
commit 9f040864a6
9 changed files with 59 additions and 9 deletions
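
For context, a minimal usage sketch of the new setter (not part of the diff; `dir` and `analyzer` are assumed to be an existing Directory and Analyzer):

    // Sketch only: cap segment doc counts so searches can be split into ~8 slices.
    LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy();
    mergePolicy.setTargetSearchConcurrency(8);
    IndexWriterConfig config = new IndexWriterConfig(analyzer).setMergePolicy(mergePolicy);
    try (IndexWriter writer = new IndexWriter(dir, config)) {
      // Index documents as usual; merges now avoid creating segments with
      // more than maxDoc / 8 documents.
    }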

CHANGES.txt

@@ -260,6 +260,10 @@ New Features
   merge policy to try to have at least this number of segments on the highest
   tier. (Adrien Grand, Carlos Delgado)
 
+* GITHUB#13517: Allow configuring the search concurrency on LogDocMergePolicy
+  and LogByteSizeMergePolicy via a new #setTargetSearchConcurrency setter.
+  (Adrien Grand)
+
 Improvements
 ---------------------

LogMergePolicy.java

@@ -90,6 +90,12 @@ public abstract class LogMergePolicy extends MergePolicy {
   /** If true, we pro-rate a segment's size by the percentage of non-deleted documents. */
   protected boolean calibrateSizeByDeletes = true;
 
+  /**
+   * Target search concurrency. This merge policy will avoid creating segments that have more than
+   * {@code maxDoc / targetSearchConcurrency} documents.
+   */
+  protected int targetSearchConcurrency = 1;
+
   /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
   public LogMergePolicy() {
     super(DEFAULT_NO_CFS_RATIO, MergePolicy.DEFAULT_MAX_CFS_SEGMENT_SIZE);
@@ -131,6 +137,28 @@ public abstract class LogMergePolicy extends MergePolicy {
     return calibrateSizeByDeletes;
   }
 
+  /**
+   * Sets the target search concurrency. This prevents creating segments that are bigger than
+   * maxDoc/targetSearchConcurrency, which in turn makes the work parallelizable into
+   * targetSearchConcurrency slices of similar doc counts.
+   *
+   * <p><b>NOTE:</b> Configuring a value greater than 1 will increase the number of segments in the
+   * index linearly with the value of {@code targetSearchConcurrency} and also increase write
+   * amplification.
+   */
+  public void setTargetSearchConcurrency(int targetSearchConcurrency) {
+    if (targetSearchConcurrency < 1) {
+      throw new IllegalArgumentException(
+          "targetSearchConcurrency must be >= 1 (got " + targetSearchConcurrency + ")");
+    }
+    this.targetSearchConcurrency = targetSearchConcurrency;
+  }
+
+  /** Returns the target search concurrency. */
+  public int getTargetSearchConcurrency() {
+    return targetSearchConcurrency;
+  }
+
   /**
    * Return the number of documents in the provided {@link SegmentCommitInfo}, pro-rated by
    * percentage of non-deleted documents if {@link #setCalibrateSizeByDeletes} is set.
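
To make the javadoc above concrete, a hypothetical worked example (numbers invented for illustration):

    int maxDoc = 1_000_000;                                   // total docs in the index
    int targetSearchConcurrency = 8;
    int cap = Math.ceilDiv(maxDoc, targetSearchConcurrency);  // 125_000
    // The policy avoids merges that would produce a segment with more than
    // `cap` documents, keeping at least ~8 similarly sized doc-count slices.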
@@ -484,8 +512,10 @@ public abstract class LogMergePolicy extends MergePolicy {
     final Set<SegmentCommitInfo> mergingSegments = mergeContext.getMergingSegments();
 
+    int totalDocCount = 0;
     for (int i = 0; i < numSegments; i++) {
       final SegmentCommitInfo info = infos.info(i);
+      totalDocCount += sizeDocs(info, mergeContext);
       long size = size(info, mergeContext);
 
       // Floor tiny segments
@@ -575,6 +605,9 @@ public abstract class LogMergePolicy extends MergePolicy {
             mergeContext);
       }
 
+      final int maxMergeDocs =
+          Math.min(this.maxMergeDocs, Math.ceilDiv(totalDocCount, targetSearchConcurrency));
+
       // Finally, record all merges that are viable at this level:
       int end = start + mergeFactor;
       while (end <= 1 + upto) {
@@ -590,7 +623,9 @@ public abstract class LogMergePolicy extends MergePolicy {
           }
           long segmentSize = size(info, mergeContext);
           long segmentDocs = sizeDocs(info, mergeContext);
-          if (mergeSize + segmentSize > maxMergeSize || mergeDocs + segmentDocs > maxMergeDocs) {
+          if (mergeSize + segmentSize > maxMergeSize
+              || (mergeSize + segmentSize > minMergeSize
+                  && mergeDocs + segmentDocs > maxMergeDocs)) {
             // This merge is full, stop adding more segments to it
             if (i == start) {
               // This segment alone is too large, return a singleton merge
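
Reading the two hunks above together: the doc-count cap only rejects a segment once the merge has already grown past minMergeSize, so tiny segments can still be coalesced freely. A sketch of the new stop condition with hypothetical values:

    long minMergeSize = 1_000, maxMergeSize = 1_000_000;  // policy size bounds
    int maxMergeDocs = Math.ceilDiv(100, 4);              // 25 docs at concurrency 4
    long mergeSize = 900, mergeDocs = 24;                 // merge accumulated so far
    long segmentSize = 50, segmentDocs = 3;               // candidate segment
    boolean mergeIsFull =
        mergeSize + segmentSize > maxMergeSize
            || (mergeSize + segmentSize > minMergeSize
                && mergeDocs + segmentDocs > maxMergeDocs);
    // 950 is below both maxMergeSize and minMergeSize, so the doc-count clause
    // is skipped and mergeIsFull is false, even though 27 docs would exceed the
    // 25-doc cap.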

TestMergeSchedulerExternal.java

@@ -132,7 +132,7 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
     logMP.setMergeFactor(10);
 
     try {
-      for (int i = 0; i < 20; i++) {
+      for (int i = 0; i < 60; i++) {
         writer.addDocument(doc);
       }
     } catch (

TestConcurrentMergeScheduler.java

@@ -802,6 +802,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
     iwc.setMaxBufferedDocs(2);
     LogMergePolicy lmp = newLogMergePolicy();
     lmp.setMergeFactor(2);
+    lmp.setTargetSearchConcurrency(1);
     iwc.setMergePolicy(lmp);
 
     IndexWriter w = new IndexWriter(dir, iwc);

TestIndexWriterExceptions.java

@@ -471,6 +471,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
     cms.setSuppressExceptions();
     conf.setMergeScheduler(cms);
     ((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2);
+    ((LogMergePolicy) conf.getMergePolicy()).setTargetSearchConcurrency(1);
 
     TestPoint3 testPoint = new TestPoint3();
     IndexWriter w = RandomIndexWriter.mockIndexWriter(random(), dir, conf, testPoint);
     testPoint.doFail = true;

TestIndexWriterMergePolicy.java

@@ -900,8 +900,10 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
   }
 
   public void testSetDiagnostics() throws IOException {
+    LogMergePolicy logMp = newLogMergePolicy(4);
+    logMp.setTargetSearchConcurrency(1);
     MergePolicy myMergePolicy =
-        new FilterMergePolicy(newLogMergePolicy(4)) {
+        new FilterMergePolicy(logMp) {
           @Override
           public MergeSpecification findMerges(
               MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)

TestIndexWriterReader.java

@@ -999,6 +999,8 @@ public class TestIndexWriterReader extends LuceneTestCase {
   public void testSegmentWarmer() throws Exception {
     Directory dir = newDirectory();
     final AtomicBoolean didWarm = new AtomicBoolean();
+    LogMergePolicy mp = newLogMergePolicy(10);
+    mp.setTargetSearchConcurrency(1);
     IndexWriter w =
         new IndexWriter(
             dir,
@@ -1012,7 +1014,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
                       assertEquals(20, count);
                       didWarm.set(true);
                     })
-                .setMergePolicy(newLogMergePolicy(10)));
+                .setMergePolicy(mp));
 
     Document doc = new Document();
     doc.add(newStringField("foo", "bar", Field.Store.NO));
@@ -1045,6 +1047,8 @@ public class TestIndexWriterReader extends LuceneTestCase {
             return true;
           }
         };
+    LogMergePolicy mp = newLogMergePolicy(10);
+    mp.setTargetSearchConcurrency(1);
     IndexWriter w =
         new IndexWriter(
             dir,
@@ -1053,7 +1057,7 @@ public class TestIndexWriterReader extends LuceneTestCase {
                 .setReaderPooling(true)
                 .setInfoStream(infoStream)
                 .setMergedSegmentWarmer(new SimpleMergedSegmentWarmer(infoStream))
-                .setMergePolicy(newLogMergePolicy(10)));
+                .setMergePolicy(mp));
 
     Document doc = new Document();
     doc.add(newStringField("foo", "bar", Field.Store.NO));

TestLogMergePolicy.java

@@ -249,8 +249,10 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase {
     SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
     LogMergePolicy mp = mergePolicy();
+    // Number of segments guaranteed to trigger a merge.
+    int numSegmentsForMerging = mp.getMergeFactor() + mp.getTargetSearchConcurrency();
 
-    for (int i = 0; i < mp.getMergeFactor(); ++i) {
+    for (int i = 0; i < numSegmentsForMerging; ++i) {
       segmentInfos.add(
           makeSegmentCommitInfo(
               "_" + segNameGenerator.getAndIncrement(),
@@ -266,6 +268,6 @@ public class TestLogMergePolicy extends BaseMergePolicyTestCase {
       segmentInfos =
           applyMerge(segmentInfos, merge, "_" + segNameGenerator.getAndIncrement(), stats);
     }
 
-    assertEquals(1, segmentInfos.size());
+    assertTrue(segmentInfos.size() < numSegmentsForMerging);
   }
 }
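
An illustration of why the assertion had to be relaxed (hypothetical values, since mergePolicy() is randomized):

    int mergeFactor = 10, targetSearchConcurrency = 4;
    int numSegmentsForMerging = mergeFactor + targetSearchConcurrency; // 14
    // The doc-count cap of ceil(totalDocs / 4) blocks merging all the way down
    // to a single segment, so the test can only require that merging reduced
    // the count below the 14 segments that were indexed.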

LuceneTestCase.java

@@ -1053,6 +1053,7 @@ public abstract class LuceneTestCase extends Assert {
   public static LogMergePolicy newLogMergePolicy(Random r) {
     LogMergePolicy logmp = r.nextBoolean() ? new LogDocMergePolicy() : new LogByteSizeMergePolicy();
     logmp.setCalibrateSizeByDeletes(r.nextBoolean());
+    logmp.setTargetSearchConcurrency(TestUtil.nextInt(random(), 1, 16));
     if (rarely(r)) {
       logmp.setMergeFactor(TestUtil.nextInt(r, 2, 9));
     } else {
@@ -1112,14 +1113,14 @@ public abstract class LuceneTestCase extends Assert {
     return logmp;
   }
 
-  public static MergePolicy newLogMergePolicy(boolean useCFS, int mergeFactor) {
+  public static LogMergePolicy newLogMergePolicy(boolean useCFS, int mergeFactor) {
     LogMergePolicy logmp = newLogMergePolicy();
     logmp.setNoCFSRatio(useCFS ? 1.0 : 0.0);
     logmp.setMergeFactor(mergeFactor);
     return logmp;
  }
 
-  public static MergePolicy newLogMergePolicy(int mergeFactor) {
+  public static LogMergePolicy newLogMergePolicy(int mergeFactor) {
     LogMergePolicy logmp = newLogMergePolicy();
     logmp.setMergeFactor(mergeFactor);
     return logmp;
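
A closing observation tying the test hunks together: because newLogMergePolicy(Random) now randomizes the target search concurrency between 1 and 16, tests that depend on exact segment counts pin the value back to 1 (as in the TestConcurrentMergeScheduler, TestIndexWriterExceptions, and TestIndexWriterReader hunks above), while TestLogMergePolicy was relaxed to tolerate the extra segments the cap deliberately leaves behind.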