diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index d368e88c323..8b652827849 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -145,20 +145,6 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable { return true; } - private synchronized void commitPerThreadBytes(DocumentsWriterPerThread perThread) { - final long delta = perThread.commitLastBytesUsed(); - /* - * We need to differentiate here if we are pending since setFlushPending - * moves the perThread memory to the flushBytes and we could be set to - * pending during a delete - */ - if (perThread.isFlushPending()) { - flushBytes += delta; - } else { - activeBytes += delta; - } - assert updatePeaks(delta); - } // only for asserts private boolean updatePeaks(long delta) { @@ -170,25 +156,42 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable { return true; } - synchronized DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) { - try { - commitPerThreadBytes(perThread); - if (!perThread.isFlushPending()) { - if (isUpdate) { - flushPolicy.onUpdate(this, perThread); + DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) { + final long delta = perThread.getCommitLastBytesUsedDelta(); + synchronized (this) { + // we need to commit this under lock but calculate it outside of the lock to minimize the time this lock is held + // per document. The reason we update this under lock is that we mark DWPTs as pending without acquiring its + // lock in #setFlushPending and this also reads the committed bytes and modifies the flush/activeBytes. + // In the future we can clean this up to be more intuitive. 
+ perThread.commitLastBytesUsed(delta); + try { + /* + * We need to differentiate here if we are pending since setFlushPending + * moves the perThread memory to the flushBytes and we could be set to + * pending during a delete + */ + if (perThread.isFlushPending()) { + flushBytes += delta; + assert updatePeaks(delta); } else { - flushPolicy.onInsert(this, perThread); - } - if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) { - // Safety check to prevent a single DWPT exceeding its RAM limit. This - // is super important since we can not address more than 2048 MB per DWPT - setFlushPending(perThread); + activeBytes += delta; + assert updatePeaks(delta); + if (isUpdate) { + flushPolicy.onUpdate(this, perThread); + } else { + flushPolicy.onInsert(this, perThread); + } + if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) { + // Safety check to prevent a single DWPT exceeding its RAM limit. This + // is super important since we can not address more than 2048 MB per DWPT + setFlushPending(perThread); + } } + return checkout(perThread, false); + } finally { + boolean stalled = updateStallState(); + assert assertNumDocsSinceStalled(stalled) && assertMemory(); } - return checkout(perThread, false); - } finally { - boolean stalled = updateStallState(); - assert assertNumDocsSinceStalled(stalled) && assertMemory(); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index d8ae3754ec0..70cb04def0e 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -540,12 +540,21 @@ final class DocumentsWriterPerThread implements Accountable { /** * Commits the current {@link #ramBytesUsed()} and stores it's value for later reuse. 
* The last committed bytes used can be retrieved via {@link #getLastCommittedBytesUsed()} + */ + void commitLastBytesUsed(long delta) { + assert isHeldByCurrentThread(); + assert getCommitLastBytesUsedDelta() == delta : "delta has changed"; + lastCommittedBytesUsed += delta; + } + + /** + * Calculates the delta between the last committed bytes used and the currently used ram. + * @see #commitLastBytesUsed(long) * @return the delta between the current {@link #ramBytesUsed()} and the current {@link #getLastCommittedBytesUsed()} */ - long commitLastBytesUsed() { + long getCommitLastBytesUsedDelta() { assert isHeldByCurrentThread(); long delta = ramBytesUsed() - lastCommittedBytesUsed; - lastCommittedBytesUsed += delta; return delta; }