diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index d7c2ba2eee9..7a876b2e48d 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -46,6 +46,7 @@ final class DocumentsWriterFlushControl { private long activeBytes = 0; private long flushBytes = 0; private volatile int numPending = 0; + private int numDocsSinceStalled = 0; // only with assert final AtomicBoolean flushDeletes = new AtomicBoolean(false); private boolean fullFlush = false; private final Queue flushQueue = new LinkedList(); @@ -104,8 +105,8 @@ final class DocumentsWriterFlushControl { // 2 * ramBufferBytes -> before we stall we need to cross the 2xRAM Buffer border this is still a valid limit // (numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) -> those are the total number of DWPT that are not active but not yet fully fluhsed // all of them could theoretically be taken out of the loop once they crossed the RAM buffer and the last document was the peak delta - // (perThreadPool.getActiveThreadState() * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document - final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (perThreadPool.getActiveThreadState() * peakDelta); + // (numDocsSinceStalled * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document + final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (numDocsSinceStalled * peakDelta); // the expected ram consumption is an upper bound at this point and not really the expected consumption if (peakDelta < (ramBufferBytes >> 1)) { /* @@ -183,10 +184,26 @@ final class DocumentsWriterFlushControl { } return flushingDWPT; } finally { - updateStallState(); - assert assertMemory(); + boolean stalled = updateStallState(); + assert assertNumDocsSinceStalled(stalled) && assertMemory(); } } + + private boolean assertNumDocsSinceStalled(boolean stalled) { + /* + * updates the number of documents "finished" while we are in a stalled state. + * this is important for asserting memory upper bounds since it corresponds + * to the number of threads that are in-flight and crossed the stall control + * check before we actually stalled. + * see #assertMemory() + */ + if (stalled) { + numDocsSinceStalled++; + } else { + numDocsSinceStalled = 0; + } + return true; + } synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { assert flushingWriters.containsKey(dwpt); @@ -204,7 +221,7 @@ final class DocumentsWriterFlushControl { } } - private final void updateStallState() { + private final boolean updateStallState() { assert Thread.holdsLock(this); final long limit = stallLimitBytes(); @@ -219,6 +236,7 @@ final class DocumentsWriterFlushControl { (activeBytes < limit) && !closed; stallControl.updateStalled(stall); + return stall; } public synchronized void waitForFlush() {