LUCENE-8068: Allow IndexWriter to write a single DWPT to disk

Adds a `flushNextBuffer` method to IndexWriter that allows the caller to
synchronously move the next pending or the biggest non-pending index buffer to
disk. This enables flushing selected buffer to disk without highjacking an
indexing thread. This is for instance useful if more than one IW (shards) must
be maintained in a single JVM / system.
This commit is contained in:
Simon Willnauer 2017-11-27 22:39:11 +01:00
parent ebdaa44182
commit 01d12777c4
7 changed files with 260 additions and 40 deletions

View File

@ -85,6 +85,13 @@ New Features
* LUCENE-7736: IndexReaderFunctions expose various IndexReader statistics as
DoubleValuesSources. (Alan Woodward)
* LUCENE-8068: Allow IndexWriter to write a single DWPT to disk Adds a
flushNextBuffer method to IndexWriter that allows the caller to
synchronously move the next pending or the biggest non-pending index buffer to
disk. This enables flushing selected buffer to disk without highjacking an
indexing thread. This is for instance useful if more than one IW (shards) must
be maintained in a single JVM / system. (Simon Willnauer)
Bug Fixes
* LUCENE-8057: Exact circle bounds computation was incorrect.

View File

@ -246,6 +246,21 @@ final class DocumentsWriter implements Closeable, Accountable {
}
}
final boolean flushOneDWPT() throws IOException, AbortingException {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "startFlushOneDWPT");
}
// first check if there is one pending
DocumentsWriterPerThread documentsWriterPerThread = flushControl.nextPendingFlush();
if (documentsWriterPerThread == null) {
documentsWriterPerThread = flushControl.checkoutLargestNonPendingWriter();
}
if (documentsWriterPerThread != null) {
return doFlush(documentsWriterPerThread);
}
return false; // we didn't flush anything here
}
/** Returns how many documents were aborted. */
synchronized long lockAndAbortAll(IndexWriter indexWriter) throws IOException {
assert indexWriter.holdsFullFlushLock();

View File

@ -182,24 +182,30 @@ final class DocumentsWriterFlushControl implements Accountable {
setFlushPending(perThread);
}
}
final DocumentsWriterPerThread flushingDWPT;
if (fullFlush) {
if (perThread.flushPending) {
checkoutAndBlock(perThread);
flushingDWPT = nextPendingFlush();
} else {
flushingDWPT = null;
}
} else {
flushingDWPT = tryCheckoutForFlush(perThread);
}
return flushingDWPT;
return checkout(perThread, false);
} finally {
boolean stalled = updateStallState();
assert assertNumDocsSinceStalled(stalled) && assertMemory();
}
}
private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) {
if (fullFlush) {
if (perThread.flushPending) {
checkoutAndBlock(perThread);
return nextPendingFlush();
} else {
return null;
}
} else {
if (markPending) {
assert perThread.isFlushPending() == false;
setFlushPending(perThread);
}
return tryCheckoutForFlush(perThread);
}
}
private boolean assertNumDocsSinceStalled(boolean stalled) {
/*
* updates the number of documents "finished" while we are in a stalled state.
@ -454,10 +460,6 @@ final class DocumentsWriterFlushControl implements Accountable {
flushDeletes.set(true);
}
int numActiveDWPT() {
return this.perThreadPool.getActiveThreadStateCount();
}
ThreadState obtainAndLock() {
final ThreadState perThread = perThreadPool.getAndLock(Thread
.currentThread(), documentsWriter);
@ -713,4 +715,58 @@ final class DocumentsWriterFlushControl implements Accountable {
public InfoStream getInfoStream() {
return infoStream;
}
ThreadState findLargestNonPendingWriter() {
ThreadState maxRamUsingThreadState = null;
long maxRamSoFar = 0;
Iterator<ThreadState> activePerThreadsIterator = allActiveThreadStates();
int count = 0;
while (activePerThreadsIterator.hasNext()) {
ThreadState next = activePerThreadsIterator.next();
if (!next.flushPending) {
final long nextRam = next.bytesUsed;
if (nextRam > 0 && next.dwpt.getNumDocsInRAM() > 0) {
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.dwpt.getNumDocsInRAM());
}
count++;
if (nextRam > maxRamSoFar) {
maxRamSoFar = nextRam;
maxRamUsingThreadState = next;
}
}
}
}
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", count + " in-use non-flushing threads states");
}
return maxRamUsingThreadState;
}
/**
* Returns the largest non-pending flushable DWPT or <code>null</code> if there is none.
*/
final DocumentsWriterPerThread checkoutLargestNonPendingWriter() {
ThreadState largestNonPendingWriter = findLargestNonPendingWriter();
if (largestNonPendingWriter != null) {
// we only lock this very briefly to swap it's DWPT out - we don't go through the DWPTPool and it's free queue
largestNonPendingWriter.lock();
try {
synchronized (this) {
try {
if (largestNonPendingWriter.isInitialized() == false) {
return nextPendingFlush();
} else {
return checkout(largestNonPendingWriter, largestNonPendingWriter.isFlushPending() == false);
}
} finally {
updateStallState();
}
}
} finally {
largestNonPendingWriter.unlock();
}
}
return null;
}
}

View File

@ -104,31 +104,8 @@ abstract class FlushPolicy {
protected ThreadState findLargestNonPendingWriter(
DocumentsWriterFlushControl control, ThreadState perThreadState) {
assert perThreadState.dwpt.getNumDocsInRAM() > 0;
long maxRamSoFar = perThreadState.bytesUsed;
// the dwpt which needs to be flushed eventually
ThreadState maxRamUsingThreadState = perThreadState;
assert !perThreadState.flushPending : "DWPT should have flushed";
Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreadStates();
int count = 0;
while (activePerThreadsIterator.hasNext()) {
ThreadState next = activePerThreadsIterator.next();
if (!next.flushPending) {
final long nextRam = next.bytesUsed;
if (nextRam > 0 && next.dwpt.getNumDocsInRAM() > 0) {
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.dwpt.getNumDocsInRAM());
}
count++;
if (nextRam > maxRamSoFar) {
maxRamSoFar = nextRam;
maxRamUsingThreadState = next;
}
}
}
}
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", count + " in-use non-flushing threads states");
}
ThreadState maxRamUsingThreadState = control.findLargestNonPendingWriter();
assert assertMessage("set largest ram consuming thread pending on lower watermark");
return maxRamUsingThreadState;
}

View File

@ -3163,6 +3163,31 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return pendingSeqNo;
}
/**
* <p>Expert: Flushes the next pending writer per thread buffer if available or the largest active
* non-pending writer per thread buffer in the calling thread.
* This can be used to flush documents to disk outside of an indexing thread. In contrast to {@link #flush()}
* this won't mark all currently active indexing buffers as flush-pending.
*
* Note: this method is best-effort and might not flush any segments to disk. If there is a full flush happening
* concurrently multiple segments might have been flushed.
* Users of this API can access the IndexWriters current memory consumption via {@link #ramBytesUsed()}
* </p>
* @return <code>true</code> iff this method flushed at least on segment to disk.
* @lucene.experimental
*/
public final boolean flushNextBuffer() throws IOException {
try {
if (docWriter.flushOneDWPT()) {
processEvents(true, false);
return true; // we wrote a segment
}
} catch (AbortingException | VirtualMachineError tragedy) {
tragicEvent(tragedy, "flushNextBuffer");
}
return false;
}
private long prepareCommitInternal() throws IOException {
startCommitTime = System.nanoTime();
synchronized(commitLock) {

View File

@ -2748,4 +2748,133 @@ public class TestIndexWriter extends LuceneTestCase {
dir.close();
}
public void testFlushLargestWriter() throws IOException, InterruptedException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
int numDocs = indexDocsForMultipleThreadStates(w);
DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
= w.docWriter.flushControl.findLargestNonPendingWriter();
assertFalse(largestNonPendingWriter.flushPending);
assertNotNull(largestNonPendingWriter.dwpt);
int numRamDocs = w.numRamDocs();
int numDocsInDWPT = largestNonPendingWriter.dwpt.getNumDocsInRAM();
assertTrue(w.flushNextBuffer());
assertNull(largestNonPendingWriter.dwpt);
assertEquals(numRamDocs-numDocsInDWPT, w.numRamDocs());
// make sure it's not locked
largestNonPendingWriter.lock();
largestNonPendingWriter.unlock();
if (random().nextBoolean()) {
w.commit();
}
DirectoryReader reader = DirectoryReader.open(w, true, true);
assertEquals(numDocs, reader.numDocs());
reader.close();
w.close();
dir.close();
}
private int indexDocsForMultipleThreadStates(IndexWriter w) throws InterruptedException {
Thread[] threads = new Thread[3];
CountDownLatch latch = new CountDownLatch(threads.length);
int numDocsPerThread = 10 + random().nextInt(30);
// ensure we have more than on thread state
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
latch.countDown();
try {
latch.await();
for (int j = 0; j < numDocsPerThread; j++) {
Document doc = new Document();
doc.add(new StringField("id", "foo", Field.Store.YES));
w.addDocument(doc);
}
} catch (Exception e) {
throw new AssertionError(e);
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
return numDocsPerThread * threads.length;
}
public void testNeverCheckOutOnFullFlush() throws IOException, InterruptedException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
indexDocsForMultipleThreadStates(w);
DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
= w.docWriter.flushControl.findLargestNonPendingWriter();
assertFalse(largestNonPendingWriter.flushPending);
assertNotNull(largestNonPendingWriter.dwpt);
int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
w.docWriter.flushControl.markForFullFlush();
DocumentsWriterPerThread documentsWriterPerThread = w.docWriter.flushControl.checkoutLargestNonPendingWriter();
assertNull(documentsWriterPerThread);
assertEquals(activeThreadStateCount, w.docWriter.flushControl.numQueuedFlushes());
w.docWriter.flushControl.abortFullFlushes();
assertNull("was aborted", w.docWriter.flushControl.checkoutLargestNonPendingWriter());
assertEquals(0, w.docWriter.flushControl.numQueuedFlushes());
w.close();
dir.close();
}
public void testHoldLockOnLargestWriter() throws IOException, InterruptedException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
int numDocs = indexDocsForMultipleThreadStates(w);
DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
= w.docWriter.flushControl.findLargestNonPendingWriter();
assertFalse(largestNonPendingWriter.flushPending);
assertNotNull(largestNonPendingWriter.dwpt);
CountDownLatch wait = new CountDownLatch(1);
CountDownLatch locked = new CountDownLatch(1);
Thread lockThread = new Thread(() -> {
try {
largestNonPendingWriter.lock();
locked.countDown();
wait.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
} finally {
largestNonPendingWriter.unlock();
}
});
lockThread.start();
Thread flushThread = new Thread(() -> {
try {
locked.await();
assertTrue(w.flushNextBuffer());
} catch (Exception e) {
throw new AssertionError(e);
}
});
flushThread.start();
locked.await();
// access a synced method to ensure we never lock while we hold the flush control monitor
w.docWriter.flushControl.activeBytes();
wait.countDown();
lockThread.join();
flushThread.join();
assertNull("largest DWPT should be flushed", largestNonPendingWriter.dwpt);
// make sure it's not locked
largestNonPendingWriter.lock();
largestNonPendingWriter.unlock();
if (random().nextBoolean()) {
w.commit();
}
DirectoryReader reader = DirectoryReader.open(w, true, true);
assertEquals(numDocs, reader.numDocs());
reader.close();
w.close();
dir.close();
}
}

View File

@ -180,6 +180,17 @@ public class RandomIndexWriter implements Closeable {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
if (docCount++ == flushAt) {
if (r.nextBoolean()) {
if (LuceneTestCase.VERBOSE) {
System.out.println("RIW.add/updateDocument: now flushing the largest writer at docCount=" + docCount);
}
int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
int numFlushes = Math.min(1, r.nextInt(activeThreadStateCount+1));
for (int i = 0; i < numFlushes; i++) {
if (w.flushNextBuffer() == false) {
break; // stop once we didn't flush anything
}
}
} else if (r.nextBoolean()) {
if (LuceneTestCase.VERBOSE) {
System.out.println("RIW.add/updateDocument: now doing a flush at docCount=" + docCount);
}