diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index b1061599876..4e53b7e4c54 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -217,6 +217,8 @@ Bug Fixes
 * GITHUB#13105: Fix ByteKnnVectorFieldSource & FloatKnnVectorFieldSource to work
   correctly when a segment does not contain any docs with vectors (hossman)
 
+* GITHUB#13017: Fix DV update files referenced by merge being deleted by a concurrent flush. (Jialiang Guo)
+
 * GITHUB#13145: Detect MemorySegmentIndexInput correctly in NRTSuggester. (Uwe Schindler)
 
 Other
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 2bfa25fea43..c81abc3f1b0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -3933,7 +3933,8 @@ public class IndexWriter
               // updates
               // in a separate map in order to be applied to the merged segment after it's done
               rld.setIsMerging();
-              return rld.getReaderForMerge(context);
+              return rld.getReaderForMerge(
+                  context, mr -> deleter.incRef(mr.reader.getSegmentInfo().files()));
             });
       }
       closeReaders = false;
@@ -5065,6 +5066,7 @@ public class IndexWriter
                 readerPool.drop(rld.info);
               }
             }
+            deleter.decRef(mr.reader.getSegmentInfo().files());
           });
     } else {
       assert merge.getMergeReader().isEmpty()
@@ -5134,7 +5136,10 @@ public class IndexWriter
           sci -> {
             final ReadersAndUpdates rld = getPooledInstance(sci, true);
             rld.setIsMerging();
-            return rld.getReaderForMerge(context);
+            synchronized (this) {
+              return rld.getReaderForMerge(
+                  context, mr -> deleter.incRef(mr.reader.getSegmentInfo().files()));
+            }
           });
       // Let the merge wrap readers
       List<CodecReader> mergeReaders = new ArrayList<>();
diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
index e5eb05b35bb..737149446c1 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
@@ -39,6 +39,7 @@
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.TrackingDirectoryWrapper;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOConsumer;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
@@ -669,11 +670,6 @@
     long bytes = ramBytesUsed.addAndGet(-bytesFreed);
     assert bytes >= 0;
 
-    // if there is a reader open, reopen it to reflect the updates
-    if (reader != null) {
-      swapNewReaderWithLatestLiveDocs();
-    }
-
     // writing field updates succeeded
     assert fieldInfosFiles != null;
     info.setFieldInfosFiles(fieldInfosFiles);
@@ -690,6 +686,11 @@
     }
     info.setDocValuesUpdatesFiles(newDVFiles);
 
+    // if there is a reader open, reopen it to reflect the updates
+    if (reader != null) {
+      swapNewReaderWithLatestLiveDocs();
+    }
+
     if (infoStream.isEnabled("BD")) {
       infoStream.message(
           "BD",
@@ -766,7 +767,8 @@
   }
 
   /** Returns a reader for merge, with the latest doc values updates and deletions. */
-  synchronized MergePolicy.MergeReader getReaderForMerge(IOContext context) throws IOException {
+  synchronized MergePolicy.MergeReader getReaderForMerge(
+      IOContext context, IOConsumer<MergePolicy.MergeReader> readerConsumer) throws IOException {
     // We must carry over any still-pending DV updates because they were not
     // successfully written, e.g. because there was a hole in the delGens,
@@ -782,13 +784,17 @@
     }
 
     SegmentReader reader = getReader(context);
-    if (pendingDeletes.needsRefresh(reader)) {
+    if (pendingDeletes.needsRefresh(reader)
+        || reader.getSegmentInfo().getDelGen() != pendingDeletes.info.getDelGen()) {
       // beware of zombies:
       assert pendingDeletes.getLiveDocs() != null;
       reader = createNewReaderWithLatestLiveDocs(reader);
     }
     assert pendingDeletes.verifyDocCounts(reader);
-    return new MergePolicy.MergeReader(reader, pendingDeletes.getHardLiveDocs());
+    MergePolicy.MergeReader mergeReader =
+        new MergePolicy.MergeReader(reader, pendingDeletes.getHardLiveDocs());
+    readerConsumer.accept(mergeReader);
+    return mergeReader;
   }
 
   /**
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index 4e9bfb0533f..1835a4ff309 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -17,6 +17,9 @@
 package org.apache.lucene.index;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -33,11 +36,15 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.tests.analysis.MockAnalyzer;
 import org.apache.lucene.tests.index.MockIndexWriterEventListener;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.tests.mockfile.HandleLimitFS;
 import org.apache.lucene.tests.util.LuceneTestCase;
+import org.apache.lucene.util.IOFunction;
 
 @HandleLimitFS.MaxOpenHandles(limit = HandleLimitFS.MaxOpenHandles.MAX_OPEN_FILES * 2)
 // Some of these tests are too intense for SimpleText
@@ -950,4 +957,435 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
     w.close();
     dir.close();
   }
+
+  private static final class MockAssertFileExistIndexInput extends IndexInput {
+    private final String name;
+    private final IndexInput delegate;
+    private final Path filePath;
+
+    public MockAssertFileExistIndexInput(String name, IndexInput delegate, Path filePath) {
+      super("MockAssertFileExistIndexInput(name=" + name + " delegate=" + delegate + ")");
+      this.name = name;
+      this.delegate = delegate;
+      this.filePath = filePath;
+    }
+
+    private void checkFileExist() throws IOException {
+      if (Files.exists(filePath) == false) {
+        throw new NoSuchFileException(filePath.toString());
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public MockAssertFileExistIndexInput clone() {
+      return new MockAssertFileExistIndexInput(name, delegate.clone(), filePath);
+    }
+
+    @Override
+    public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
+      checkFileExist();
+      IndexInput slice = delegate.slice(sliceDescription, offset, length);
+      return new MockAssertFileExistIndexInput(sliceDescription, slice, filePath);
+    }
+
+    @Override
+    public long getFilePointer() {
+      return delegate.getFilePointer();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      checkFileExist();
+      delegate.seek(pos);
+    }
+
+    @Override
+    public long length() {
+      return delegate.length();
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+      checkFileExist();
+      return delegate.readByte();
+    }
+
+    @Override
+    public void readBytes(byte[] b, int offset, int len) throws IOException {
+      checkFileExist();
+      delegate.readBytes(b, offset, len);
+    }
+  }
+
+  public void testForceMergeDVUpdateFileWithConcurrentFlush() throws Exception {
+    CountDownLatch waitForInitMergeReader = new CountDownLatch(1);
+    CountDownLatch waitForDVUpdate = new CountDownLatch(1);
+    CountDownLatch waitForMergeFinished = new CountDownLatch(1);
+
+    Path path = createTempDir("testForceMergeDVUpdateFileWithConcurrentFlush");
+    Directory mockDirectory =
+        new FilterDirectory(newFSDirectory(path)) {
+          @Override
+          public IndexInput openInput(String name, IOContext context) throws IOException {
+            IndexInput indexInput = super.openInput(name, context);
+            return new MockAssertFileExistIndexInput(name, indexInput, path.resolve(name));
+          }
+        };
+
+    MergePolicy mockMergePolicy =
+        new OneMergeWrappingMergePolicy(
+            new SoftDeletesRetentionMergePolicy(
+                "soft_delete",
+                MatchAllDocsQuery::new,
+                new LogDocMergePolicy() {
+                  @Override
+                  public MergeSpecification findMerges(
+                      MergeTrigger mergeTrigger,
+                      SegmentInfos segmentInfos,
+                      MergeContext mergeContext)
+                      throws IOException {
+                    // only allow force merge
+                    return null;
+                  }
+                }),
+            merge ->
+                new MergePolicy.OneMerge(merge.segments) {
+                  @Override
+                  void initMergeReaders(
+                      IOFunction<SegmentCommitInfo, MergePolicy.MergeReader> readerFactory)
+                      throws IOException {
+                    super.initMergeReaders(readerFactory);
+                    waitForInitMergeReader.countDown();
+                  }
+
+                  @Override
+                  public CodecReader wrapForMerge(CodecReader reader) throws IOException {
+                    try {
+                      waitForDVUpdate.await();
+                    } catch (InterruptedException e) {
+                      throw new AssertionError(e);
+                    }
+                    return super.wrapForMerge(reader);
+                  }
+                });
+
+    IndexWriter writer =
+        new IndexWriter(
+            mockDirectory,
+            newIndexWriterConfig()
+                .setMergePolicy(mockMergePolicy)
+                .setSoftDeletesField("soft_delete"));
+
+    Document doc = new Document();
+    doc.add(new StringField("id", "1", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    writer.addDocument(doc);
+    writer.flush();
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    writer.addDocument(doc);
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new StringField("version", "2", Field.Store.YES));
+    Field field = new NumericDocValuesField("soft_delete", 1);
+    writer.softUpdateDocument(new Term("id", "2"), doc, field);
+    writer.flush();
+
+    Thread t =
+        new Thread(
+            () -> {
+              try {
+                writer.forceMerge(1);
+              } catch (Throwable e) {
+                throw new AssertionError(e);
+              } finally {
+                waitForMergeFinished.countDown();
+              }
+            });
+    t.start();
+    waitForInitMergeReader.await();
+
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new StringField("version", "3", Field.Store.YES));
+    field = new NumericDocValuesField("soft_delete", 1);
+    writer.softUpdateDocument(new Term("id", "2"), doc, field);
+    writer.flush();
+
+    waitForDVUpdate.countDown();
+    waitForMergeFinished.await();
+
+    writer.close();
+    mockDirectory.close();
+  }
+
+  public void testMergeDVUpdateFileOnGetReaderWithConcurrentFlush() throws Exception {
+    CountDownLatch waitForInitMergeReader = new CountDownLatch(1);
+    CountDownLatch waitForDVUpdate = new CountDownLatch(1);
+
+    Path path = createTempDir("testMergeDVUpdateFileOnGetReaderWithConcurrentFlush");
+    Directory mockDirectory =
+        new FilterDirectory(newFSDirectory(path)) {
+          @Override
+          public IndexInput openInput(String name, IOContext context) throws IOException {
+            IndexInput indexInput = super.openInput(name, context);
+            return new MockAssertFileExistIndexInput(name, indexInput, path.resolve(name));
+          }
+        };
+
+    IndexWriter firstWriter =
+        new IndexWriter(
+            mockDirectory,
+            newIndexWriterConfig(new MockAnalyzer(random()))
+                .setMergePolicy(NoMergePolicy.INSTANCE));
+
+    Document doc = new Document();
+    doc.add(new StringField("id", "1", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    firstWriter.addDocument(doc);
+    firstWriter.flush();
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    firstWriter.addDocument(doc);
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new StringField("version", "2", Field.Store.YES));
+    Field field = new NumericDocValuesField("soft_delete", 1);
+    firstWriter.softUpdateDocument(new Term("id", "2"), doc, field);
+    firstWriter.flush();
+    DirectoryReader firstReader = DirectoryReader.open(firstWriter);
+    assertEquals(2, firstReader.leaves().size());
+    firstReader.close();
+    firstWriter.close();
+
+    ConcurrentMergeScheduler mockConcurrentMergeScheduler =
+        new ConcurrentMergeScheduler() {
+          @Override
+          public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+            waitForInitMergeReader.countDown();
+            try {
+              waitForDVUpdate.await();
+            } catch (InterruptedException e) {
+              throw new AssertionError(e);
+            }
+            super.merge(mergeSource, trigger);
+          }
+        };
+
+    IndexWriterConfig iwc =
+        newIndexWriterConfig(new MockAnalyzer(random()))
+            .setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.GET_READER))
+            .setMaxFullFlushMergeWaitMillis(Integer.MAX_VALUE)
+            .setMergeScheduler(mockConcurrentMergeScheduler);
+
+    IndexWriter writerWithMergePolicy = new IndexWriter(mockDirectory, iwc);
+
+    Thread t =
+        new Thread(
+            () -> {
+              try {
+                waitForInitMergeReader.await();
+
+                Document updateDoc = new Document();
+                updateDoc.add(new StringField("id", "2", Field.Store.YES));
+                updateDoc.add(new StringField("version", "3", Field.Store.YES));
+                Field softDeleteField = new NumericDocValuesField("soft_delete", 1);
+                writerWithMergePolicy.softUpdateDocument(
+                    new Term("id", "2"), updateDoc, softDeleteField);
+                DirectoryReader reader = DirectoryReader.open(writerWithMergePolicy, true, false);
+                reader.close();
+
+                waitForDVUpdate.countDown();
+              } catch (Exception e) {
+                throw new AssertionError(e);
+              }
+            });
+    t.start();
+
+    try (DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy)) {
+      assertEquals(1, mergedReader.leaves().size());
+    }
+
+    writerWithMergePolicy.close();
+    mockDirectory.close();
+  }
+
+  public void testMergeDVUpdateFileOnCommitWithConcurrentFlush() throws Exception {
+    CountDownLatch waitForInitMergeReader = new CountDownLatch(1);
+    CountDownLatch waitForDVUpdate = new CountDownLatch(1);
+
+    Path path = createTempDir("testMergeDVUpdateFileOnCommitWithConcurrentFlush");
+    Directory mockDirectory =
+        new FilterDirectory(newFSDirectory(path)) {
+          @Override
+          public IndexInput openInput(String name, IOContext context) throws IOException {
+            IndexInput indexInput = super.openInput(name, context);
+            return new MockAssertFileExistIndexInput(name, indexInput, path.resolve(name));
+          }
+        };
+
+    IndexWriter firstWriter =
+        new IndexWriter(
+            mockDirectory,
+            newIndexWriterConfig(new MockAnalyzer(random()))
+                .setMergePolicy(NoMergePolicy.INSTANCE));
+
+    Document doc = new Document();
+    doc.add(new StringField("id", "1", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    firstWriter.addDocument(doc);
+    firstWriter.flush();
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    firstWriter.addDocument(doc);
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new StringField("version", "2", Field.Store.YES));
+    Field field = new NumericDocValuesField("soft_delete", 1);
+    firstWriter.softUpdateDocument(new Term("id", "2"), doc, field);
+    firstWriter.flush();
+    DirectoryReader firstReader = DirectoryReader.open(firstWriter);
+    assertEquals(2, firstReader.leaves().size());
+    firstReader.close();
+    firstWriter.close();
+
+    ConcurrentMergeScheduler mockConcurrentMergeScheduler =
+        new ConcurrentMergeScheduler() {
+          @Override
+          public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+            waitForInitMergeReader.countDown();
+            try {
+              waitForDVUpdate.await();
+            } catch (InterruptedException e) {
+              throw new AssertionError(e);
+            }
+            super.merge(mergeSource, trigger);
+          }
+        };
+
+    IndexWriterConfig iwc =
+        newIndexWriterConfig(new MockAnalyzer(random()))
+            .setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.COMMIT))
+            .setMaxFullFlushMergeWaitMillis(Integer.MAX_VALUE)
+            .setMergeScheduler(mockConcurrentMergeScheduler);
+
+    IndexWriter writerWithMergePolicy = new IndexWriter(mockDirectory, iwc);
+
+    Thread t =
+        new Thread(
+            () -> {
+              try {
+                waitForInitMergeReader.await();
+
+                Document updateDoc = new Document();
+                updateDoc.add(new StringField("id", "2", Field.Store.YES));
+                updateDoc.add(new StringField("version", "3", Field.Store.YES));
+                Field softDeleteField = new NumericDocValuesField("soft_delete", 1);
+                writerWithMergePolicy.softUpdateDocument(
+                    new Term("id", "2"), updateDoc, softDeleteField);
+                DirectoryReader reader = DirectoryReader.open(writerWithMergePolicy, true, false);
+                reader.close();
+
+                waitForDVUpdate.countDown();
+              } catch (Exception e) {
+                throw new AssertionError(e);
+              }
+            });
+    t.start();
+
+    writerWithMergePolicy.commit();
+    assertEquals(2, writerWithMergePolicy.getSegmentCount());
+
+    writerWithMergePolicy.close();
+    mockDirectory.close();
+  }
+
+  public void testForceMergeWithPendingHardAndSoftDeleteFile() throws Exception {
+    Path path = createTempDir("testForceMergeWithPendingHardAndSoftDeleteFile");
+    Directory mockDirectory =
+        new FilterDirectory(newFSDirectory(path)) {
+          @Override
+          public IndexInput openInput(String name, IOContext context) throws IOException {
+            IndexInput indexInput = super.openInput(name, context);
+            return new MockAssertFileExistIndexInput(name, indexInput, path.resolve(name));
+          }
+        };
+
+    MergePolicy mockMergePolicy =
+        new OneMergeWrappingMergePolicy(
+            new TieredMergePolicy() {
+              @Override
+              public MergeSpecification findMerges(
+                  MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
+                  throws IOException {
+                // only allow force merge
+                return null;
+              }
+            },
+            merge -> new MergePolicy.OneMerge(merge.segments) {});
+
+    IndexWriter writer =
+        new IndexWriter(mockDirectory, newIndexWriterConfig().setMergePolicy(mockMergePolicy));
+
+    Document doc = new Document();
+    doc.add(new StringField("id", "1", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    writer.addDocument(doc);
+    writer.commit();
+
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    writer.addDocument(doc);
+
+    doc = new Document();
+    doc.add(new StringField("id", "3", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    writer.addDocument(doc);
+
+    doc = new Document();
+    doc.add(new StringField("id", "4", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    writer.addDocument(doc);
+
+    doc = new Document();
+    doc.add(new StringField("id", "5", Field.Store.YES));
+    doc.add(new StringField("version", "1", Field.Store.YES));
+    writer.addDocument(doc);
+    writer.commit();
+
+    doc = new Document();
+    doc.add(new StringField("id", "2", Field.Store.YES));
+    doc.add(new StringField("version", "2", Field.Store.YES));
+    writer.updateDocument(new Term("id", "2"), doc);
+    writer.commit();
+
+    doc = new Document();
+    doc.add(new StringField("id", "3", Field.Store.YES));
+    doc.add(new StringField("version", "2", Field.Store.YES));
+    writer.updateDocument(new Term("id", "3"), doc);
+
+    doc = new Document();
+    doc.add(new StringField("id", "4", Field.Store.YES));
+    doc.add(new StringField("version", "2", Field.Store.YES));
+    Field field = new NumericDocValuesField("soft_delete", 1);
+    writer.softUpdateDocument(new Term("id", "4"), doc, field);
+
+    DirectoryReader reader = writer.getReader(true, false);
+    reader.close();
+    writer.commit();
+
+    writer.forceMerge(1);
+
+    writer.close();
+    mockDirectory.close();
+  }
 }
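
Reviewer note, not part of the patch: the fix protects the DV update files of the SegmentReader that a merge captures by reference-counting them. IndexWriter now incRefs those files with its IndexFileDeleter at the moment getReaderForMerge hands back the MergeReader (via the new IOConsumer hook) and decRefs them when the merge's readers are closed; one call site additionally synchronizes on the IndexWriter so the incRef cannot race with a concurrent flush publishing a newer DV generation. The sketch below only illustrates that incRef/decRef idea; FileRefCounter, isDeleted, and the file names are made up for the example and are not IndexFileDeleter's actual implementation.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Minimal stand-in for per-file reference counting, as a sketch only.
    final class FileRefCounter {
      private final Map<String, Integer> refCounts = new HashMap<>();
      private final Set<String> deleted = new HashSet<>();

      synchronized void incRef(Iterable<String> files) {
        for (String file : files) {
          refCounts.merge(file, 1, Integer::sum);
        }
      }

      synchronized void decRef(Iterable<String> files) {
        for (String file : files) {
          int count = refCounts.merge(file, -1, Integer::sum);
          if (count == 0) {
            // last reference dropped: only now does the file become eligible for deletion
            refCounts.remove(file);
            deleted.add(file);
          }
        }
      }

      synchronized boolean isDeleted(String file) {
        return deleted.contains(file);
      }

      public static void main(String[] args) {
        FileRefCounter deleter = new FileRefCounter();
        Set<String> oldGen = Set.of("_0_1.fnm", "_0_1.dvd");
        Set<String> newGen = Set.of("_0_2.fnm", "_0_2.dvd");
        // The live segment references the current DV update generation.
        deleter.incRef(oldGen);
        // A merge starts and captures a reader on that generation (the incRef the fix adds).
        deleter.incRef(oldGen);
        // A concurrent flush writes a newer generation and drops the segment's old reference.
        deleter.incRef(newGen);
        deleter.decRef(oldGen);
        // The merge still holds a reference, so the old files survive while the merge reads them.
        System.out.println("old gen deleted while merging: " + deleter.isDeleted("_0_1.dvd")); // false
        // Once the merge closes its readers, the last reference goes away and the files can be removed.
        deleter.decRef(oldGen);
        System.out.println("old gen deleted after merge: " + deleter.isDeleted("_0_1.dvd")); // true
      }
    }

Without the extra reference, the flush's decRef would drop the count to zero while the merge was still reading the old generation, which is the NoSuchFileException scenario the new tests provoke with MockAssertFileExistIndexInput.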