LUCENE-9276: Use same code-path for updateDocuments and updateDocument (#1346)

Today we have a large amount of duplicated code that is rather of
complex nature. This change consolidates the code-paths to always
use the updateDocuments path.
This commit is contained in:
Simon Willnauer 2020-03-13 20:33:15 +01:00 committed by GitHub
parent 74721fa4c6
commit c0cf7bb4b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 17 additions and 147 deletions

View File

@ -114,7 +114,8 @@ New Features
Improvements
---------------------
(No changes)
* LUCENE-9276: Use same code-path for updateDocuments and updateDocument in IndexWriter and
DocumentsWriter. (Simon Willnauer)
Optimizations
---------------------

View File

@ -474,51 +474,6 @@ final class DocumentsWriter implements Closeable, Accountable {
return seqNo;
}
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
long seqNo;
try {
// This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
ensureOpen();
ensureInitialized(perThread);
assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
} finally {
if (dwpt.isAborted()) {
flushControl.doOnAbort(perThread);
}
// We don't know whether the document actually
// counted as being indexed, so we must subtract here to
// accumulate our separate counter:
numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
}
final boolean isUpdate = delNode != null && delNode.isDelete();
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
perThread.lastSeqNo = seqNo;
} finally {
perThreadPool.release(perThread);
}
if (postUpdate(flushingDWPT, hasEvents)) {
seqNo = -seqNo;
}
return seqNo;
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean hasEvents = false;
while (flushingDWPT != null) {

View File

@ -226,46 +226,6 @@ final class DocumentsWriterPerThread {
}
}
public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
try {
assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
testPoint("DocumentsWriterPerThread addDocument start");
assert deleteQueue != null;
reserveOneDoc();
docState.doc = doc;
docState.analyzer = analyzer;
docState.docID = numDocsInRAM;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
// Even on exception, the document is still added (but marked
// deleted), so we don't need to un-reserve at that point.
// Aborting exceptions will actually "lose" more than one
// document, so the counter will be "wrong" in that case, but
// it's very hard to fix (we can't easily distinguish aborting
// vs non-aborting exceptions):
boolean success = false;
try {
try {
consumer.processDocument();
} finally {
docState.clear();
}
success = true;
} finally {
if (!success) {
// mark document as deleted
deleteDocID(docState.docID);
numDocsInRAM++;
}
}
return finishDocument(deleteNode);
} finally {
maybeAbort("updateDocument", flushNotifications);
}
}
public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
try {
testPoint("DocumentsWriterPerThread addDocuments start");
@ -306,28 +266,7 @@ final class DocumentsWriterPerThread {
numDocsInRAM++;
}
allDocsIndexed = true;
// Apply delTerm only after all indexing has
// succeeded, but apply it only to docs prior to when
// this batch started:
long seqNo;
if (deleteNode != null) {
seqNo = deleteQueue.add(deleteNode, deleteSlice);
assert deleteSlice.isTail(deleteNode) : "expected the delete term as the tail item";
deleteSlice.apply(pendingUpdates, numDocsInRAM - docCount);
return seqNo;
} else {
seqNo = deleteQueue.updateSlice(deleteSlice);
if (seqNo < 0) {
seqNo = -seqNo;
deleteSlice.apply(pendingUpdates, numDocsInRAM - docCount);
} else {
deleteSlice.reset();
}
}
return seqNo;
return finishDocuments(deleteNode, docCount);
} finally {
if (!allDocsIndexed && !aborted) {
// the iterator threw an exception that is not aborting
@ -346,7 +285,7 @@ final class DocumentsWriterPerThread {
}
}
private long finishDocument(DocumentsWriterDeleteQueue.Node<?> deleteNode) {
private long finishDocuments(DocumentsWriterDeleteQueue.Node<?> deleteNode, int docCount) {
/*
* here we actually finish the document in two steps 1. push the delete into
* the queue and update our slice. 2. increment the DWPT private document
@ -355,27 +294,24 @@ final class DocumentsWriterPerThread {
* the updated slice we get from 1. holds all the deletes that have occurred
* since we updated the slice the last time.
*/
boolean applySlice = numDocsInRAM != 0;
// Apply delTerm only after all indexing has
// succeeded, but apply it only to docs prior to when
// this batch started:
long seqNo;
if (deleteNode != null) {
seqNo = deleteQueue.add(deleteNode, deleteSlice);
assert deleteSlice.isTail(deleteNode) : "expected the delete node as the tail";
assert deleteSlice.isTail(deleteNode) : "expected the delete term as the tail item";
deleteSlice.apply(pendingUpdates, numDocsInRAM - docCount);
return seqNo;
} else {
seqNo = deleteQueue.updateSlice(deleteSlice);
if (seqNo < 0) {
seqNo = -seqNo;
deleteSlice.apply(pendingUpdates, numDocsInRAM - docCount);
} else {
applySlice = false;
}
}
if (applySlice) {
deleteSlice.apply(pendingUpdates, numDocsInRAM);
} else { // if we don't need to apply we must reset!
deleteSlice.reset();
}
++numDocsInRAM;
}
return seqNo;
}

View File

@ -1289,7 +1289,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* @throws IOException if there is a low-level IO error
*/
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
return updateDocument((DocumentsWriterDeleteQueue.Node<?>) null, doc);
return updateDocument(null, doc);
}
/**
@ -1647,31 +1647,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* @throws IOException if there is a low-level IO error
*/
public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
return updateDocument(term == null ? null : DocumentsWriterDeleteQueue.newNode(term), doc);
return updateDocuments(term == null ? null : DocumentsWriterDeleteQueue.newNode(term), List.of(doc));
}
private long updateDocument(final DocumentsWriterDeleteQueue.Node<?> delNode,
Iterable<? extends IndexableField> doc) throws IOException {
ensureOpen();
boolean success = false;
try {
final long seqNo = maybeProcessEvents(docWriter.updateDocument(doc, analyzer, delNode));
success = true;
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocument");
throw tragedy;
} finally {
if (success == false) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception updating document");
}
}
maybeCloseOnTragicEvent();
}
}
/**
* Expert:
* Updates a document by first updating the document(s)
@ -1711,7 +1689,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (softDeletes == null || softDeletes.length == 0) {
throw new IllegalArgumentException("at least one soft delete must be present");
}
return updateDocument(DocumentsWriterDeleteQueue.newNode(buildDocValuesUpdate(term, softDeletes)), doc);
return updateDocuments(DocumentsWriterDeleteQueue.newNode(buildDocValuesUpdate(term, softDeletes)), List.of(doc));
}

View File

@ -341,7 +341,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
@Override
public void apply(String name) {
if (doFail && name.equals("DocumentsWriterPerThread addDocument start"))
if (doFail && name.equals("DocumentsWriterPerThread addDocuments start"))
throw new RuntimeException("intentionally failing");
}
}