Avoid modifying merge state in per-field mergers (#13208)

PerFieldDocValuesFormat and PerFieldPostingsFormat mutate and then reset
the fieldInfos of the shared MergeState during merges. Consequently, other
merge sub-tasks running concurrently may fail to see some fieldInfos. This
has been a problem since merge sub-tasks started running concurrently.

Relates #13190
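
For illustration, here is a minimal, self-contained Java sketch of the race described above. It is not Lucene code: SharedMergeState, its fieldInfos field, and the sleep timings are hypothetical stand-ins that only mimic the old apply()/reset() pattern, in which one merge sub-task temporarily narrows shared state while another sub-task reads it concurrently.

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a MergeState instance shared by all merge sub-tasks (not Lucene code).
class SharedMergeState {
  volatile List<String> fieldInfos;

  SharedMergeState(List<String> fieldInfos) {
    this.fieldInfos = fieldInfos;
  }
}

public class MutableMergeStateRace {
  public static void main(String[] args) throws Exception {
    SharedMergeState state = new SharedMergeState(List.of("title", "body", "price"));
    var pool = Executors.newFixedThreadPool(2);

    // Sub-task A: narrows the shared state to its own field and restores it afterwards,
    // mimicking the old apply()/reset() pattern in the per-field mergers.
    Future<?> taskA = pool.submit(() -> {
      List<String> original = state.fieldInfos;
      try {
        state.fieldInfos = List.of("title"); // mutate the shared instance
        Thread.sleep(50);                    // pretend to merge the "title" field
      } catch (InterruptedException ignored) {
      } finally {
        state.fieldInfos = original;         // reset
      }
    });

    // Sub-task B: runs concurrently and may observe the narrowed view,
    // silently missing "body" and "price".
    Future<List<String>> taskB = pool.submit(() -> {
      Thread.sleep(10);
      return state.fieldInfos;
    });

    System.out.println("Sub-task B saw: " + taskB.get()); // often prints just [title]
    taskA.get();
    pool.shutdown();
  }
}

The change in this commit avoids that race by deriving a new, restricted MergeState per sub-task instead of mutating and then restoring the shared one.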
Nhat Nguyen 2024-03-25 08:56:41 -07:00 committed by GitHub
parent d5ef6edf7f
commit f4db67fae2
5 changed files with 263 additions and 68 deletions

PerFieldDocValuesFormat.java

@@ -148,13 +148,8 @@ public abstract class PerFieldDocValuesFormat extends DocValuesFormat {
}
// Delegate the merge to the appropriate consumer
- PerFieldMergeState pfMergeState = new PerFieldMergeState(mergeState);
- try {
-   for (Map.Entry<DocValuesConsumer, Collection<String>> e : consumersToField.entrySet()) {
-     e.getKey().merge(pfMergeState.apply(e.getValue()));
-   }
- } finally {
-   pfMergeState.reset();
- }
+ for (Map.Entry<DocValuesConsumer, Collection<String>> e : consumersToField.entrySet()) {
+   e.getKey().merge(PerFieldMergeState.restrictFields(mergeState, e.getValue()));
+ }
}

PerFieldMergeState.java

@@ -31,72 +31,45 @@ import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.Terms;
- /**
-  * Utility class to update the {@link MergeState} instance to be restricted to a set of fields.
-  *
-  * <p>Warning: the input {@linkplain MergeState} instance will be updated when calling {@link
-  * #apply(Collection)}.
-  *
-  * <p>It should be called within a {@code try {...} finally {...}} block to make sure that the
-  * mergeState instance is restored to its original state:
-  *
-  * <pre>
-  *   PerFieldMergeState pfMergeState = new PerFieldMergeState(mergeState);
-  *   try {
-  *     doSomething(pfMergeState.apply(fields));
-  *     ...
-  *   } finally {
-  *     pfMergeState.reset();
-  *   }
-  * </pre>
-  */
+ /** Utility class creating a new {@link MergeState} to be restricted to a set of fields. */
final class PerFieldMergeState {
- private final MergeState in;
- private final FieldInfos orgMergeFieldInfos;
- private final FieldInfos[] orgFieldInfos;
- private final FieldsProducer[] orgFieldsProducers;
- PerFieldMergeState(MergeState in) {
-   this.in = in;
-   this.orgMergeFieldInfos = in.mergeFieldInfos;
-   this.orgFieldInfos = new FieldInfos[in.fieldInfos.length];
-   this.orgFieldsProducers = new FieldsProducer[in.fieldsProducers.length];
-   System.arraycopy(in.fieldInfos, 0, this.orgFieldInfos, 0, this.orgFieldInfos.length);
-   System.arraycopy(
-       in.fieldsProducers, 0, this.orgFieldsProducers, 0, this.orgFieldsProducers.length);
- }
/**
-  * Update the input {@link MergeState} instance to restrict the fields to the given ones.
+  * Create a new MergeState from the given {@link MergeState} instance with restricted fields.
 *
-  * @param fields The fields to keep in the updated instance.
-  * @return The updated instance.
+  * @param fields The fields to keep in the new instance.
+  * @return The new MergeState with restricted fields
 */
- MergeState apply(Collection<String> fields) {
-   in.mergeFieldInfos = new FilterFieldInfos(orgMergeFieldInfos, fields);
-   for (int i = 0; i < orgFieldInfos.length; i++) {
-     in.fieldInfos[i] = new FilterFieldInfos(orgFieldInfos[i], fields);
+ static MergeState restrictFields(MergeState in, Collection<String> fields) {
+   var fieldInfos = new FieldInfos[in.fieldInfos.length];
+   for (int i = 0; i < in.fieldInfos.length; i++) {
+     fieldInfos[i] = new FilterFieldInfos(in.fieldInfos[i], fields);
   }
-   for (int i = 0; i < orgFieldsProducers.length; i++) {
-     in.fieldsProducers[i] =
-         orgFieldsProducers[i] == null
+   var fieldsProducers = new FieldsProducer[in.fieldsProducers.length];
+   for (int i = 0; i < in.fieldsProducers.length; i++) {
+     fieldsProducers[i] =
+         in.fieldsProducers[i] == null
             ? null
-             : new FilterFieldsProducer(orgFieldsProducers[i], fields);
+             : new FilterFieldsProducer(in.fieldsProducers[i], fields);
   }
-   return in;
- }
- /**
-  * Resets the input {@link MergeState} instance to its original state.
-  *
-  * @return The reset instance.
-  */
- MergeState reset() {
-   in.mergeFieldInfos = orgMergeFieldInfos;
-   System.arraycopy(orgFieldInfos, 0, in.fieldInfos, 0, in.fieldInfos.length);
-   System.arraycopy(orgFieldsProducers, 0, in.fieldsProducers, 0, in.fieldsProducers.length);
-   return in;
+   var mergeFieldInfos = new FilterFieldInfos(in.mergeFieldInfos, fields);
+   return new MergeState(
+       in.docMaps,
+       in.segmentInfo,
+       mergeFieldInfos,
+       in.storedFieldsReaders,
+       in.termVectorsReaders,
+       in.normsProducers,
+       in.docValuesProducers,
+       fieldInfos,
+       in.liveDocs,
+       fieldsProducers,
+       in.pointsReaders,
+       in.knnVectorsReaders,
+       in.maxDocs,
+       in.infoStream,
+       in.intraMergeTaskExecutor,
+       in.needsIndexSort);
}
private static class FilterFieldInfos extends FieldInfos {

PerFieldPostingsFormat.java

@@ -193,7 +193,6 @@ public abstract class PerFieldPostingsFormat extends PostingsFormat {
Map<PostingsFormat, FieldsGroup> formatToGroups = buildFieldsGroupMapping(indexedFieldNames);
// Merge postings
- PerFieldMergeState pfMergeState = new PerFieldMergeState(mergeState);
boolean success = false;
try {
for (Map.Entry<PostingsFormat, FieldsGroup> ent : formatToGroups.entrySet()) {
@@ -202,11 +201,10 @@ public abstract class PerFieldPostingsFormat extends PostingsFormat {
FieldsConsumer consumer = format.fieldsConsumer(group.state);
toClose.add(consumer);
- consumer.merge(pfMergeState.apply(group.fields), norms);
+ consumer.merge(PerFieldMergeState.restrictFields(mergeState, group.fields), norms);
}
success = true;
} finally {
- pfMergeState.reset();
if (!success) {
IOUtils.closeWhileHandlingException(toClose);
}

MergeState.java

@@ -266,4 +266,40 @@ public class MergeState {
}
return docMapBuilder.build();
}
/** Create a new merge instance. */
public MergeState(
DocMap[] docMaps,
SegmentInfo segmentInfo,
FieldInfos mergeFieldInfos,
StoredFieldsReader[] storedFieldsReaders,
TermVectorsReader[] termVectorsReaders,
NormsProducer[] normsProducers,
DocValuesProducer[] docValuesProducers,
FieldInfos[] fieldInfos,
Bits[] liveDocs,
FieldsProducer[] fieldsProducers,
PointsReader[] pointsReaders,
KnnVectorsReader[] knnVectorsReaders,
int[] maxDocs,
InfoStream infoStream,
Executor intraMergeTaskExecutor,
boolean needsIndexSort) {
this.docMaps = docMaps;
this.segmentInfo = segmentInfo;
this.mergeFieldInfos = mergeFieldInfos;
this.storedFieldsReaders = storedFieldsReaders;
this.termVectorsReaders = termVectorsReaders;
this.normsProducers = normsProducers;
this.docValuesProducers = docValuesProducers;
this.fieldInfos = fieldInfos;
this.liveDocs = liveDocs;
this.fieldsProducers = fieldsProducers;
this.pointsReaders = pointsReaders;
this.knnVectorsReaders = knnVectorsReaders;
this.maxDocs = maxDocs;
this.infoStream = infoStream;
this.intraMergeTaskExecutor = intraMergeTaskExecutor;
this.needsIndexSort = needsIndexSort;
}
}

TestIndexWriterForceMerge.java

@@ -18,9 +18,26 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.FieldsConsumer;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat;
import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@@ -29,6 +46,7 @@ import org.apache.lucene.tests.analysis.MockTokenizer;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
public class TestIndexWriterForceMerge extends LuceneTestCase {
public void testPartialMerge() throws IOException {
@@ -289,4 +307,179 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
dir.close();
}
public void testMergePerField() throws IOException {
IndexWriterConfig config = new IndexWriterConfig();
ConcurrentMergeScheduler mergeScheduler =
new ConcurrentMergeScheduler() {
@Override
public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
// always enable parallel merges
return intraMergeExecutor;
}
};
mergeScheduler.setMaxMergesAndThreads(4, 4);
config.setMergeScheduler(mergeScheduler);
Codec codec = TestUtil.getDefaultCodec();
CyclicBarrier barrier = new CyclicBarrier(2);
config.setCodec(
new FilterCodec(codec.getName(), codec) {
@Override
public PostingsFormat postingsFormat() {
return new PerFieldPostingsFormat() {
@Override
public PostingsFormat getPostingsFormatForField(String field) {
return new BlockingOnMergePostingsFormat(
TestUtil.getDefaultPostingsFormat(), barrier);
}
};
}
@Override
public DocValuesFormat docValuesFormat() {
return new PerFieldDocValuesFormat() {
@Override
public DocValuesFormat getDocValuesFormatForField(String field) {
return new BlockingOnMergeDocValuesFormat(
TestUtil.getDefaultDocValuesFormat(), barrier);
}
};
}
});
try (Directory directory = newDirectory();
IndexWriter writer = new IndexWriter(directory, config)) {
int numDocs = 50 + random().nextInt(100);
int numFields = 5 + random().nextInt(5);
for (int d = 0; d < numDocs; d++) {
Document doc = new Document();
for (int f = 0; f < numFields * 2; f++) {
String field = "f" + f;
String value = "v-" + random().nextInt(100);
if (f % 2 == 0) {
doc.add(new StringField(field, value, Field.Store.NO));
} else {
doc.add(new BinaryDocValuesField(field, new BytesRef(value)));
}
doc.add(new LongPoint("p" + f, random().nextInt(10000)));
}
writer.addDocument(doc);
if (random().nextInt(100) < 10) {
writer.flush();
}
}
writer.forceMerge(1);
try (DirectoryReader reader = DirectoryReader.open(writer)) {
assertEquals(numDocs, reader.numDocs());
}
}
}
static class BlockingOnMergePostingsFormat extends PostingsFormat {
private final PostingsFormat postingsFormat;
private final CyclicBarrier barrier;
BlockingOnMergePostingsFormat(PostingsFormat postingsFormat, CyclicBarrier barrier) {
super(postingsFormat.getName());
this.postingsFormat = postingsFormat;
this.barrier = barrier;
}
@Override
public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
var in = postingsFormat.fieldsConsumer(state);
return new FieldsConsumer() {
@Override
public void write(Fields fields, NormsProducer norms) throws IOException {
in.write(fields, norms);
}
@Override
public void merge(MergeState mergeState, NormsProducer norms) throws IOException {
try {
barrier.await(1, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError("broken barrier", e);
}
in.merge(mergeState, norms);
}
@Override
public void close() throws IOException {
in.close();
}
};
}
@Override
public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
return postingsFormat.fieldsProducer(state);
}
}
static class BlockingOnMergeDocValuesFormat extends DocValuesFormat {
private final DocValuesFormat docValuesFormat;
private final CyclicBarrier barrier;
BlockingOnMergeDocValuesFormat(DocValuesFormat docValuesFormat, CyclicBarrier barrier) {
super(docValuesFormat.getName());
this.docValuesFormat = docValuesFormat;
this.barrier = barrier;
}
@Override
public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
DocValuesConsumer in = docValuesFormat.fieldsConsumer(state);
return new DocValuesConsumer() {
@Override
public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer)
throws IOException {
in.addNumericField(field, valuesProducer);
}
@Override
public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer)
throws IOException {
in.addBinaryField(field, valuesProducer);
}
@Override
public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer)
throws IOException {
in.addSortedField(field, valuesProducer);
}
@Override
public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer)
throws IOException {
in.addSortedNumericField(field, valuesProducer);
}
@Override
public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer)
throws IOException {
in.addSortedSetField(field, valuesProducer);
}
@Override
public void merge(MergeState mergeState) throws IOException {
try {
barrier.await(1, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError("broken barrier", e);
}
in.merge(mergeState);
}
@Override
public void close() throws IOException {
in.close();
}
};
}
@Override
public DocValuesProducer fieldsProducer(SegmentReadState state) throws IOException {
return docValuesFormat.fieldsProducer(state);
}
}
}