LUCENE-8809: Ensure release segment states

If refresh and rollback happen concurrently, then we can leave segment
states unreleased leads to leaking refCount of some SegmentReaders.
This commit is contained in:
Nhat Nguyen 2019-05-23 08:04:35 -04:00
parent 0ff6a59396
commit 9c5c7a7e8d
2 changed files with 62 additions and 12 deletions

View File

@ -19,7 +19,6 @@ package org.apache.lucene.index;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -346,21 +345,23 @@ final class FrozenBufferedUpdates {
public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException { public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
List<SegmentCommitInfo> allDeleted = null; List<SegmentCommitInfo> allDeleted = null;
long totDelCount = 0; long totDelCount = 0;
final List<BufferedUpdatesStream.SegmentState> segmentStates = Arrays.asList(segStates); try {
for (BufferedUpdatesStream.SegmentState segState : segmentStates) { for (BufferedUpdatesStream.SegmentState segState : segStates) {
if (success) { if (success) {
totDelCount += segState.rld.getDelCount() - segState.startDelCount; totDelCount += segState.rld.getDelCount() - segState.startDelCount;
int fullDelCount = segState.rld.getDelCount(); int fullDelCount = segState.rld.getDelCount();
assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc(); assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) { if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
if (allDeleted == null) { if (allDeleted == null) {
allDeleted = new ArrayList<>(); allDeleted = new ArrayList<>();
}
allDeleted.add(segState.reader.getOriginalSegmentInfo());
} }
allDeleted.add(segState.reader.getOriginalSegmentInfo());
} }
} }
} finally {
IOUtils.close(segStates);
} }
IOUtils.close(segmentStates);
if (writer.infoStream.isEnabled("BD")) { if (writer.infoStream.isEnabled("BD")) {
writer.infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + writer.getPendingUpdatesCount()+ " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed()); writer.infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + writer.getPendingUpdatesCount()+ " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
} }

View File

@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -76,6 +77,8 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.PhraseQuery; import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
@ -3730,4 +3733,50 @@ public class TestIndexWriter extends LuceneTestCase {
dir.close(); dir.close();
} }
public void testRefreshAndRollbackConcurrently() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
AtomicBoolean stopped = new AtomicBoolean();
Semaphore indexedDocs = new Semaphore(0);
Thread indexer = new Thread(() -> {
while (stopped.get() == false) {
try {
String id = Integer.toString(random().nextInt(100));
Document doc = new Document();
doc.add(new StringField("id", id, Field.Store.YES));
w.updateDocument(new Term("id", id), doc);
indexedDocs.release(1);
} catch (IOException e) {
throw new AssertionError(e);
} catch (AlreadyClosedException ignored) {
return;
}
}
});
SearcherManager sm = new SearcherManager(w, new SearcherFactory());
Thread refresher = new Thread(() -> {
while (stopped.get() == false) {
try {
sm.maybeRefreshBlocking();
} catch (IOException e) {
throw new AssertionError(e);
} catch (AlreadyClosedException ignored) {
return;
}
}
});
try {
indexer.start();
refresher.start();
indexedDocs.acquire(1 + random().nextInt(100));
w.rollback();
} finally {
stopped.set(true);
indexer.join();
refresher.join();
IOUtils.close(sm, dir);
}
}
} }