LUCENE-9535: Commit DWPT bytes used before locking indexing (#1918)

Currently we calculate the ramBytesUsed by the DWPT under the flushControl
lock. We can do this calculation safely outside of the lock without any downside.
The FlushControl lock should be used with care since it's a central part of indexing
and might block all indexing.
This commit is contained in:
Simon Willnauer 2020-09-24 09:39:33 +02:00 committed by GitHub
parent cafa449769
commit c258905bd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 32 deletions

View File

@ -145,20 +145,6 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
return true;
}
private synchronized void commitPerThreadBytes(DocumentsWriterPerThread perThread) {
final long delta = perThread.commitLastBytesUsed();
/*
* We need to differentiate here if we are pending since setFlushPending
* moves the perThread memory to the flushBytes and we could be set to
* pending during a delete
*/
if (perThread.isFlushPending()) {
flushBytes += delta;
} else {
activeBytes += delta;
}
assert updatePeaks(delta);
}
// only for asserts
private boolean updatePeaks(long delta) {
@ -170,25 +156,42 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
return true;
}
synchronized DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) {
try {
commitPerThreadBytes(perThread);
if (!perThread.isFlushPending()) {
if (isUpdate) {
flushPolicy.onUpdate(this, perThread);
DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) {
final long delta = perThread.getCommitLastBytesUsedDelta();
synchronized (this) {
// we need to commit this under lock but calculate it outside of the lock to minimize the time this lock is held
// per document. The reason we update this under lock is that we mark DWPTs as pending without acquiring it's
// lock in #setFlushPending and this also reads the committed bytes and modifies the flush/activeBytes.
// In the future we can clean this up to be more intuitive.
perThread.commitLastBytesUsed(delta);
try {
/*
* We need to differentiate here if we are pending since setFlushPending
* moves the perThread memory to the flushBytes and we could be set to
* pending during a delete
*/
if (perThread.isFlushPending()) {
flushBytes += delta;
assert updatePeaks(delta);
} else {
flushPolicy.onInsert(this, perThread);
}
if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) {
// Safety check to prevent a single DWPT exceeding its RAM limit. This
// is super important since we can not address more than 2048 MB per DWPT
setFlushPending(perThread);
activeBytes += delta;
assert updatePeaks(delta);
if (isUpdate) {
flushPolicy.onUpdate(this, perThread);
} else {
flushPolicy.onInsert(this, perThread);
}
if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) {
// Safety check to prevent a single DWPT exceeding its RAM limit. This
// is super important since we can not address more than 2048 MB per DWPT
setFlushPending(perThread);
}
}
return checkout(perThread, false);
} finally {
boolean stalled = updateStallState();
assert assertNumDocsSinceStalled(stalled) && assertMemory();
}
return checkout(perThread, false);
} finally {
boolean stalled = updateStallState();
assert assertNumDocsSinceStalled(stalled) && assertMemory();
}
}

View File

@ -540,12 +540,21 @@ final class DocumentsWriterPerThread implements Accountable {
/**
* Commits the current {@link #ramBytesUsed()} and stores it's value for later reuse.
* The last committed bytes used can be retrieved via {@link #getLastCommittedBytesUsed()}
*/
void commitLastBytesUsed(long delta) {
assert isHeldByCurrentThread();
assert getCommitLastBytesUsedDelta() == delta : "delta has changed";
lastCommittedBytesUsed += delta;
}
/**
* Calculates the delta between the last committed bytes used and the currently used ram.
* @see #commitLastBytesUsed(long)
* @return the delta between the current {@link #ramBytesUsed()} and the current {@link #getLastCommittedBytesUsed()}
*/
long commitLastBytesUsed() {
long getCommitLastBytesUsedDelta() {
assert isHeldByCurrentThread();
long delta = ramBytesUsed() - lastCommittedBytesUsed;
lastCommittedBytesUsed += delta;
return delta;
}