mirror of https://github.com/apache/lucene.git
LUCENE-4561: count in-flight threads for asserting memory
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1410326 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
54b8e81b1f
commit
fbad326573
|
@ -46,6 +46,7 @@ final class DocumentsWriterFlushControl {
|
||||||
private long activeBytes = 0;
|
private long activeBytes = 0;
|
||||||
private long flushBytes = 0;
|
private long flushBytes = 0;
|
||||||
private volatile int numPending = 0;
|
private volatile int numPending = 0;
|
||||||
|
private int numDocsSinceStalled = 0; // only with assert
|
||||||
final AtomicBoolean flushDeletes = new AtomicBoolean(false);
|
final AtomicBoolean flushDeletes = new AtomicBoolean(false);
|
||||||
private boolean fullFlush = false;
|
private boolean fullFlush = false;
|
||||||
private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
|
private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
|
||||||
|
@ -104,8 +105,8 @@ final class DocumentsWriterFlushControl {
|
||||||
// 2 * ramBufferBytes -> before we stall we need to cross the 2xRAM Buffer border this is still a valid limit
|
// 2 * ramBufferBytes -> before we stall we need to cross the 2xRAM Buffer border this is still a valid limit
|
||||||
// (numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) -> those are the total number of DWPT that are not active but not yet fully fluhsed
|
// (numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) -> those are the total number of DWPT that are not active but not yet fully fluhsed
|
||||||
// all of them could theoretically be taken out of the loop once they crossed the RAM buffer and the last document was the peak delta
|
// all of them could theoretically be taken out of the loop once they crossed the RAM buffer and the last document was the peak delta
|
||||||
// (perThreadPool.getActiveThreadState() * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document
|
// (numDocsSinceStalled * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document
|
||||||
final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (perThreadPool.getActiveThreadState() * peakDelta);
|
final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (numDocsSinceStalled * peakDelta);
|
||||||
// the expected ram consumption is an upper bound at this point and not really the expected consumption
|
// the expected ram consumption is an upper bound at this point and not really the expected consumption
|
||||||
if (peakDelta < (ramBufferBytes >> 1)) {
|
if (peakDelta < (ramBufferBytes >> 1)) {
|
||||||
/*
|
/*
|
||||||
|
@ -183,11 +184,27 @@ final class DocumentsWriterFlushControl {
|
||||||
}
|
}
|
||||||
return flushingDWPT;
|
return flushingDWPT;
|
||||||
} finally {
|
} finally {
|
||||||
updateStallState();
|
boolean stalled = updateStallState();
|
||||||
assert assertMemory();
|
assert assertNumDocsSinceStalled(stalled) && assertMemory();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean assertNumDocsSinceStalled(boolean stalled) {
|
||||||
|
/*
|
||||||
|
* updates the number of documents "finished" while we are in a stalled state.
|
||||||
|
* this is important for asserting memory upper bounds since it corresponds
|
||||||
|
* to the number of threads that are in-flight and crossed the stall control
|
||||||
|
* check before we actually stalled.
|
||||||
|
* see #assertMemory()
|
||||||
|
*/
|
||||||
|
if (stalled) {
|
||||||
|
numDocsSinceStalled++;
|
||||||
|
} else {
|
||||||
|
numDocsSinceStalled = 0;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
|
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
|
||||||
assert flushingWriters.containsKey(dwpt);
|
assert flushingWriters.containsKey(dwpt);
|
||||||
try {
|
try {
|
||||||
|
@ -204,7 +221,7 @@ final class DocumentsWriterFlushControl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final void updateStallState() {
|
private final boolean updateStallState() {
|
||||||
|
|
||||||
assert Thread.holdsLock(this);
|
assert Thread.holdsLock(this);
|
||||||
final long limit = stallLimitBytes();
|
final long limit = stallLimitBytes();
|
||||||
|
@ -219,6 +236,7 @@ final class DocumentsWriterFlushControl {
|
||||||
(activeBytes < limit) &&
|
(activeBytes < limit) &&
|
||||||
!closed;
|
!closed;
|
||||||
stallControl.updateStalled(stall);
|
stallControl.updateStalled(stall);
|
||||||
|
return stall;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void waitForFlush() {
|
public synchronized void waitForFlush() {
|
||||||
|
|
Loading…
Reference in New Issue