Fix DV update files referenced by merge will be deleted by concurrent flush (#13017)

guojialiang92 2024-03-04 19:18:16 +08:00 committed by GitHub
parent 3ce9ba9fd5
commit 47792dfcd3
4 changed files with 461 additions and 10 deletions

lucene/CHANGES.txt

@@ -217,6 +217,8 @@ Bug Fixes
* GITHUB#13105: Fix ByteKnnVectorFieldSource & FloatKnnVectorFieldSource to work correctly when a segment does not contain
any docs with vectors (hossman)
* GITHUB#13017: Fix DV update files referenced by merge will be deleted by concurrent flush. (Jialiang Guo)
* GITHUB#13145: Detect MemorySegmentIndexInput correctly in NRTSuggester. (Uwe Schindler)
Other
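The core of the fix is file reference counting. While a merge is running, a concurrent flush can write a new generation of doc-values (DV) update files for a segment being merged; the older generation then looks unreferenced to the IndexFileDeleter and can be deleted even though the in-flight merge's readers still read from it. With this change, IndexWriter incRefs the files of each merge reader's segment when the merge readers are initialized and decRefs them again when the merge releases its readers (see the IndexWriter and ReadersAndUpdates hunks below). The following is a minimal sketch of that protection pattern; FileRefCounter is a hypothetical stand-in for the role IndexFileDeleter plays here, not a Lucene API.

import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch (not a Lucene class): reference counting that keeps files
 *  alive while an in-flight merge still reads them. */
final class FileRefCounter {
  private final Map<String, Integer> refCounts = new HashMap<>();

  /** Merge readers initialized: pin their current files, including DV update files. */
  synchronized void incRef(Iterable<String> files) {
    for (String file : files) {
      refCounts.merge(file, 1, Integer::sum);
    }
  }

  /** Merge released its readers: unpin; unreferenced files may now be deleted. */
  synchronized void decRef(Iterable<String> files) {
    for (String file : files) {
      Integer left = refCounts.merge(file, -1, Integer::sum);
      if (left != null && left <= 0) {
        refCounts.remove(file);
        // a real deleter would only delete the file once nothing references it
      }
    }
  }

  synchronized boolean isReferenced(String file) {
    return refCounts.getOrDefault(file, 0) > 0;
  }
}

With such a counter in place, a flush that advances a segment's DV generation can no longer cause the older generation's files to be removed before the merge's matching decRef.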

lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

@@ -3933,7 +3933,8 @@ public class IndexWriter
// updates
// in a separate map in order to be applied to the merged segment after it's done
rld.setIsMerging();
return rld.getReaderForMerge(context);
return rld.getReaderForMerge(
context, mr -> deleter.incRef(mr.reader.getSegmentInfo().files()));
});
}
closeReaders = false;
@@ -5065,6 +5066,7 @@ public class IndexWriter
readerPool.drop(rld.info);
}
}
deleter.decRef(mr.reader.getSegmentInfo().files());
});
} else {
assert merge.getMergeReader().isEmpty()
@@ -5134,7 +5136,10 @@
sci -> {
final ReadersAndUpdates rld = getPooledInstance(sci, true);
rld.setIsMerging();
return rld.getReaderForMerge(context);
synchronized (this) {
return rld.getReaderForMerge(
context, mr -> deleter.incRef(mr.reader.getSegmentInfo().files()));
}
});
// Let the merge wrap readers
List<CodecReader> mergeReaders = new ArrayList<>();
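Note how the incRef is wired in: rather than incRef'ing after getReaderForMerge returns, IndexWriter hands an IOConsumer<MergePolicy.MergeReader> into getReaderForMerge, so the files are pinned inside ReadersAndUpdates' synchronized method at the moment the merge reader is created (and, at the call site above, additionally under synchronized (this) on the IndexWriter). Below is a simplified, hypothetical sketch of that "register while still holding the lock" pattern; the types are illustrative stand-ins, not Lucene classes.

import java.util.List;
import java.util.function.Consumer; // stand-in for Lucene's IOConsumer

final class ReaderSource {
  static final class MergeReader {
    final List<String> files;
    MergeReader(List<String> files) {
      this.files = files;
    }
  }

  /** The callback runs under the same lock that produced the reader, so nothing can
   *  advance the segment's file generations between "reader created" and "files pinned". */
  synchronized MergeReader openForMerge(Consumer<MergeReader> onOpen) {
    MergeReader reader = new MergeReader(List.of("_0_1.dvd", "_0_1.dvm")); // hypothetical file names
    onOpen.accept(reader); // e.g. deleter.incRef(reader.files)
    return reader;
  }
}

Had the caller incRef'd only after the method returned, a flush in between could already have written a newer generation and let the deleter drop the files the just-created reader points at.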

lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java

@@ -39,6 +39,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOConsumer;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
@@ -669,11 +670,6 @@ final class ReadersAndUpdates {
long bytes = ramBytesUsed.addAndGet(-bytesFreed);
assert bytes >= 0;
// if there is a reader open, reopen it to reflect the updates
if (reader != null) {
swapNewReaderWithLatestLiveDocs();
}
// writing field updates succeeded
assert fieldInfosFiles != null;
info.setFieldInfosFiles(fieldInfosFiles);
@@ -690,6 +686,11 @@
}
info.setDocValuesUpdatesFiles(newDVFiles);
// if there is a reader open, reopen it to reflect the updates
if (reader != null) {
swapNewReaderWithLatestLiveDocs();
}
if (infoStream.isEnabled("BD")) {
infoStream.message(
"BD",
@@ -766,7 +767,8 @@
}
/** Returns a reader for merge, with the latest doc values updates and deletions. */
synchronized MergePolicy.MergeReader getReaderForMerge(IOContext context) throws IOException {
synchronized MergePolicy.MergeReader getReaderForMerge(
IOContext context, IOConsumer<MergePolicy.MergeReader> readerConsumer) throws IOException {
// We must carry over any still-pending DV updates because they were not
// successfully written, e.g. because there was a hole in the delGens,
@@ -782,13 +784,17 @@
}
SegmentReader reader = getReader(context);
if (pendingDeletes.needsRefresh(reader)) {
if (pendingDeletes.needsRefresh(reader)
|| reader.getSegmentInfo().getDelGen() != pendingDeletes.info.getDelGen()) {
// beware of zombies:
assert pendingDeletes.getLiveDocs() != null;
reader = createNewReaderWithLatestLiveDocs(reader);
}
assert pendingDeletes.verifyDocCounts(reader);
return new MergePolicy.MergeReader(reader, pendingDeletes.getHardLiveDocs());
MergePolicy.MergeReader mergeReader =
new MergePolicy.MergeReader(reader, pendingDeletes.getHardLiveDocs());
readerConsumer.accept(mergeReader);
return mergeReader;
}
/**
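Two details of the ReadersAndUpdates change are easy to miss. First, the reopen of the pooled reader (swapNewReaderWithLatestLiveDocs) is moved to after the new doc-values files have been recorded on the SegmentCommitInfo, so a reader swapped in during a flush already reflects the complete new file set. Second, getReaderForMerge now rebuilds its reader not only when the live docs need a refresh but also when the cached reader's delGen (deletes generation) differs from the segment's current delGen, which keeps the files handed to the incRef callback in sync with what the merge will actually read. A tiny, hypothetical sketch of that extra staleness condition:

/** Hypothetical sketch of the extra reopen condition in getReaderForMerge. */
final class ReaderFreshness {
  record CachedReader(long delGen) {}

  static boolean needsReopen(CachedReader cached, long segmentDelGen, boolean liveDocsChanged) {
    // reopen if live docs changed OR the reader was opened against a different deletes generation
    return liveDocsChanged || cached.delGen() != segmentDelGen;
  }
}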

lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java

@@ -17,6 +17,9 @@
package org.apache.lucene.index;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -33,11 +36,15 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.index.MockIndexWriterEventListener;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.mockfile.HandleLimitFS;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.IOFunction;
@HandleLimitFS.MaxOpenHandles(limit = HandleLimitFS.MaxOpenHandles.MAX_OPEN_FILES * 2)
// Some of these tests are too intense for SimpleText
@@ -950,4 +957,435 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
w.close();
dir.close();
}
private static final class MockAssertFileExistIndexInput extends IndexInput {
private final String name;
private final IndexInput delegate;
private final Path filePath;
public MockAssertFileExistIndexInput(String name, IndexInput delegate, Path filePath) {
super("MockAssertFileExistIndexInput(name=" + name + " delegate=" + delegate + ")");
this.name = name;
this.delegate = delegate;
this.filePath = filePath;
}
private void checkFileExist() throws IOException {
if (Files.exists(filePath) == false) {
throw new NoSuchFileException(filePath.toString());
}
}
@Override
public void close() throws IOException {
delegate.close();
}
@Override
public MockAssertFileExistIndexInput clone() {
return new MockAssertFileExistIndexInput(name, delegate.clone(), filePath);
}
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
checkFileExist();
IndexInput slice = delegate.slice(sliceDescription, offset, length);
return new MockAssertFileExistIndexInput(sliceDescription, slice, filePath);
}
@Override
public long getFilePointer() {
return delegate.getFilePointer();
}
@Override
public void seek(long pos) throws IOException {
checkFileExist();
delegate.seek(pos);
}
@Override
public long length() {
return delegate.length();
}
@Override
public byte readByte() throws IOException {
checkFileExist();
return delegate.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
checkFileExist();
delegate.readBytes(b, offset, len);
}
}
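The helper above is what turns premature file deletion into a hard, deterministic failure: on typical filesystems an already-open file remains readable after deletion, so a merge could otherwise keep reading a removed DV update file through its open handle. The wrapper re-checks that the backing path still exists before every seek, slice, and read, and throws NoSuchFileException if it does not. Each test below installs it the same way, roughly as restated here (createTempDir and newFSDirectory come from LuceneTestCase):

// Shared setup of the tests below: wrap the FSDirectory so that every IndexInput
// verifies its underlying file still exists before it is read.
Path path = createTempDir("someTest"); // hypothetical name; each test uses its own
Directory mockDirectory =
    new FilterDirectory(newFSDirectory(path)) {
      @Override
      public IndexInput openInput(String name, IOContext context) throws IOException {
        IndexInput indexInput = super.openInput(name, context);
        return new MockAssertFileExistIndexInput(name, indexInput, path.resolve(name));
      }
    };

Reading a DV update file the deleter has already removed then surfaces as a NoSuchFileException, which is how these tests are expected to fail without the fix.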
public void testForceMergeDVUpdateFileWithConcurrentFlush() throws Exception {
CountDownLatch waitForInitMergeReader = new CountDownLatch(1);
CountDownLatch waitForDVUpdate = new CountDownLatch(1);
CountDownLatch waitForMergeFinished = new CountDownLatch(1);
Path path = createTempDir("testForceMergeDVUpdateFileWithConcurrentFlush");
Directory mockDirectory =
new FilterDirectory(newFSDirectory(path)) {
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
IndexInput indexInput = super.openInput(name, context);
return new MockAssertFileExistIndexInput(name, indexInput, path.resolve(name));
}
};
MergePolicy mockMergePolicy =
new OneMergeWrappingMergePolicy(
new SoftDeletesRetentionMergePolicy(
"soft_delete",
MatchAllDocsQuery::new,
new LogDocMergePolicy() {
@Override
public MergeSpecification findMerges(
MergeTrigger mergeTrigger,
SegmentInfos segmentInfos,
MergeContext mergeContext)
throws IOException {
// only allow force merge
return null;
}
}),
merge ->
new MergePolicy.OneMerge(merge.segments) {
@Override
void initMergeReaders(
IOFunction<SegmentCommitInfo, MergePolicy.MergeReader> readerFactory)
throws IOException {
super.initMergeReaders(readerFactory);
waitForInitMergeReader.countDown();
}
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
try {
waitForDVUpdate.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
return super.wrapForMerge(reader);
}
});
IndexWriter writer =
new IndexWriter(
mockDirectory,
newIndexWriterConfig()
.setMergePolicy(mockMergePolicy)
.setSoftDeletesField("soft_delete"));
Document doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
writer.addDocument(doc);
writer.flush();
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
writer.addDocument(doc);
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new StringField("version", "2", Field.Store.YES));
Field field = new NumericDocValuesField("soft_delete", 1);
writer.softUpdateDocument(new Term("id", "2"), doc, field);
writer.flush();
Thread t =
new Thread(
() -> {
try {
writer.forceMerge(1);
} catch (Throwable e) {
throw new AssertionError(e);
} finally {
waitForMergeFinished.countDown();
}
});
t.start();
waitForInitMergeReader.await();
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new StringField("version", "3", Field.Store.YES));
field = new NumericDocValuesField("soft_delete", 1);
writer.softUpdateDocument(new Term("id", "2"), doc, field);
writer.flush();
waitForDVUpdate.countDown();
waitForMergeFinished.await();
writer.close();
mockDirectory.close();
}
public void testMergeDVUpdateFileOnGetReaderWithConcurrentFlush() throws Exception {
CountDownLatch waitForInitMergeReader = new CountDownLatch(1);
CountDownLatch waitForDVUpdate = new CountDownLatch(1);
Path path = createTempDir("testMergeDVUpdateFileOnGetReaderWithConcurrentFlush");
Directory mockDirectory =
new FilterDirectory(newFSDirectory(path)) {
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
IndexInput indexInput = super.openInput(name, context);
return new MockAssertFileExistIndexInput(name, indexInput, path.resolve(name));
}
};
IndexWriter firstWriter =
new IndexWriter(
mockDirectory,
newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(NoMergePolicy.INSTANCE));
Document doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
firstWriter.addDocument(doc);
firstWriter.flush();
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
firstWriter.addDocument(doc);
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new StringField("version", "2", Field.Store.YES));
Field field = new NumericDocValuesField("soft_delete", 1);
firstWriter.softUpdateDocument(new Term("id", "2"), doc, field);
firstWriter.flush();
DirectoryReader firstReader = DirectoryReader.open(firstWriter);
assertEquals(2, firstReader.leaves().size());
firstReader.close();
firstWriter.close();
ConcurrentMergeScheduler mockConcurrentMergeScheduler =
new ConcurrentMergeScheduler() {
@Override
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
waitForInitMergeReader.countDown();
try {
waitForDVUpdate.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
super.merge(mergeSource, trigger);
}
};
IndexWriterConfig iwc =
newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.GET_READER))
.setMaxFullFlushMergeWaitMillis(Integer.MAX_VALUE)
.setMergeScheduler(mockConcurrentMergeScheduler);
IndexWriter writerWithMergePolicy = new IndexWriter(mockDirectory, iwc);
Thread t =
new Thread(
() -> {
try {
waitForInitMergeReader.await();
Document updateDoc = new Document();
updateDoc.add(new StringField("id", "2", Field.Store.YES));
updateDoc.add(new StringField("version", "3", Field.Store.YES));
Field softDeleteField = new NumericDocValuesField("soft_delete", 1);
writerWithMergePolicy.softUpdateDocument(
new Term("id", "2"), updateDoc, softDeleteField);
DirectoryReader reader = DirectoryReader.open(writerWithMergePolicy, true, false);
reader.close();
waitForDVUpdate.countDown();
} catch (Exception e) {
throw new AssertionError(e);
}
});
t.start();
try (DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy)) {
assertEquals(1, mergedReader.leaves().size());
}
writerWithMergePolicy.close();
mockDirectory.close();
}
public void testMergeDVUpdateFileOnCommitWithConcurrentFlush() throws Exception {
CountDownLatch waitForInitMergeReader = new CountDownLatch(1);
CountDownLatch waitForDVUpdate = new CountDownLatch(1);
Path path = createTempDir("testMergeDVUpdateFileOnCommitWithConcurrentFlush");
Directory mockDirectory =
new FilterDirectory(newFSDirectory(path)) {
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
IndexInput indexInput = super.openInput(name, context);
return new MockAssertFileExistIndexInput(name, indexInput, path.resolve(name));
}
};
IndexWriter firstWriter =
new IndexWriter(
mockDirectory,
newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(NoMergePolicy.INSTANCE));
Document doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
firstWriter.addDocument(doc);
firstWriter.flush();
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
firstWriter.addDocument(doc);
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new StringField("version", "2", Field.Store.YES));
Field field = new NumericDocValuesField("soft_delete", 1);
firstWriter.softUpdateDocument(new Term("id", "2"), doc, field);
firstWriter.flush();
DirectoryReader firstReader = DirectoryReader.open(firstWriter);
assertEquals(2, firstReader.leaves().size());
firstReader.close();
firstWriter.close();
ConcurrentMergeScheduler mockConcurrentMergeScheduler =
new ConcurrentMergeScheduler() {
@Override
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
waitForInitMergeReader.countDown();
try {
waitForDVUpdate.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
super.merge(mergeSource, trigger);
}
};
IndexWriterConfig iwc =
newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.COMMIT))
.setMaxFullFlushMergeWaitMillis(Integer.MAX_VALUE)
.setMergeScheduler(mockConcurrentMergeScheduler);
IndexWriter writerWithMergePolicy = new IndexWriter(mockDirectory, iwc);
Thread t =
new Thread(
() -> {
try {
waitForInitMergeReader.await();
Document updateDoc = new Document();
updateDoc.add(new StringField("id", "2", Field.Store.YES));
updateDoc.add(new StringField("version", "3", Field.Store.YES));
Field softDeleteField = new NumericDocValuesField("soft_delete", 1);
writerWithMergePolicy.softUpdateDocument(
new Term("id", "2"), updateDoc, softDeleteField);
DirectoryReader reader = DirectoryReader.open(writerWithMergePolicy, true, false);
reader.close();
waitForDVUpdate.countDown();
} catch (Exception e) {
throw new AssertionError(e);
}
});
t.start();
writerWithMergePolicy.commit();
assertEquals(2, writerWithMergePolicy.getSegmentCount());
writerWithMergePolicy.close();
mockDirectory.close();
}
public void testForceMergeWithPendingHardAndSoftDeleteFile() throws Exception {
Path path = createTempDir("testForceMergeWithPendingHardAndSoftDeleteFile");
Directory mockDirectory =
new FilterDirectory(newFSDirectory(path)) {
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
IndexInput indexInput = super.openInput(name, context);
return new MockAssertFileExistIndexInput(name, indexInput, path.resolve(name));
}
};
MergePolicy mockMergePolicy =
new OneMergeWrappingMergePolicy(
new TieredMergePolicy() {
@Override
public MergeSpecification findMerges(
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
// only allow force merge
return null;
}
},
merge -> new MergePolicy.OneMerge(merge.segments) {});
IndexWriter writer =
new IndexWriter(mockDirectory, newIndexWriterConfig().setMergePolicy(mockMergePolicy));
Document doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
writer.addDocument(doc);
writer.commit();
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
writer.addDocument(doc);
doc = new Document();
doc.add(new StringField("id", "3", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
writer.addDocument(doc);
doc = new Document();
doc.add(new StringField("id", "4", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
writer.addDocument(doc);
doc = new Document();
doc.add(new StringField("id", "5", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
writer.addDocument(doc);
writer.commit();
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new StringField("version", "2", Field.Store.YES));
writer.updateDocument(new Term("id", "2"), doc);
writer.commit();
doc = new Document();
doc.add(new StringField("id", "3", Field.Store.YES));
doc.add(new StringField("version", "2", Field.Store.YES));
writer.updateDocument(new Term("id", "3"), doc);
doc = new Document();
doc.add(new StringField("id", "4", Field.Store.YES));
doc.add(new StringField("version", "2", Field.Store.YES));
Field field = new NumericDocValuesField("soft_delete", 1);
writer.softUpdateDocument(new Term("id", "4"), doc, field);
DirectoryReader reader = writer.getReader(true, false);
reader.close();
writer.commit();
writer.forceMerge(1);
writer.close();
mockDirectory.close();
}
}
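The three latch-based tests above pin the same interleaving with CountDownLatches: the merge initializes its readers and then blocks, a concurrent soft update flushes a new DV generation, and only then is the merge allowed to continue reading. Stripped of the Lucene plumbing, the choreography looks roughly like this (a generic, hypothetical sketch, not Lucene code):

import java.util.concurrent.CountDownLatch;

// Hypothetical, Lucene-free sketch of the interleaving the tests above force with latches.
public final class MergeFlushRace {
  public static void main(String[] args) throws Exception {
    CountDownLatch mergeReadersInitialized = new CountDownLatch(1);
    CountDownLatch newDVGenerationFlushed = new CountDownLatch(1);

    Thread mergeThread =
        new Thread(
            () -> {
              // 1. merge readers are initialized; from here on the merge reads a fixed file set
              mergeReadersInitialized.countDown();
              try {
                // 2. hold the merge until the concurrent flush has written a newer DV generation
                newDVGenerationFlushed.await();
              } catch (InterruptedException e) {
                throw new AssertionError(e);
              }
              // 3. merge resumes and reads the files it captured in step 1
            });
    mergeThread.start();

    mergeReadersInitialized.await(); // main thread plays the "concurrent flush"
    newDVGenerationFlushed.countDown(); // new DV generation written; old one looks unreferenced
    mergeThread.join();
  }
}

Forcing this ordering makes the race deterministic; without the incRef introduced by this commit, step 3 can end up reading DV update files the deleter has already removed.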