Add mergeProgress into MergeState

Signed-off-by: luyuncheng <luyuncheng@bytedance.com>
This commit is contained in:
luyuncheng 2024-09-25 19:35:12 +08:00
parent 53d1c2bd2f
commit 5e7aab1d6b
6 changed files with 23 additions and 11 deletions

View File

@ -69,7 +69,8 @@ final class PerFieldMergeState {
in.maxDocs, in.maxDocs,
in.infoStream, in.infoStream,
in.intraMergeTaskExecutor, in.intraMergeTaskExecutor,
in.needsIndexSort); in.needsIndexSort,
in.mergeProgress);
} }
private static class FilterFieldInfos extends FieldInfos { private static class FilterFieldInfos extends FieldInfos {

View File

@ -3453,7 +3453,8 @@ public class IndexWriter
trackingDir, trackingDir,
globalFieldNumberMap, globalFieldNumberMap,
context, context,
intraMergeExecutor); intraMergeExecutor,
merge.getMergeProgress());
if (!merger.shouldMerge()) { if (!merger.shouldMerge()) {
return; return;
@ -5254,7 +5255,8 @@ public class IndexWriter
dirWrapper, dirWrapper,
globalFieldNumberMap, globalFieldNumberMap,
context, context,
intraMergeExecutor); intraMergeExecutor,
merge.getMergeProgress());
merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get())); merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
merge.checkAborted(); merge.checkAborted();

View File

@ -91,17 +91,22 @@ public class MergeState {
/** Indicates if the index needs to be sorted * */ /** Indicates if the index needs to be sorted * */
public boolean needsIndexSort; public boolean needsIndexSort;
/** Progress and state for an executing merge. */
public final MergePolicy.OneMergeProgress mergeProgress;
/** Sole constructor. */ /** Sole constructor. */
MergeState( MergeState(
List<CodecReader> readers, List<CodecReader> readers,
SegmentInfo segmentInfo, SegmentInfo segmentInfo,
InfoStream infoStream, InfoStream infoStream,
Executor intraMergeTaskExecutor) Executor intraMergeTaskExecutor,
MergePolicy.OneMergeProgress mergeProgress)
throws IOException { throws IOException {
verifyIndexSort(readers, segmentInfo); verifyIndexSort(readers, segmentInfo);
this.infoStream = infoStream; this.infoStream = infoStream;
int numReaders = readers.size(); int numReaders = readers.size();
this.intraMergeTaskExecutor = intraMergeTaskExecutor; this.intraMergeTaskExecutor = intraMergeTaskExecutor;
this.mergeProgress = mergeProgress;
maxDocs = new int[numReaders]; maxDocs = new int[numReaders];
fieldsProducers = new FieldsProducer[numReaders]; fieldsProducers = new FieldsProducer[numReaders];
@ -284,7 +289,8 @@ public class MergeState {
int[] maxDocs, int[] maxDocs,
InfoStream infoStream, InfoStream infoStream,
Executor intraMergeTaskExecutor, Executor intraMergeTaskExecutor,
boolean needsIndexSort) { boolean needsIndexSort,
MergePolicy.OneMergeProgress mergeProgress) {
this.docMaps = docMaps; this.docMaps = docMaps;
this.segmentInfo = segmentInfo; this.segmentInfo = segmentInfo;
this.mergeFieldInfos = mergeFieldInfos; this.mergeFieldInfos = mergeFieldInfos;
@ -301,5 +307,6 @@ public class MergeState {
this.infoStream = infoStream; this.infoStream = infoStream;
this.intraMergeTaskExecutor = intraMergeTaskExecutor; this.intraMergeTaskExecutor = intraMergeTaskExecutor;
this.needsIndexSort = needsIndexSort; this.needsIndexSort = needsIndexSort;
this.mergeProgress = mergeProgress;
} }
} }

View File

@ -59,13 +59,15 @@ final class SegmentMerger {
Directory dir, Directory dir,
FieldInfos.FieldNumbers fieldNumbers, FieldInfos.FieldNumbers fieldNumbers,
IOContext context, IOContext context,
Executor intraMergeTaskExecutor) Executor intraMergeTaskExecutor,
MergePolicy.OneMergeProgress mergeProgress)
throws IOException { throws IOException {
if (context.context() != IOContext.Context.MERGE) { if (context.context() != IOContext.Context.MERGE) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"IOContext.context should be MERGE; got: " + context.context()); "IOContext.context should be MERGE; got: " + context.context());
} }
mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor); mergeState =
new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor, mergeProgress);
mergeStateCreationThread = Thread.currentThread(); mergeStateCreationThread = Thread.currentThread();
directory = dir; directory = dir;
this.codec = segmentInfo.getCodec(); this.codec = segmentInfo.getCodec();

View File

@ -230,7 +230,6 @@ public class TestDoc extends LuceneTestCase {
StringHelper.randomId(), StringHelper.randomId(),
new HashMap<>(), new HashMap<>(),
null); null);
SegmentMerger merger = SegmentMerger merger =
new SegmentMerger( new SegmentMerger(
Arrays.<CodecReader>asList(r1, r2), Arrays.<CodecReader>asList(r1, r2),
@ -239,8 +238,8 @@ public class TestDoc extends LuceneTestCase {
trackingDir, trackingDir,
new FieldInfos.FieldNumbers(null, null), new FieldInfos.FieldNumbers(null, null),
context, context,
new SameThreadExecutorService()); new SameThreadExecutorService(),
new MergePolicy.OneMergeProgress());
merger.merge(); merger.merge();
r1.close(); r1.close();
r2.close(); r2.close();

View File

@ -107,7 +107,8 @@ public class TestSegmentMerger extends LuceneTestCase {
mergedDir, mergedDir,
new FieldInfos.FieldNumbers(null, null), new FieldInfos.FieldNumbers(null, null),
newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))), newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))),
new SameThreadExecutorService()); new SameThreadExecutorService(),
new MergePolicy.OneMergeProgress());
MergeState mergeState = merger.merge(); MergeState mergeState = merger.merge();
int docsMerged = mergeState.segmentInfo.maxDoc(); int docsMerged = mergeState.segmentInfo.maxDoc();
assertTrue(docsMerged == 2); assertTrue(docsMerged == 2);