Fix race condition on flush for DWPT seqNo generation (#13627)

There is a tricky race condition with DWPT threads. It is possible that a flush starts by advancing the deleteQueue (in charge of creating seqNo). Thus, the referenced deleteQueue, there should be a cap on the number of actions left. 

However, it is possible after the advance, but before the DWPT are actually marked for flush, the DWPT gets freed and taken again to be used.

To replicate this extreme behavior, see: https://github.com/apache/lucene/compare/main...benwtrent:lucene:test-replicate-and-debug-13127?expand=1

This commit will prevent DWPT from being added back to the free list if their queue has been advanced. This is because the `maxSeqNo` for that queue was created accounting only for the current number of active threads. If the thread gets passed out again and still references the already advanced queue, it is possible that seqNo actually advances past the set `maxSeqNo`. 


closes: https://github.com/apache/lucene/issues/13127
closes: https://github.com/apache/lucene/issues/13571
This commit is contained in:
Benjamin Trent 2024-08-07 13:31:00 -04:00 committed by GitHub
parent 926d8f4ce6
commit d26b152117
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 26 additions and 5 deletions

View File

@ -360,6 +360,8 @@ Bug Fixes
* GITHUB#13615: Correct scalar quantization when used in conjunction with COSINE similarity. Vectors are normalized
before quantization to ensure the cosine similarity is correctly calculated. (Ben Trent)
* GITHUB#13627: Fix race condition on flush for DWPT seqNo generation. (Ben Trent, Ao Li)
Other
--------------------
(No changes)

View File

@ -430,10 +430,16 @@ final class DocumentsWriter implements Closeable, Accountable {
}
flushingDWPT = flushControl.doAfterDocument(dwpt);
} finally {
if (dwpt.isFlushPending() || dwpt.isAborted()) {
dwpt.unlock();
} else {
perThreadPool.marksAsFreeAndUnlock(dwpt);
// If a flush is occurring, we don't want to allow this dwpt to be reused
// If it is aborted, we shouldn't allow it to be reused
// If the deleteQueue is advanced, this means the maximum seqNo has been set and it cannot be
// reused
synchronized (flushControl) {
if (dwpt.isFlushPending() || dwpt.isAborted() || dwpt.isQueueAdvanced()) {
dwpt.unlock();
} else {
perThreadPool.marksAsFreeAndUnlock(dwpt);
}
}
assert dwpt.isHeldByCurrentThread() == false : "we didn't release the dwpt even on abort";
}

View File

@ -636,7 +636,7 @@ final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
}
/** Returns <code>true</code> if it was advanced. */
boolean isAdvanced() {
synchronized boolean isAdvanced() {
return advanced;
}
}

View File

@ -718,6 +718,10 @@ final class DocumentsWriterPerThread implements Accountable, Lock {
return flushPending.get() == Boolean.TRUE;
}
boolean isQueueAdvanced() {
return deleteQueue.isAdvanced();
}
/** Sets this DWPT as flush pending. This can only be set once. */
void setFlushPending() {
flushPending.set(Boolean.TRUE);

View File

@ -138,6 +138,15 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
final long ramBytesUsed = state.ramBytesUsed();
assert state.isFlushPending() == false
&& state.isAborted() == false
&& state.isQueueAdvanced() == false
: "DWPT has pending flush: "
+ state.isFlushPending()
+ " aborted="
+ state.isAborted()
+ " queueAdvanced="
+ state.isQueueAdvanced();
assert contains(state)
: "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT";
freeList.addAndUnlock(state, ramBytesUsed);