LUCENE-8962: Split test case (#1313)

* LUCENE-8962: Simplify test case

The testMergeOnCommit test case was trying to verify too many things
at once: basic semantics of merge on commit and proper behavior when
a bunch of indexing threads are writing and committing all at once.

Now we just verify basic behavior, with strict assertions on invariants, while 
leaving it to MockRandomMergePolicy to enable merge on commit in existing
 test cases to verify that indexing generally works as expected and no new
unexpected exceptions are thrown.

* LUCENE-8962: Only update toCommit if merge was committed

The code was previously assuming that if mergeFinished() was called and
isAborted() was false, then the merge must have completed successfully.
Instead, we should know for sure if a given merge was committed, and
only then update our pending commit SegmentInfos.
This commit is contained in:
Michael Sokolov 2020-03-05 15:49:26 -05:00 committed by GitHub
parent ceb90ce0e8
commit a030207a5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 101 deletions

View File

@ -3159,7 +3159,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit. // Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit.
return; return;
} }
if (isAborted() == false) { if (committed) {
deleter.incRef(this.info.files()); deleter.incRef(this.info.files());
// Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name. // Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name.
Set<String> mergedSegmentNames = new HashSet<>(); Set<String> mergedSegmentNames = new HashSet<>();
@ -4076,6 +4076,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} }
try (Closeable finalizer = this::checkpoint) { try (Closeable finalizer = this::checkpoint) {
merge.committed = true;
// Must close before checkpoint, otherwise IFD won't be // Must close before checkpoint, otherwise IFD won't be
// able to delete the held-open files from the merge // able to delete the held-open files from the merge
// readers: // readers:

View File

@ -225,6 +225,8 @@ public abstract class MergePolicy {
public final int totalMaxDoc; public final int totalMaxDoc;
Throwable error; Throwable error;
boolean committed; // Set by IndexWriter once the merge has been committed to disk
/** Sole constructor. /** Sole constructor.
* @param segments List of {@link SegmentCommitInfo}s * @param segments List of {@link SegmentCommitInfo}s
* to be merged. */ * to be merged. */

View File

@ -19,10 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
@ -32,11 +29,31 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
public class TestIndexWriterMergePolicy extends LuceneTestCase { public class TestIndexWriterMergePolicy extends LuceneTestCase {
private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() {
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
// Optimize down to a single segment on commit
if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
for (SegmentCommitInfo sci : segmentInfos) {
if (mergeContext.getMergingSegments().contains(sci) == false) {
nonMergingSegments.add(sci);
}
}
if (nonMergingSegments.size() > 1) {
MergeSpecification mergeSpecification = new MergeSpecification();
mergeSpecification.add(new OneMerge(nonMergingSegments));
return mergeSpecification;
}
}
return null;
}
};
// Test the normal case // Test the normal case
public void testNormalCase() throws IOException { public void testNormalCase() throws IOException {
Directory dir = newDirectory(); Directory dir = newDirectory();
@ -286,7 +303,8 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
assertSetters(new LogDocMergePolicy()); assertSetters(new LogDocMergePolicy());
} }
public void testMergeOnCommit() throws IOException, InterruptedException { // Test basic semantics of merge on commit
public void testMergeOnCommit() throws IOException {
Directory dir = newDirectory(); Directory dir = newDirectory();
IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())) IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
@ -298,118 +316,33 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
DirectoryReader firstReader = DirectoryReader.open(firstWriter); DirectoryReader firstReader = DirectoryReader.open(firstWriter);
assertEquals(5, firstReader.leaves().size()); assertEquals(5, firstReader.leaves().size());
firstReader.close(); firstReader.close();
firstWriter.close(); firstWriter.close(); // When this writer closes, it does not merge on commit.
MergePolicy mergeOnCommitPolicy = new LogDocMergePolicy() {
@Override
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
// Optimize down to a single segment on commit
if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
for (SegmentCommitInfo sci : segmentInfos) {
if (mergeContext.getMergingSegments().contains(sci) == false) {
nonMergingSegments.add(sci);
}
}
if (nonMergingSegments.size() > 1) {
MergeSpecification mergeSpecification = new MergeSpecification();
mergeSpecification.add(new OneMerge(nonMergingSegments));
return mergeSpecification;
}
}
return null;
}
};
AtomicInteger abandonedMerges = new AtomicInteger(0);
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())) IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(mergeOnCommitPolicy) .setMergePolicy(MERGE_ON_COMMIT_POLICY);
.setIndexWriterEvents(new IndexWriterEvents() {
@Override
public void beginMergeOnCommit() {
}
@Override
public void finishMergeOnCommit() {
}
@Override
public void abandonedMergesOnCommit(int abandonedCount) {
abandonedMerges.incrementAndGet();
}
});
IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc); IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge.
writerWithMergePolicy.commit();
DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy); DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
assertEquals(5, unmergedReader.leaves().size()); // Don't merge unless there's a change assertEquals(5, unmergedReader.leaves().size());
unmergedReader.close(); unmergedReader.close();
TestIndexWriter.addDoc(writerWithMergePolicy); TestIndexWriter.addDoc(writerWithMergePolicy);
writerWithMergePolicy.commit(); writerWithMergePolicy.commit(); // Doc added, do merge on commit.
assertEquals(1, writerWithMergePolicy.getSegmentCount()); //
DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy); DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy);
assertEquals(1, mergedReader.leaves().size()); // Now we merge on commit assertEquals(1, mergedReader.leaves().size());
mergedReader.close(); mergedReader.close();
LineFileDocs lineFileDocs = new LineFileDocs(random());
int docCount = atLeast(1000);
AtomicInteger indexedDocs = new AtomicInteger(0);
int numIndexingThreads = atLeast(2);
CountDownLatch startingGun = new CountDownLatch(1);
Collection<Thread> indexingThreads = new ArrayList<>();
for (int i = 0; i < numIndexingThreads; i++) {
Thread t = new Thread(() -> {
try {
startingGun.await();
while (indexedDocs.getAndIncrement() < docCount) {
writerWithMergePolicy.addDocument(lineFileDocs.nextDoc());
if (rarely()) {
writerWithMergePolicy.commit();
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
fail();
}
});
t.start();
indexingThreads.add(t);
}
startingGun.countDown();
for (Thread t : indexingThreads) {
t.join();
}
for (int i = 0; i < 50; i++) {
// Wait for pending merges to finish
synchronized (writerWithMergePolicy) {
if (writerWithMergePolicy.getMergingSegments().isEmpty()) {
break;
}
}
Thread.sleep(100);
}
abandonedMerges.set(0);
// Ensure there's at least one pending change so merge on commit happens
TestIndexWriter.addDoc(writerWithMergePolicy);
writerWithMergePolicy.commit();
if (abandonedMerges.get() == 0) {
assertEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size());
} else {
assertNotEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size());
}
try (IndexReader reader = writerWithMergePolicy.getReader()) { try (IndexReader reader = writerWithMergePolicy.getReader()) {
IndexSearcher searcher = new IndexSearcher(reader); IndexSearcher searcher = new IndexSearcher(reader);
assertEquals(docCount + 7, reader.numDocs()); assertEquals(6, reader.numDocs());
assertEquals(docCount + 7, searcher.count(new MatchAllDocsQuery())); assertEquals(6, searcher.count(new MatchAllDocsQuery()));
} }
writerWithMergePolicy.close(); writerWithMergePolicy.close();
dir.close(); dir.close();
} }