diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index a73fdb1d3ba..bfa04feb1bb 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -3159,7 +3159,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, // Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit. return; } - if (isAborted() == false) { + if (committed) { deleter.incRef(this.info.files()); // Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name. Set<String> mergedSegmentNames = new HashSet<>(); @@ -4076,6 +4076,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, } try (Closeable finalizer = this::checkpoint) { + merge.committed = true; // Must close before checkpoint, otherwise IFD won't be // able to delete the held-open files from the merge // readers: diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index c191c6571ce..ad4069d5358 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -225,6 +225,8 @@ public abstract class MergePolicy { public final int totalMaxDoc; Throwable error; + boolean committed; // Set by IndexWriter once the merge has been committed to disk + /** Sole constructor. * @param segments List of {@link SegmentCommitInfo}s * to be merged. 
*/ diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java index f58ecee67b3..8a463efd835 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java @@ -19,10 +19,7 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; @@ -32,11 +29,31 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.store.Directory; -import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; public class TestIndexWriterMergePolicy extends LuceneTestCase { - + + private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() { + @Override + public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { + // Optimize down to a single segment on commit + if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) { + List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>(); + for (SegmentCommitInfo sci : segmentInfos) { + if (mergeContext.getMergingSegments().contains(sci) == false) { + nonMergingSegments.add(sci); + } + } + if (nonMergingSegments.size() > 1) { + MergeSpecification mergeSpecification = new MergeSpecification(); + mergeSpecification.add(new OneMerge(nonMergingSegments)); + return mergeSpecification; + } + } + return null; + } + }; + // Test the normal case public void testNormalCase() throws IOException { Directory dir = newDirectory(); @@ -286,7 +303,8 @@ public class TestIndexWriterMergePolicy extends 
LuceneTestCase { assertSetters(new LogDocMergePolicy()); } - public void testMergeOnCommit() throws IOException, InterruptedException { + // Test basic semantics of merge on commit + public void testMergeOnCommit() throws IOException { Directory dir = newDirectory(); IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())) @@ -298,118 +316,33 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase { DirectoryReader firstReader = DirectoryReader.open(firstWriter); assertEquals(5, firstReader.leaves().size()); firstReader.close(); - firstWriter.close(); + firstWriter.close(); // When this writer closes, it does not merge on commit. - MergePolicy mergeOnCommitPolicy = new LogDocMergePolicy() { - @Override - public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { - // Optimize down to a single segment on commit - if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) { - List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>(); - for (SegmentCommitInfo sci : segmentInfos) { - if (mergeContext.getMergingSegments().contains(sci) == false) { - nonMergingSegments.add(sci); - } - } - if (nonMergingSegments.size() > 1) { - MergeSpecification mergeSpecification = new MergeSpecification(); - mergeSpecification.add(new OneMerge(nonMergingSegments)); - return mergeSpecification; - } - } - return null; - } - }; - - AtomicInteger abandonedMerges = new AtomicInteger(0); IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())) - .setMergePolicy(mergeOnCommitPolicy) - .setIndexWriterEvents(new IndexWriterEvents() { - @Override - public void beginMergeOnCommit() { + .setMergePolicy(MERGE_ON_COMMIT_POLICY); - } - - @Override - public void finishMergeOnCommit() { - - } - - @Override - public void abandonedMergesOnCommit(int abandonedCount) { - abandonedMerges.incrementAndGet(); - } - }); IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc); 
- writerWithMergePolicy.commit(); + writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge. DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy); - assertEquals(5, unmergedReader.leaves().size()); // Don't merge unless there's a change + assertEquals(5, unmergedReader.leaves().size()); unmergedReader.close(); TestIndexWriter.addDoc(writerWithMergePolicy); - writerWithMergePolicy.commit(); + writerWithMergePolicy.commit(); // Doc added, do merge on commit. + assertEquals(1, writerWithMergePolicy.getSegmentCount()); // DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy); - assertEquals(1, mergedReader.leaves().size()); // Now we merge on commit + assertEquals(1, mergedReader.leaves().size()); mergedReader.close(); - LineFileDocs lineFileDocs = new LineFileDocs(random()); - int docCount = atLeast(1000); - AtomicInteger indexedDocs = new AtomicInteger(0); - int numIndexingThreads = atLeast(2); - CountDownLatch startingGun = new CountDownLatch(1); - Collection<Thread> indexingThreads = new ArrayList<>(); - for (int i = 0; i < numIndexingThreads; i++) { - Thread t = new Thread(() -> { - try { - startingGun.await(); - while (indexedDocs.getAndIncrement() < docCount) { - writerWithMergePolicy.addDocument(lineFileDocs.nextDoc()); - if (rarely()) { - writerWithMergePolicy.commit(); - } - } - } catch (IOException | InterruptedException e) { - e.printStackTrace(); - fail(); - } - }); - t.start(); - indexingThreads.add(t); - } - startingGun.countDown(); - for (Thread t : indexingThreads) { - t.join(); - } - for (int i = 0; i < 50; i++) { - // Wait for pending merges to finish - synchronized (writerWithMergePolicy) { - if (writerWithMergePolicy.getMergingSegments().isEmpty()) { - break; - } - } - Thread.sleep(100); - } - abandonedMerges.set(0); - // Ensure there's at least one pending change so merge on commit happens - TestIndexWriter.addDoc(writerWithMergePolicy); - writerWithMergePolicy.commit(); - if 
(abandonedMerges.get() == 0) { - assertEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size()); - } else { - assertNotEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size()); - } - try (IndexReader reader = writerWithMergePolicy.getReader()) { IndexSearcher searcher = new IndexSearcher(reader); - assertEquals(docCount + 7, reader.numDocs()); - assertEquals(docCount + 7, searcher.count(new MatchAllDocsQuery())); + assertEquals(6, reader.numDocs()); + assertEquals(6, searcher.count(new MatchAllDocsQuery())); } writerWithMergePolicy.close(); - dir.close(); }