LUCENE-9508: Fix DocumentsWriter to block threads until unstalled (#2085)

DWStallControl expects the caller to loop on top of the wait call to make
progress with flushing if the DW is stalled. This logic wasn't applied such that
DW only stalled for one second and then released the indexing thread. This can cause
OOM if for instance during a full flush one DWPT gets stuck and onther threads keep on
indexing.
This commit is contained in:
Simon Willnauer 2020-11-24 13:05:21 +01:00 committed by GitHub
parent 7d54c28958
commit c71f119e9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 12 deletions

View File

@ -250,6 +250,11 @@ Optimizations
Bug Fixes Bug Fixes
--------------------- ---------------------
* LUCENE-9508: DocumentsWriter was only stalling threads for 1 second allowing
documents to be indexed even the DocumentsWriter wasn't able to keep up flushing.
Unless IW can't make progress due to an ill behaving DWPT this issue was barely
noticeable. (Simon Willnauer)
* LUCENE-9581: Japanese tokenizer should discard the compound token instead of disabling the decomposition * LUCENE-9581: Japanese tokenizer should discard the compound token instead of disabling the decomposition
of long tokens when discardCompoundToken is activated. (Jim Ferenczi) of long tokens when discardCompoundToken is activated. (Jim Ferenczi)

View File

@ -371,19 +371,15 @@ final class DocumentsWriter implements Closeable, Accountable {
private boolean preUpdate() throws IOException { private boolean preUpdate() throws IOException {
ensureOpen(); ensureOpen();
boolean hasEvents = false; boolean hasEvents = false;
while (flushControl.anyStalledThreads() || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
if (flushControl.anyStalledThreads() || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
// Help out flushing any queued DWPTs so we can un-stall: // Help out flushing any queued DWPTs so we can un-stall:
do { // Try pick up pending threads here if possible
// Try pick up pending threads here if possible DocumentsWriterPerThread flushingDWPT;
DocumentsWriterPerThread flushingDWPT; while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { // Don't push the delete here since the update could fail!
// Don't push the delete here since the update could fail! hasEvents |= doFlush(flushingDWPT);
hasEvents |= doFlush(flushingDWPT); }
} flushControl.waitIfStalled(); // block if stalled
flushControl.waitIfStalled(); // block if stalled
} while (flushControl.numQueuedFlushes() != 0); // still queued DWPTs try help flushing
} }
return hasEvents; return hasEvents;
} }

View File

@ -104,6 +104,10 @@ final class DocumentsWriterStallControl {
return numWaiting > 0; return numWaiting > 0;
} }
synchronized int getNumWaiting() { // for tests
return numWaiting;
}
boolean isHealthy() { // for tests boolean isHealthy() { // for tests
return !stalled; // volatile read! return !stalled; // volatile read!
} }

View File

@ -4258,4 +4258,47 @@ public class TestIndexWriter extends LuceneTestCase {
} }
} }
} }
public void testIndexWriterBlocksOnStall() throws IOException, InterruptedException {
try (Directory dir = newDirectory()) {
try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig())) {
DocumentsWriterStallControl stallControl = writer.getDocsWriter().flushControl.stallControl;
stallControl.updateStalled(true);
Thread[] threads = new Thread[random().nextInt(3)+1];
AtomicLong numThreadsCompleted = new AtomicLong(0);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
Document d = new Document();
d.add(new StringField("id", Integer.toString(0), Field.Store.YES));
try {
writer.addDocument(d);
} catch (IOException e) {
throw new AssertionError(e);
}
numThreadsCompleted.incrementAndGet();
});
threads[i].start();
}
try {
for (int i = 0; i < 10; i++) {
synchronized (stallControl) {
stallControl.notifyAll();
}
while (stallControl.getNumWaiting() != threads.length) {
// wait for all threads to be stalled again
assertEquals(0, writer.getPendingNumDocs());
assertEquals(0, numThreadsCompleted.get());
}
}
} finally {
stallControl.updateStalled(false);
for (Thread t : threads) {
t.join();
}
}
writer.commit();
assertEquals(threads.length, writer.getDocStats().maxDoc);
}
}
}
} }