This commit adds a new test CMS that always provides intra-merge parallelism (#13475)

@iverase has uncovered a potential issue with intra-merge CMS parallelism.

This commit helps expose this problem by forcing tests to use intra-merge parallelism instead of always (well, usually) delegating to a SameThreadExecutorService.

When intra-merge parallelism is used, norms, doc values, stored fields, etc. are all merged on a separate thread from the one that constructed their merge-optimized instances.

This trips numerous assertions in AssertingCodec.assertThread, where we assume that the thread that called getMergeInstance() is also the thread consuming the values to merge.
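
For reference, the framework's check boils down to a creation-thread assertion; a minimal sketch (paraphrased, not the verbatim AssertingCodec source):

    // Sketch: a merge-optimized reader records the thread that acquired it, and
    // each subsequent access asserts that the same thread is consuming it.
    static void assertThread(String object, Thread creationThread) {
      if (creationThread != Thread.currentThread()) {
        throw new AssertionError(
            object
                + " should only be consumed on the thread that acquired it; acquired on "
                + creationThread
                + " but consumed on "
                + Thread.currentThread());
      }
    }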

In addition to the improved testing, this commit corrects poor merge state handling in the presence of parallelism.
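
Concretely, the fix records which thread created the shared MergeState and hands any other thread a clone whose readers are re-acquired via getMergeInstance() on that thread. A condensed sketch of the pattern (the full change is in MergeState.clone() and SegmentMerger.mergeState() below):

    // Sketch: reuse the original MergeState on the creating thread; any other
    // thread gets a clone whose readers are fresh getMergeInstance() copies.
    private MergeState mergeState() {
      MergeState mergeState = this.mergeState;
      if (Thread.currentThread() != mergeStateCreationThread) {
        mergeState = mergeState.clone();
      }
      return mergeState;
    }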
Benjamin Trent 2024-09-11 09:14:50 -04:00 committed by GitHub
parent 45da83bd72
commit b940511b07
5 changed files with 98 additions and 20 deletions

lucene/CHANGES.txt

@@ -333,6 +333,8 @@ New Features
 Improvements
 ---------------------
+* GITHUB#13475: Re-enable intra-merge parallelism except for terms, norms, and doc values.
+  Related to GITHUB#13478. (Ben Trent)
 * GITHUB#13548: Refactor and javadoc update for KNN vector writer classes. (Patrick Zhai)

lucene/core/src/java/org/apache/lucene/index/MergeState.java

@@ -41,7 +41,7 @@ import org.apache.lucene.util.packed.PackedLongValues;
  *
  * @lucene.experimental
  */
-public class MergeState {
+public class MergeState implements Cloneable {
 
   /** Maps document IDs from old segments to document IDs in the new segment */
   public final DocMap[] docMaps;
@@ -302,4 +302,55 @@ public class MergeState {
     this.intraMergeTaskExecutor = intraMergeTaskExecutor;
     this.needsIndexSort = needsIndexSort;
   }
+
+  @Override
+  public MergeState clone() {
+    StoredFieldsReader[] storedFieldsReaders = this.storedFieldsReaders.clone();
+    TermVectorsReader[] termVectorsReaders = this.termVectorsReaders.clone();
+    NormsProducer[] normsProducers = this.normsProducers.clone();
+    DocValuesProducer[] docValuesProducers = this.docValuesProducers.clone();
+    FieldsProducer[] fieldsProducers = this.fieldsProducers.clone();
+    PointsReader[] pointsReaders = this.pointsReaders.clone();
+    KnnVectorsReader[] knnVectorsReaders = this.knnVectorsReaders.clone();
+    for (int i = 0; i < storedFieldsReaders.length; ++i) {
+      if (storedFieldsReaders[i] != null) {
+        storedFieldsReaders[i] = storedFieldsReaders[i].getMergeInstance();
+      }
+      if (termVectorsReaders[i] != null) {
+        termVectorsReaders[i] = termVectorsReaders[i].getMergeInstance();
+      }
+      if (normsProducers[i] != null) {
+        normsProducers[i] = normsProducers[i].getMergeInstance();
+      }
+      if (docValuesProducers[i] != null) {
+        docValuesProducers[i] = docValuesProducers[i].getMergeInstance();
+      }
+      if (fieldsProducers[i] != null) {
+        fieldsProducers[i] = fieldsProducers[i].getMergeInstance();
+      }
+      if (pointsReaders[i] != null) {
+        pointsReaders[i] = pointsReaders[i].getMergeInstance();
+      }
+      if (knnVectorsReaders[i] != null) {
+        knnVectorsReaders[i] = knnVectorsReaders[i].getMergeInstance();
+      }
+    }
+    return new MergeState(
+        docMaps,
+        segmentInfo,
+        mergeFieldInfos,
+        storedFieldsReaders,
+        termVectorsReaders,
+        normsProducers,
+        docValuesProducers,
+        fieldInfos,
+        liveDocs,
+        fieldsProducers,
+        pointsReaders,
+        knnVectorsReaders,
+        maxDocs,
+        infoStream,
+        intraMergeTaskExecutor,
+        needsIndexSort);
+  }
 }

lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java

@@ -52,6 +52,7 @@ final class SegmentMerger {
 
   final MergeState mergeState;
   private final FieldInfos.Builder fieldInfosBuilder;
+  final Thread mergeStateCreationThread;
 
   // note, just like in codec apis Directory 'dir' is NOT the same as segmentInfo.dir!!
   SegmentMerger(
@@ -68,6 +69,7 @@ final class SegmentMerger {
           "IOContext.context should be MERGE; got: " + context.context());
     }
     mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor);
+    mergeStateCreationThread = Thread.currentThread();
     directory = dir;
     this.codec = segmentInfo.getCodec();
     this.context = context;
@@ -99,6 +101,16 @@ final class SegmentMerger {
     return mergeState.segmentInfo.maxDoc() > 0;
   }
 
+  private MergeState mergeState() {
+    MergeState mergeState = this.mergeState;
+    if (Thread.currentThread() != mergeStateCreationThread) {
+      // Most merges, e.g. small merges, run in the same thread, so save the cost of pulling a
+      // clone in that case.
+      mergeState = mergeState.clone();
+    }
+    return mergeState;
+  }
+
   /**
    * Merges the readers into the directory passed to the constructor
    *
@@ -137,25 +149,15 @@ final class SegmentMerger {
     TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor);
     List<Callable<Void>> mergingTasks = new ArrayList<>();
-    mergingTasks.add(
-        () -> {
-          if (mergeState.mergeFieldInfos.hasNorms()) {
-            mergeWithLogging(
-                this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
-          }
+    if (mergeState.mergeFieldInfos.hasNorms()) {
+      mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
+    }
 
-          mergeWithLogging(
-              this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
-          return null;
-        });
+    mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
 
     if (mergeState.mergeFieldInfos.hasDocValues()) {
-      mergingTasks.add(
-          () -> {
-            mergeWithLogging(
-                this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
-            return null;
-          });
+      mergeWithLogging(
+          this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
     }
 
     if (mergeState.mergeFieldInfos.hasPointValues()) {
@@ -201,6 +203,7 @@ final class SegmentMerger {
 
   private void mergeDocValues(
       SegmentWriteState segmentWriteState, SegmentReadState segmentReadState) throws IOException {
+    MergeState mergeState = mergeState();
     try (DocValuesConsumer consumer = codec.docValuesFormat().fieldsConsumer(segmentWriteState)) {
       consumer.merge(mergeState);
     }
@@ -208,6 +211,7 @@ final class SegmentMerger {
 
   private void mergePoints(SegmentWriteState segmentWriteState, SegmentReadState segmentReadState)
       throws IOException {
+    MergeState mergeState = mergeState();
     try (PointsWriter writer = codec.pointsFormat().fieldsWriter(segmentWriteState)) {
       writer.merge(mergeState);
     }
@@ -215,6 +219,7 @@ final class SegmentMerger {
 
   private void mergeNorms(SegmentWriteState segmentWriteState, SegmentReadState segmentReadState)
       throws IOException {
+    MergeState mergeState = mergeState();
     try (NormsConsumer consumer = codec.normsFormat().normsConsumer(segmentWriteState)) {
       consumer.merge(mergeState);
     }
@@ -222,6 +227,7 @@ final class SegmentMerger {
 
   private void mergeTerms(SegmentWriteState segmentWriteState, SegmentReadState segmentReadState)
       throws IOException {
+    MergeState mergeState = mergeState();
     try (NormsProducer norms =
         mergeState.mergeFieldInfos.hasNorms()
             ? codec.normsFormat().normsProducer(segmentReadState)
@@ -256,6 +262,7 @@ final class SegmentMerger {
    * @throws IOException if there is a low-level IO error
    */
   private int mergeFields() throws IOException {
+    MergeState mergeState = mergeState();
     try (StoredFieldsWriter fieldsWriter =
         codec.storedFieldsFormat().fieldsWriter(directory, mergeState.segmentInfo, context)) {
       return fieldsWriter.merge(mergeState);
@@ -268,6 +275,7 @@ final class SegmentMerger {
    * @throws IOException if there is a low-level IO error
    */
   private int mergeTermVectors() throws IOException {
+    MergeState mergeState = mergeState();
     try (TermVectorsWriter termVectorsWriter =
         codec.termVectorsFormat().vectorsWriter(directory, mergeState.segmentInfo, context)) {
       int numMerged = termVectorsWriter.merge(mergeState);
@@ -278,6 +286,7 @@ final class SegmentMerger {
 
   private void mergeVectorValues(
       SegmentWriteState segmentWriteState, SegmentReadState segmentReadState) throws IOException {
+    MergeState mergeState = mergeState();
     try (KnnVectorsWriter writer = codec.knnVectorsFormat().fieldsWriter(segmentWriteState)) {
       writer.merge(mergeState);
     }

lucene/core/src/test/org/apache/lucene/index/TestIndexWriterForceMerge.java

@@ -308,6 +308,7 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
     dir.close();
   }
 
+  @AwaitsFix(bugUrl = "https://github.com/apache/lucene/issues/13478")
   public void testMergePerField() throws IOException {
     IndexWriterConfig config = new IndexWriterConfig();
     ConcurrentMergeScheduler mergeScheduler =

lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java

@@ -94,6 +94,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.TreeSet;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -943,10 +944,10 @@ public abstract class LuceneTestCase extends Assert {
     } else if (rarely(r)) {
       ConcurrentMergeScheduler cms;
       if (r.nextBoolean()) {
-        cms = new ConcurrentMergeScheduler();
+        cms = new TestConcurrentMergeScheduler();
       } else {
         cms =
-            new ConcurrentMergeScheduler() {
+            new TestConcurrentMergeScheduler() {
               @Override
               protected synchronized boolean maybeStall(MergeSource mergeSource) {
                 return true;
@@ -965,7 +966,8 @@ public abstract class LuceneTestCase extends Assert {
     } else {
       // Always use consistent settings, else CMS's dynamic (SSD or not)
       // defaults can change, hurting reproducibility:
-      ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+      ConcurrentMergeScheduler cms =
+          randomBoolean() ? new TestConcurrentMergeScheduler() : new ConcurrentMergeScheduler();
 
       // Only 1 thread can run at once (should maybe help reproducibility),
       // with up to 3 pending merges before segment-producing threads are
@@ -3292,4 +3294,17 @@ public abstract class LuceneTestCase extends Assert {
         .toList();
     return RandomPicks.randomFrom(random(), availableFormats);
   }
+
+  /**
+   * A test merge scheduler that always uses the intra-merge executor, to ensure that code path is
+   * exercised.
+   */
+  static class TestConcurrentMergeScheduler extends ConcurrentMergeScheduler {
+    @Override
+    public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
+      assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
+      // Always use the intra-merge executor to ensure we test it
+      return intraMergeExecutor;
+    }
+  }
 }
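
For illustration, here is how a test in the same package could force the new behavior directly (hypothetical usage; regular tests pick up TestConcurrentMergeScheduler randomly through LuceneTestCase's IndexWriterConfig randomization shown above):

    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
    // Force every merge to fan out through the intra-merge executor
    iwc.setMergeScheduler(new TestConcurrentMergeScheduler());
    try (Directory dir = newDirectory();
        IndexWriter writer = new IndexWriter(dir, iwc)) {
      // ... index documents; forceMerge exercises intra-merge parallelism
      writer.forceMerge(1);
    }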