LUCENE-10190: Ensure changes are visible before advancing seqno (#640)

DocumentsWriter#anyChanges() can return false after we process and
generate a sequence number for an update operation, but before we adjust
numDocsInRAM. In this window of time, refreshes are a no-op, although
the maxCompletedSequenceNumber has advanced.
This commit is contained in:
Nhat Nguyen 2022-02-08 10:29:20 -05:00 committed by GitHub
parent 5250186bd1
commit bcb70fd742
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 9 additions and 8 deletions

View File

@@ -427,17 +427,13 @@ final class DocumentsWriter implements Closeable, Accountable {
// This must happen after we've pulled the DWPT because IW.close
// waits for all DWPT to be released:
ensureOpen();
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
seqNo = dwpt.updateDocuments(docs, delNode, flushNotifications);
seqNo =
dwpt.updateDocuments(docs, delNode, flushNotifications, numDocsInRAM::incrementAndGet);
} finally {
if (dwpt.isAborted()) {
flushControl.doOnAbort(dwpt);
}
// We don't know how many documents were actually
// counted as indexed, so we must subtract here to
// accumulate our separate counter:
numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
}
final boolean isUpdate = delNode != null && delNode.isDelete();
flushingDWPT = flushControl.doAfterDocument(dwpt, isUpdate);

View File

@@ -209,7 +209,8 @@ final class DocumentsWriterPerThread implements Accountable {
long updateDocuments(
Iterable<? extends Iterable<? extends IndexableField>> docs,
DocumentsWriterDeleteQueue.Node<?> deleteNode,
DocumentsWriter.FlushNotifications flushNotifications)
DocumentsWriter.FlushNotifications flushNotifications,
Runnable onNewDocOnRAM)
throws IOException {
try {
testPoint("DocumentsWriterPerThread addDocuments start");
@@ -236,7 +237,11 @@ final class DocumentsWriterPerThread implements Accountable {
// it's very hard to fix (we can't easily distinguish aborting
// vs non-aborting exceptions):
reserveOneDoc();
indexingChain.processDocument(numDocsInRAM++, doc);
try {
indexingChain.processDocument(numDocsInRAM++, doc);
} finally {
onNewDocOnRAM.run();
}
}
allDocsIndexed = true;
return finishDocuments(deleteNode, docsInRamBefore);