reduce IW.infoStream noise when stalling happens due to too many total bytes flushing; only notifyAll in stall/unstall when it changes

This commit is contained in:
Mike McCandless 2016-07-25 19:17:28 -04:00
parent 501d73b4b8
commit 30db9e72ba
4 changed files with 28 additions and 42 deletions

View File

@ -376,9 +376,6 @@ final class DocumentsWriter implements Closeable, Accountable {
boolean hasEvents = false;
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
// Help out flushing any queued DWPTs so we can un-stall:
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
}
do {
// Try pick up pending threads here if possible
DocumentsWriterPerThread flushingDWPT;
@ -386,17 +383,9 @@ final class DocumentsWriter implements Closeable, Accountable {
// Don't push the delete here since the update could fail!
hasEvents |= doFlush(flushingDWPT);
}
if (infoStream.isEnabled("DW") && flushControl.anyStalledThreads()) {
infoStream.message("DW", "WARNING DocumentsWriter has stalled threads; waiting");
}
flushControl.waitIfStalled(); // block if stalled
} while (flushControl.numQueuedFlushes() != 0); // still queued DWPTs try help flushing
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "continue indexing after helping out flushing DocumentsWriter is healthy");
}
}
return hasEvents;
}

View File

@ -22,6 +22,7 @@ import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
@ -74,7 +75,7 @@ final class DocumentsWriterFlushControl implements Accountable {
DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedUpdatesStream bufferedUpdatesStream) {
this.infoStream = config.getInfoStream();
this.stallControl = new DocumentsWriterStallControl(config);
this.stallControl = new DocumentsWriterStallControl();
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
this.config = config;
@ -230,7 +231,9 @@ final class DocumentsWriterFlushControl implements Accountable {
}
}
}
private long stallStartNS;
private boolean updateStallState() {
assert Thread.holdsLock(this);
@ -245,6 +248,20 @@ final class DocumentsWriterFlushControl implements Accountable {
final boolean stall = (activeBytes + flushBytes) > limit &&
activeBytes < limit &&
!closed;
if (infoStream.isEnabled("DWFC")) {
if (stall != stallControl.anyStalledThreads()) {
if (stall) {
infoStream.message("DW", String.format(Locale.ROOT, "now stalling flushes: netBytes: %.1f MB flushBytes: %.1f MB fullFlush: %b",
netBytes()/1024./1024., flushBytes()/1024./1024., fullFlush));
stallStartNS = System.nanoTime();
} else {
infoStream.message("DW", String.format(Locale.ROOT, "done stalling flushes for %.1f msec: netBytes: %.1f MB flushBytes: %.1f MB fullFlush: %b",
(System.nanoTime()-stallStartNS)/1000000., netBytes()/1024./1024., flushBytes()/1024./1024., fullFlush));
}
}
}
stallControl.updateStalled(stall);
return stall;
}
@ -687,12 +704,6 @@ final class DocumentsWriterFlushControl implements Accountable {
* checked out DWPT are available
*/
void waitIfStalled() {
if (infoStream.isEnabled("DWFC")) {
infoStream.message("DWFC",
"waitIfStalled: numFlushesPending: " + flushQueue.size()
+ " netBytes: " + netBytes() + " flushBytes: " + flushBytes()
+ " fullFlush: " + fullFlush);
}
stallControl.waitIfStalled();
}

View File

@ -20,7 +20,6 @@ import java.util.IdentityHashMap;
import java.util.Map;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.ThreadInterruptedException;
/**
@ -44,12 +43,7 @@ final class DocumentsWriterStallControl {
private int numWaiting; // only with assert
private boolean wasStalled; // only with assert
private final Map<Thread, Boolean> waiting = new IdentityHashMap<>(); // only with assert
private final InfoStream infoStream;
DocumentsWriterStallControl(LiveIndexWriterConfig iwc) {
infoStream = iwc.getInfoStream();
}
/**
* Update the stalled flag status. This method will set the stalled flag to
* <code>true</code> iff the number of flushing
@ -59,11 +53,13 @@ final class DocumentsWriterStallControl {
* waiting on {@link #waitIfStalled()}
*/
synchronized void updateStalled(boolean stalled) {
this.stalled = stalled;
if (stalled) {
wasStalled = true;
if (this.stalled != stalled) {
this.stalled = stalled;
if (stalled) {
wasStalled = true;
}
notifyAll();
}
notifyAll();
}
/**
@ -93,13 +89,7 @@ final class DocumentsWriterStallControl {
return stalled;
}
long stallStartNS;
private void incWaiters() {
stallStartNS = System.nanoTime();
if (infoStream.isEnabled("DW") && numWaiting == 0) {
infoStream.message("DW", "now stalling flushes");
}
numWaiting++;
assert waiting.put(Thread.currentThread(), Boolean.TRUE) == null;
assert numWaiting > 0;
@ -109,10 +99,6 @@ final class DocumentsWriterStallControl {
numWaiting--;
assert waiting.remove(Thread.currentThread()) != null;
assert numWaiting >= 0;
if (infoStream.isEnabled("DW") && numWaiting == 0) {
long stallEndNS = System.nanoTime();
infoStream.message("DW", "done stalling flushes for " + ((stallEndNS - stallStartNS)/1000000.0) + " ms");
}
}
synchronized boolean hasBlocked() { // for tests

View File

@ -32,7 +32,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
public class TestDocumentsWriterStallControl extends LuceneTestCase {
public void testSimpleStall() throws InterruptedException {
DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(newIndexWriterConfig());
DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
ctrl.updateStalled(false);
Thread[] waitThreads = waitThreads(atLeast(1), ctrl);
@ -54,7 +54,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
}
public void testRandom() throws InterruptedException {
final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(newIndexWriterConfig());
final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
ctrl.updateStalled(false);
Thread[] stallThreads = new Thread[atLeast(3)];
@ -95,7 +95,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
}
public void testAccquireReleaseRace() throws InterruptedException {
final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(newIndexWriterConfig());
final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
ctrl.updateStalled(false);
final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicBoolean checkPoint = new AtomicBoolean(true);