LUCENE-8328: Ensure ReadersAndUpdates consistently executes under lock

This commit is contained in:
Simon Willnauer 2018-05-23 07:40:47 +02:00
parent 9b1cb6646f
commit b54e5946de
3 changed files with 66 additions and 1 deletions

View File

@ -229,6 +229,9 @@ Bug Fixes
* LUCENE-8320: Fix WindowsFS to correctly account for rename and hardlinks. * LUCENE-8320: Fix WindowsFS to correctly account for rename and hardlinks.
(Simon Willnauer, Nhat Nguyen) (Simon Willnauer, Nhat Nguyen)
* LUCENE-8328: Ensure ReadersAndUpdates consistently executes under lock.
(Nhat Nguyen via Simon Willnauer)
Other Other
* LUCENE-8301: Update randomizedtesting to 2.6.0. (Dawid Weiss) * LUCENE-8301: Update randomizedtesting to 2.6.0. (Dawid Weiss)

View File

@ -249,7 +249,7 @@ final class ReadersAndUpdates {
return pendingDeletes.numDeletesToMerge(policy, this::getLatestReader); return pendingDeletes.numDeletesToMerge(policy, this::getLatestReader);
} }
private CodecReader getLatestReader() throws IOException { private synchronized CodecReader getLatestReader() throws IOException {
if (this.reader == null) { if (this.reader == null) {
// get a reader and dec the ref right away we just make sure we have a reader // get a reader and dec the ref right away we just make sure we have a reader
getReader(IOContext.READ).decRef(); getReader(IOContext.READ).decRef();
@ -667,6 +667,7 @@ final class ReadersAndUpdates {
private SegmentReader createNewReaderWithLatestLiveDocs(SegmentReader reader) throws IOException { private SegmentReader createNewReaderWithLatestLiveDocs(SegmentReader reader) throws IOException {
assert reader != null; assert reader != null;
assert Thread.holdsLock(this) : Thread.currentThread().getName();
SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(), SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(),
info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes()); info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
boolean success2 = false; boolean success2 = false;

View File

@ -19,6 +19,8 @@ package org.apache.lucene.index;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
@ -28,6 +30,7 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NullInfoStream; import org.apache.lucene.util.NullInfoStream;
@ -205,6 +208,64 @@ public class TestReaderPool extends LuceneTestCase {
IOUtils.close(pool, reader, directory); IOUtils.close(pool, reader, directory);
} }
public void testPassReaderToMergePolicyConcurrently() throws Exception {
Directory directory = newDirectory();
FieldInfos.FieldNumbers fieldNumbers = buildIndex(directory);
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(directory);
SegmentInfos segmentInfos = reader.segmentInfos.clone();
ReaderPool pool = new ReaderPool(directory, directory, segmentInfos, fieldNumbers, () -> 0L,
new NullInfoStream(), null, null);
if (random().nextBoolean()) {
pool.enableReaderPooling();
}
AtomicBoolean isDone = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);
Thread refresher = new Thread(() -> {
try {
latch.countDown();
while (isDone.get() == false) {
for (SegmentCommitInfo commitInfo : segmentInfos) {
ReadersAndUpdates readersAndUpdates = pool.get(commitInfo, true);
SegmentReader segmentReader = readersAndUpdates.getReader(IOContext.READ);
readersAndUpdates.release(segmentReader);
pool.release(readersAndUpdates, random().nextBoolean());
}
}
} catch (Exception ex) {
throw new AssertionError(ex);
}
});
refresher.start();
MergePolicy mergePolicy = new FilterMergePolicy(newMergePolicy()) {
@Override
public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) throws IOException {
CodecReader reader = readerIOSupplier.get();
assert reader.maxDoc() > 0; // just try to access the reader
return true;
}
};
latch.await();
for (int i = 0; i < reader.maxDoc(); i++) {
for (SegmentCommitInfo commitInfo : segmentInfos) {
ReadersAndUpdates readersAndUpdates = pool.get(commitInfo, true);
SegmentReader sr = readersAndUpdates.getReadOnlyClone(IOContext.READ);
PostingsEnum postings = sr.postings(new Term("id", "" + i));
sr.decRef();
if (postings != null) {
for (int docId = postings.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = postings.nextDoc()) {
readersAndUpdates.delete(docId);
assertTrue(readersAndUpdates.keepFullyDeletedSegment(mergePolicy));
}
}
assertTrue(readersAndUpdates.keepFullyDeletedSegment(mergePolicy));
pool.release(readersAndUpdates, random().nextBoolean());
}
}
isDone.set(true);
refresher.join();
IOUtils.close(pool, reader, directory);
}
private FieldInfos.FieldNumbers buildIndex(Directory directory) throws IOException { private FieldInfos.FieldNumbers buildIndex(Directory directory) throws IOException {
IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()); IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig());
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {