diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 6b4aa9cd3a6..5943185711f 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -85,6 +85,13 @@ New Features
 * LUCENE-7736: IndexReaderFunctions expose various IndexReader statistics as
   DoubleValuesSources. (Alan Woodward)
 
+* LUCENE-8068: Allow IndexWriter to write a single DWPT to disk. Adds a
+  flushNextBuffer method to IndexWriter that allows the caller to
+  synchronously move the next pending or the biggest non-pending index buffer
+  to disk. This enables flushing a selected buffer to disk without hijacking
+  an indexing thread. This is, for instance, useful if more than one IW
+  (shard) must be maintained in a single JVM / system. (Simon Willnauer)
+
 Bug Fixes
 
 * LUCENE-8057: Exact circle bounds computation was incorrect.
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 0e2a06708b4..6aca9f4b697 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -246,6 +246,21 @@ final class DocumentsWriter implements Closeable, Accountable {
     }
   }
 
+  final boolean flushOneDWPT() throws IOException, AbortingException {
+    if (infoStream.isEnabled("DW")) {
+      infoStream.message("DW", "startFlushOneDWPT");
+    }
+    // first check if there is one pending
+    DocumentsWriterPerThread documentsWriterPerThread = flushControl.nextPendingFlush();
+    if (documentsWriterPerThread == null) {
+      documentsWriterPerThread = flushControl.checkoutLargestNonPendingWriter();
+    }
+    if (documentsWriterPerThread != null) {
+      return doFlush(documentsWriterPerThread);
+    }
+    return false; // we didn't flush anything here
+  }
+
   /** Returns how many documents were aborted. */
   synchronized long lockAndAbortAll(IndexWriter indexWriter) throws IOException {
     assert indexWriter.holdsFullFlushLock();
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 047fb9cc3a6..761db0ed1ee 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -182,23 +182,29 @@ final class DocumentsWriterFlushControl implements Accountable {
           setFlushPending(perThread);
         }
       }
-      final DocumentsWriterPerThread flushingDWPT;
-      if (fullFlush) {
-        if (perThread.flushPending) {
-          checkoutAndBlock(perThread);
-          flushingDWPT = nextPendingFlush();
-        } else {
-          flushingDWPT = null;
-        }
-      } else {
-        flushingDWPT = tryCheckoutForFlush(perThread);
-      }
-      return flushingDWPT;
+      return checkout(perThread, false);
     } finally {
       boolean stalled = updateStallState();
       assert assertNumDocsSinceStalled(stalled) && assertMemory();
     }
   }
+
+  private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) {
+    if (fullFlush) {
+      if (perThread.flushPending) {
+        checkoutAndBlock(perThread);
+        return nextPendingFlush();
+      } else {
+        return null;
+      }
+    } else {
+      if (markPending) {
+        assert perThread.isFlushPending() == false;
+        setFlushPending(perThread);
+      }
+      return tryCheckoutForFlush(perThread);
+    }
+  }
 
   private boolean assertNumDocsSinceStalled(boolean stalled) {
     /*
@@ -454,10 +460,6 @@ final class DocumentsWriterFlushControl implements Accountable {
     flushDeletes.set(true);
   }
 
-  int numActiveDWPT() {
-    return this.perThreadPool.getActiveThreadStateCount();
-  }
-
   ThreadState obtainAndLock() {
     final ThreadState perThread = perThreadPool.getAndLock(Thread
         .currentThread(), documentsWriter);
@@ -713,4 +715,58 @@ final class DocumentsWriterFlushControl implements Accountable {
   public InfoStream getInfoStream() {
     return infoStream;
   }
+
+  ThreadState findLargestNonPendingWriter() {
+    ThreadState maxRamUsingThreadState = null;
+    long maxRamSoFar = 0;
+    Iterator<ThreadState> activePerThreadsIterator = allActiveThreadStates();
+    int count = 0;
+    while (activePerThreadsIterator.hasNext()) {
+      ThreadState next = activePerThreadsIterator.next();
+      if (!next.flushPending) {
+        final long nextRam = next.bytesUsed;
+        if (nextRam > 0 && next.dwpt.getNumDocsInRAM() > 0) {
+          if (infoStream.isEnabled("FP")) {
+            infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.dwpt.getNumDocsInRAM());
+          }
+          count++;
+          if (nextRam > maxRamSoFar) {
+            maxRamSoFar = nextRam;
+            maxRamUsingThreadState = next;
+          }
+        }
+      }
+    }
+    if (infoStream.isEnabled("FP")) {
+      infoStream.message("FP", count + " in-use non-flushing threads states");
+    }
+    return maxRamUsingThreadState;
+  }
+
+  /**
+   * Returns the largest non-pending flushable DWPT or null if there is none.
+   */
+  final DocumentsWriterPerThread checkoutLargestNonPendingWriter() {
+    ThreadState largestNonPendingWriter = findLargestNonPendingWriter();
+    if (largestNonPendingWriter != null) {
+      // we only lock this very briefly to swap its DWPT out - we don't go through the DWPTPool and its free queue
+      largestNonPendingWriter.lock();
+      try {
+        synchronized (this) {
+          try {
+            if (largestNonPendingWriter.isInitialized() == false) {
+              return nextPendingFlush();
+            } else {
+              return checkout(largestNonPendingWriter, largestNonPendingWriter.isFlushPending() == false);
+            }
+          } finally {
+            updateStallState();
+          }
+        }
+      } finally {
+        largestNonPendingWriter.unlock();
+      }
+    }
+    return null;
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
index cad07b4c712..8fa9e632c5f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
@@ -104,31 +104,8 @@ abstract class FlushPolicy {
   protected ThreadState findLargestNonPendingWriter(
       DocumentsWriterFlushControl control, ThreadState perThreadState) {
     assert perThreadState.dwpt.getNumDocsInRAM() > 0;
-    long maxRamSoFar = perThreadState.bytesUsed; // the dwpt which needs to be flushed eventually
-    ThreadState maxRamUsingThreadState = perThreadState;
-    assert !perThreadState.flushPending : "DWPT should have flushed";
-    Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreadStates();
-    int count = 0;
-    while (activePerThreadsIterator.hasNext()) {
-      ThreadState next = activePerThreadsIterator.next();
-      if (!next.flushPending) {
-        final long nextRam = next.bytesUsed;
-        if (nextRam > 0 && next.dwpt.getNumDocsInRAM() > 0) {
-          if (infoStream.isEnabled("FP")) {
-            infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.dwpt.getNumDocsInRAM());
-          }
-          count++;
-          if (nextRam > maxRamSoFar) {
-            maxRamSoFar = nextRam;
-            maxRamUsingThreadState = next;
-          }
-        }
-      }
-    }
-    if (infoStream.isEnabled("FP")) {
-      infoStream.message("FP", count + " in-use non-flushing threads states");
-    }
+    ThreadState maxRamUsingThreadState = control.findLargestNonPendingWriter();
     assert assertMessage("set largest ram consuming thread pending on lower watermark");
     return maxRamUsingThreadState;
   }
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 7f47e42d45d..6059218f99d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -3163,6 +3163,31 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     return pendingSeqNo;
   }
 
+  /**
+   * Expert: Flushes the next pending writer per thread buffer if available or the largest active
+   * non-pending writer per thread buffer in the calling thread.
+   * This can be used to flush documents to disk outside of an indexing thread. In contrast to {@link #flush()}
+   * this won't mark all currently active indexing buffers as flush-pending.
+   *
+   * Note: this method is best-effort and might not flush any segments to disk. If there is a full flush happening
+   * concurrently, multiple segments might have been flushed.
+   * Users of this API can access the IndexWriter's current memory consumption via {@link #ramBytesUsed()}.
+   *
+   * @return true iff this method flushed at least one segment to disk.
+   * @lucene.experimental
+   */
+  public final boolean flushNextBuffer() throws IOException {
+    try {
+      if (docWriter.flushOneDWPT()) {
+        processEvents(true, false);
+        return true; // we wrote a segment
+      }
+    } catch (AbortingException | VirtualMachineError tragedy) {
+      tragicEvent(tragedy, "flushNextBuffer");
+    }
+    return false;
+  }
+
   private long prepareCommitInternal() throws IOException {
     startCommitTime = System.nanoTime();
     synchronized(commitLock) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 9538e03d5a0..04460cd0f86 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -2748,4 +2748,133 @@ public class TestIndexWriter extends LuceneTestCase {
     dir.close();
   }
 
+  public void testFlushLargestWriter() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
+    int numDocs = indexDocsForMultipleThreadStates(w);
+    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+        = w.docWriter.flushControl.findLargestNonPendingWriter();
+    assertFalse(largestNonPendingWriter.flushPending);
+    assertNotNull(largestNonPendingWriter.dwpt);
+
+    int numRamDocs = w.numRamDocs();
+    int numDocsInDWPT = largestNonPendingWriter.dwpt.getNumDocsInRAM();
+    assertTrue(w.flushNextBuffer());
+    assertNull(largestNonPendingWriter.dwpt);
+    assertEquals(numRamDocs - numDocsInDWPT, w.numRamDocs());
+
+    // make sure it's not locked
+    largestNonPendingWriter.lock();
+    largestNonPendingWriter.unlock();
+    if (random().nextBoolean()) {
+      w.commit();
+    }
+    DirectoryReader reader = DirectoryReader.open(w, true, true);
+    assertEquals(numDocs, reader.numDocs());
+    reader.close();
+    w.close();
+    dir.close();
+  }
+
+  private int indexDocsForMultipleThreadStates(IndexWriter w) throws InterruptedException {
+    Thread[] threads = new Thread[3];
+    CountDownLatch latch = new CountDownLatch(threads.length);
+    int numDocsPerThread = 10 + random().nextInt(30);
+    // ensure we have more than one thread state
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new Thread(() -> {
+        latch.countDown();
+        try {
+          latch.await();
+          for (int j = 0; j < numDocsPerThread; j++) {
+            Document doc = new Document();
+            doc.add(new StringField("id", "foo", Field.Store.YES));
+            w.addDocument(doc);
+          }
+        } catch (Exception e) {
+          throw new AssertionError(e);
+        }
+      });
+      threads[i].start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    return numDocsPerThread * threads.length;
+  }
+
+  public void testNeverCheckOutOnFullFlush() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
+    indexDocsForMultipleThreadStates(w);
+    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+        = w.docWriter.flushControl.findLargestNonPendingWriter();
+    assertFalse(largestNonPendingWriter.flushPending);
+    assertNotNull(largestNonPendingWriter.dwpt);
+    int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
+    w.docWriter.flushControl.markForFullFlush();
+    DocumentsWriterPerThread documentsWriterPerThread = w.docWriter.flushControl.checkoutLargestNonPendingWriter();
+    assertNull(documentsWriterPerThread);
+    assertEquals(activeThreadStateCount,
+        w.docWriter.flushControl.numQueuedFlushes());
+    w.docWriter.flushControl.abortFullFlushes();
+    assertNull("was aborted", w.docWriter.flushControl.checkoutLargestNonPendingWriter());
+    assertEquals(0, w.docWriter.flushControl.numQueuedFlushes());
+    w.close();
+    dir.close();
+  }
+
+  public void testHoldLockOnLargestWriter() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
+    int numDocs = indexDocsForMultipleThreadStates(w);
+    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+        = w.docWriter.flushControl.findLargestNonPendingWriter();
+    assertFalse(largestNonPendingWriter.flushPending);
+    assertNotNull(largestNonPendingWriter.dwpt);
+
+    CountDownLatch wait = new CountDownLatch(1);
+    CountDownLatch locked = new CountDownLatch(1);
+    Thread lockThread = new Thread(() -> {
+      try {
+        largestNonPendingWriter.lock();
+        locked.countDown();
+        wait.await();
+      } catch (InterruptedException e) {
+        throw new AssertionError(e);
+      } finally {
+        largestNonPendingWriter.unlock();
+      }
+    });
+    lockThread.start();
+    Thread flushThread = new Thread(() -> {
+      try {
+        locked.await();
+        assertTrue(w.flushNextBuffer());
+      } catch (Exception e) {
+        throw new AssertionError(e);
+      }
+    });
+    flushThread.start();
+
+    locked.await();
+    // access a synced method to ensure we never lock while we hold the flush control monitor
+    w.docWriter.flushControl.activeBytes();
+    wait.countDown();
+    lockThread.join();
+    flushThread.join();
+
+    assertNull("largest DWPT should be flushed", largestNonPendingWriter.dwpt);
+    // make sure it's not locked
+    largestNonPendingWriter.lock();
+    largestNonPendingWriter.unlock();
+    if (random().nextBoolean()) {
+      w.commit();
+    }
+    DirectoryReader reader = DirectoryReader.open(w, true, true);
+    assertEquals(numDocs, reader.numDocs());
+    reader.close();
+    w.close();
+    dir.close();
+  }
 }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
index d46c2480cd0..aa4da54c6c6 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
@@ -180,6 +180,17 @@ public class RandomIndexWriter implements Closeable {
     LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
     if (docCount++ == flushAt) {
       if (r.nextBoolean()) {
+        if (LuceneTestCase.VERBOSE) {
+          System.out.println("RIW.add/updateDocument: now flushing the largest writer at docCount=" + docCount);
+        }
+        int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
+        int numFlushes = Math.min(1, r.nextInt(activeThreadStateCount+1));
+        for (int i = 0; i < numFlushes; i++) {
+          if (w.flushNextBuffer() == false) {
+            break; // stop once we didn't flush anything
+          }
+        }
+      } else if (r.nextBoolean()) {
         if (LuceneTestCase.VERBOSE) {
           System.out.println("RIW.add/updateDocument: now doing a flush at docCount=" + docCount);
         }
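
For context, the "more than one IW (shard) per JVM" use case named in the CHANGES entry could be served by a small
housekeeping thread along the lines of the sketch below. This is an illustration only, not code from this patch:
the class name, RAM budget, and polling interval are invented for the example. It relies solely on the existing
Accountable.ramBytesUsed() and the flushNextBuffer() method added here; since flushNextBuffer() is best-effort,
the inner loop simply stops once nothing could be flushed.

import java.io.IOException;
import java.util.Comparator;
import java.util.List;

import org.apache.lucene.index.IndexWriter;

// Hypothetical helper (not part of Lucene): keeps the combined indexing-buffer RAM of several
// IndexWriters under a budget by asking the biggest consumer to write one DWPT at a time.
final class SharedRamFlushService implements Runnable {
  private final List<IndexWriter> writers;   // e.g. one writer per shard, all in the same JVM
  private final long ramBudgetBytes;         // assumed budget, e.g. 512L * 1024 * 1024
  private volatile boolean closed;

  SharedRamFlushService(List<IndexWriter> writers, long ramBudgetBytes) {
    this.writers = writers;
    this.ramBudgetBytes = ramBudgetBytes;
  }

  @Override
  public void run() {
    try {
      while (closed == false) {
        // total RAM currently held by all writers
        long totalRam = writers.stream().mapToLong(IndexWriter::ramBytesUsed).sum();
        while (totalRam > ramBudgetBytes) {
          // pick the writer that currently uses the most RAM
          IndexWriter largest = writers.stream()
              .max(Comparator.comparingLong(IndexWriter::ramBytesUsed))
              .orElseThrow(IllegalStateException::new);
          // synchronously move one buffer to disk without hijacking an indexing thread;
          // this may flush nothing if a full flush is running concurrently
          if (largest.flushNextBuffer() == false) {
            break;
          }
          totalRam = writers.stream().mapToLong(IndexWriter::ramBytesUsed).sum();
        }
        Thread.sleep(100); // polling interval is arbitrary for this sketch
      }
    } catch (IOException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  void close() {
    closed = true;
  }
}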