From c258905bd01f458df4924e361b2395f06e387b88 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 24 Sep 2020 09:39:33 +0200 Subject: [PATCH] LUCENE-9535: Commit DWPT bytes used before locking indexing (#1918) Currently we calculate the ramBytesUsed by the DWPT under the flushControl lock. We can do this calculation safely outside of the lock without any downside. The FlushControl lock should be used with care since it's a central part of indexing and might block all indexing. --- .../index/DocumentsWriterFlushControl.java | 63 ++++++++++--------- .../index/DocumentsWriterPerThread.java | 13 +++- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index d368e88c323..8b652827849 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -145,20 +145,6 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable { return true; } - private synchronized void commitPerThreadBytes(DocumentsWriterPerThread perThread) { - final long delta = perThread.commitLastBytesUsed(); - /* - * We need to differentiate here if we are pending since setFlushPending - * moves the perThread memory to the flushBytes and we could be set to - * pending during a delete - */ - if (perThread.isFlushPending()) { - flushBytes += delta; - } else { - activeBytes += delta; - } - assert updatePeaks(delta); - } // only for asserts private boolean updatePeaks(long delta) { @@ -170,25 +156,42 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable { return true; } - synchronized DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) { - try { - commitPerThreadBytes(perThread); - if (!perThread.isFlushPending()) { - if (isUpdate) { - flushPolicy.onUpdate(this, perThread); + DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) { + final long delta = perThread.getCommitLastBytesUsedDelta(); + synchronized (this) { + // we need to commit this under lock but calculate it outside of the lock to minimize the time this lock is held + // per document. The reason we update this under lock is that we mark DWPTs as pending without acquiring it's + // lock in #setFlushPending and this also reads the committed bytes and modifies the flush/activeBytes. + // In the future we can clean this up to be more intuitive. + perThread.commitLastBytesUsed(delta); + try { + /* + * We need to differentiate here if we are pending since setFlushPending + * moves the perThread memory to the flushBytes and we could be set to + * pending during a delete + */ + if (perThread.isFlushPending()) { + flushBytes += delta; + assert updatePeaks(delta); } else { - flushPolicy.onInsert(this, perThread); - } - if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) { - // Safety check to prevent a single DWPT exceeding its RAM limit. This - // is super important since we can not address more than 2048 MB per DWPT - setFlushPending(perThread); + activeBytes += delta; + assert updatePeaks(delta); + if (isUpdate) { + flushPolicy.onUpdate(this, perThread); + } else { + flushPolicy.onInsert(this, perThread); + } + if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) { + // Safety check to prevent a single DWPT exceeding its RAM limit. This + // is super important since we can not address more than 2048 MB per DWPT + setFlushPending(perThread); + } } + return checkout(perThread, false); + } finally { + boolean stalled = updateStallState(); + assert assertNumDocsSinceStalled(stalled) && assertMemory(); } - return checkout(perThread, false); - } finally { - boolean stalled = updateStallState(); - assert assertNumDocsSinceStalled(stalled) && assertMemory(); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index d8ae3754ec0..70cb04def0e 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -540,12 +540,21 @@ final class DocumentsWriterPerThread implements Accountable { /** * Commits the current {@link #ramBytesUsed()} and stores it's value for later reuse. * The last committed bytes used can be retrieved via {@link #getLastCommittedBytesUsed()} + */ + void commitLastBytesUsed(long delta) { + assert isHeldByCurrentThread(); + assert getCommitLastBytesUsedDelta() == delta : "delta has changed"; + lastCommittedBytesUsed += delta; + } + + /** + * Calculates the delta between the last committed bytes used and the currently used ram. + * @see #commitLastBytesUsed(long) * @return the delta between the current {@link #ramBytesUsed()} and the current {@link #getLastCommittedBytesUsed()} */ - long commitLastBytesUsed() { + long getCommitLastBytesUsedDelta() { assert isHeldByCurrentThread(); long delta = ramBytesUsed() - lastCommittedBytesUsed; - lastCommittedBytesUsed += delta; return delta; }