From 268dd54a8695403c33fd1a22ecee0b17b120480d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 12 Oct 2023 16:49:39 +0200 Subject: [PATCH] Cleanup flushing logic in DocumentsWriter (#12647) DocumentsWriter had some duplicate logic for iterating over segments to be flushed. This change simplifies some of the loops and moves common code in one place. This also adds tests to ensure we actually freeze and apply deletes on segment flush. Relates to #12572 --- .../apache/lucene/index/DocumentsWriter.java | 115 +++++++----------- .../index/DocumentsWriterFlushQueue.java | 29 +---- .../apache/lucene/index/TestIndexWriter.java | 86 +++++++++++++ 3 files changed, 139 insertions(+), 91 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 625c87bb9cd..f591b3538f5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Locale; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -169,11 +168,11 @@ final class DocumentsWriter implements Closeable, Accountable { final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; if (flushControl.isFullFlush() == false - // never apply deletes during full flush this breaks happens before relationship + // never apply deletes during full flush this breaks happens before relationship. 
&& deleteQueue.isOpen() // if it's closed then it's already fully applied and we have a new delete queue && flushControl.getAndResetApplyAllDeletes()) { - if (ticketQueue.addDeletes(deleteQueue)) { + if (ticketQueue.addTicket(() -> maybeFreezeGlobalBuffer(deleteQueue)) != null) { flushNotifications.onDeletesApplied(); // apply deletes event forces a purge return true; } @@ -241,15 +240,16 @@ final class DocumentsWriter implements Closeable, Accountable { if (infoStream.isEnabled("DW")) { infoStream.message("DW", "startFlushOneDWPT"); } - // first check if there is one pending - DocumentsWriterPerThread documentsWriterPerThread = flushControl.nextPendingFlush(); - if (documentsWriterPerThread == null) { - documentsWriterPerThread = flushControl.checkoutLargestNonPendingWriter(); + if (maybeFlush() == false) { + DocumentsWriterPerThread documentsWriterPerThread = + flushControl.checkoutLargestNonPendingWriter(); + if (documentsWriterPerThread != null) { + doFlush(documentsWriterPerThread); + return true; + } + return false; } - if (documentsWriterPerThread != null) { - return doFlush(documentsWriterPerThread); - } - return false; // we didn't flush anything here + return true; } /** @@ -388,11 +388,8 @@ final class DocumentsWriter implements Closeable, Accountable { || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) { // Help out flushing any queued DWPTs so we can un-stall: // Try pick up pending threads here if possible - DocumentsWriterPerThread flushingDWPT; - while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { - // Don't push the delete here since the update could fail! - hasEvents |= doFlush(flushingDWPT); - } + // no need to loop over the next pending flushes... 
doFlush will take care of this + hasEvents |= maybeFlush(); flushControl.waitIfStalled(); // block if stalled } return hasEvents; @@ -402,14 +399,11 @@ final class DocumentsWriter implements Closeable, Accountable { throws IOException { hasEvents |= applyAllDeletes(); if (flushingDWPT != null) { - hasEvents |= doFlush(flushingDWPT); + doFlush(flushingDWPT); + hasEvents = true; } else if (config.checkPendingFlushOnUpdate) { - final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); - if (nextPendingFlush != null) { - hasEvents |= doFlush(nextPendingFlush); - } + hasEvents |= maybeFlush(); } - return hasEvents; } @@ -451,11 +445,19 @@ final class DocumentsWriter implements Closeable, Accountable { return seqNo; } - private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { - boolean hasEvents = false; - while (flushingDWPT != null) { + private boolean maybeFlush() throws IOException { + final DocumentsWriterPerThread flushingDWPT = flushControl.nextPendingFlush(); + if (flushingDWPT != null) { + doFlush(flushingDWPT); + return true; + } + return false; + } + + private void doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { + assert flushingDWPT != null : "Flushing DWPT must not be null"; + do { assert flushingDWPT.hasFlushed() == false; - hasEvents = true; boolean success = false; DocumentsWriterFlushQueue.FlushTicket ticket = null; try { @@ -483,8 +485,11 @@ final class DocumentsWriter implements Closeable, Accountable { */ try { assert assertTicketQueueModification(flushingDWPT.deleteQueue); + final DocumentsWriterPerThread dwpt = flushingDWPT; // Each flush is assigned a ticket in the order they acquire the ticketQueue lock - ticket = ticketQueue.addFlushTicket(flushingDWPT); + ticket = + ticketQueue.addTicket( + () -> new DocumentsWriterFlushQueue.FlushTicket(dwpt.prepareFlush(), true)); final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM(); boolean dwptSuccess = false; try { @@ -497,11 
+502,9 @@ final class DocumentsWriter implements Closeable, Accountable { if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) { Set files = flushingDWPT.pendingFilesToDelete(); flushNotifications.deleteUnusedFiles(files); - hasEvents = true; } if (dwptSuccess == false) { flushNotifications.flushFailed(flushingDWPT.getSegmentInfo()); - hasEvents = true; } } // flush was successful once we reached this point - new seg. has been assigned to the @@ -525,42 +528,12 @@ final class DocumentsWriter implements Closeable, Accountable { // other threads flushing segments. In this case // we forcefully stall the producers. flushNotifications.onTicketBacklog(); - break; } } finally { flushControl.doAfterFlush(flushingDWPT); } - - flushingDWPT = flushControl.nextPendingFlush(); - } - - if (hasEvents) { - flushNotifications.afterSegmentsFlushed(); - } - - // If deletes alone are consuming > 1/2 our RAM - // buffer, force them all to apply now. This is to - // prevent too-frequent flushing of a long tail of - // tiny segments: - final double ramBufferSizeMB = config.getRAMBufferSizeMB(); - if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH - && flushControl.getDeleteBytesUsed() > (1024 * 1024 * ramBufferSizeMB / 2)) { - hasEvents = true; - if (applyAllDeletes() == false) { - if (infoStream.isEnabled("DW")) { - infoStream.message( - "DW", - String.format( - Locale.ROOT, - "force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB", - flushControl.getDeleteBytesUsed() / (1024. 
* 1024.), - ramBufferSizeMB)); - } - flushNotifications.onDeletesApplied(); - } - } - - return hasEvents; + } while ((flushingDWPT = flushControl.nextPendingFlush()) != null); + flushNotifications.afterSegmentsFlushed(); } synchronized long getNextSequenceNumber() { @@ -665,11 +638,7 @@ final class DocumentsWriter implements Closeable, Accountable { boolean anythingFlushed = false; try { - DocumentsWriterPerThread flushingDWPT; - // Help out with flushing: - while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { - anythingFlushed |= doFlush(flushingDWPT); - } + anythingFlushed |= maybeFlush(); // If a concurrent flush is still in flight wait for it flushControl.waitForFlush(); if (anythingFlushed == false @@ -679,9 +648,9 @@ final class DocumentsWriter implements Closeable, Accountable { "DW", Thread.currentThread().getName() + ": flush naked frozen global deletes"); } assert assertTicketQueueModification(flushingDeleteQueue); - ticketQueue.addDeletes(flushingDeleteQueue); + ticketQueue.addTicket(() -> maybeFreezeGlobalBuffer(flushingDeleteQueue)); } - // we can't assert that we don't have any tickets in teh queue since we might add a + // we can't assert that we don't have any tickets in the queue since we might add a // DocumentsWriterDeleteQueue // concurrently if we have very small ram buffers this happens quite frequently assert !flushingDeleteQueue.anyChanges(); @@ -698,6 +667,16 @@ final class DocumentsWriter implements Closeable, Accountable { } } + private DocumentsWriterFlushQueue.FlushTicket maybeFreezeGlobalBuffer( + DocumentsWriterDeleteQueue deleteQueue) { + FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer(); + if (frozenBufferedUpdates != null) { + // no need to publish anything if we don't have any frozen updates + return new DocumentsWriterFlushQueue.FlushTicket(frozenBufferedUpdates, false); + } + return null; + } + void finishFullFlush(boolean success) throws IOException { try { if 
(infoStream.isEnabled("DW")) { diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java index 235dbb1fbd4..d71c1771c2c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java @@ -21,6 +21,7 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; import org.apache.lucene.util.IOConsumer; @@ -34,23 +35,23 @@ final class DocumentsWriterFlushQueue { private final AtomicInteger ticketCount = new AtomicInteger(); private final ReentrantLock purgeLock = new ReentrantLock(); - synchronized boolean addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { + synchronized FlushTicket addTicket(Supplier ticketSupplier) throws IOException { // first inc the ticket count - freeze opens a window for #anyChanges to fail incTickets(); boolean success = false; try { - FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer(); - if (frozenBufferedUpdates != null) { + FlushTicket ticket = ticketSupplier.get(); + if (ticket != null) { // no need to publish anything if we don't have any frozen updates - queue.add(new FlushTicket(frozenBufferedUpdates, false)); + queue.add(ticket); success = true; } + return ticket; } finally { if (!success) { decTickets(); } } - return success; } private void incTickets() { @@ -63,24 +64,6 @@ final class DocumentsWriterFlushQueue { assert numTickets >= 0; } - synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException { - // Each flush is assigned a ticket in the order they acquire the ticketQueue - // lock - incTickets(); - boolean success = false; - try { - 
// prepare flush freezes the global deletes - do in synced block! - final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true); - queue.add(ticket); - success = true; - return ticket; - } finally { - if (!success) { - decTickets(); - } - } - } - synchronized void addSegment(FlushTicket ticket, FlushedSegment segment) { assert ticket.hasSegment; // the actual flush is done asynchronously and once done the FlushedSegment diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java index d101927f7e0..e226322696b 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -3146,6 +3146,92 @@ public class TestIndexWriter extends LuceneTestCase { dir.close(); } + public void testApplyDeletesWithoutFlushes() throws IOException { + try (Directory dir = newDirectory()) { + IndexWriterConfig indexWriterConfig = new IndexWriterConfig(); + AtomicBoolean flushDeletes = new AtomicBoolean(); + indexWriterConfig.setFlushPolicy( + new FlushPolicy() { + @Override + public void onChange( + DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) { + if (flushDeletes.get()) { + control.setApplyAllDeletes(); + } + } + }); + try (IndexWriter w = new IndexWriter(dir, indexWriterConfig)) { + assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed()); + w.deleteDocuments(new Term("foo", "bar")); + long bytesUsed = w.docWriter.flushControl.getDeleteBytesUsed(); + assertTrue(bytesUsed + " > 0", bytesUsed > 0); + w.deleteDocuments(new Term("foo", "baz")); + bytesUsed = w.docWriter.flushControl.getDeleteBytesUsed(); + assertTrue(bytesUsed + " > 0", bytesUsed > 0); + assertEquals(2, w.getBufferedDeleteTermsSize()); + assertEquals(0, w.getFlushDeletesCount()); + flushDeletes.set(true); + w.deleteDocuments(new Term("foo", "bar")); + assertEquals(0, 
w.docWriter.flushControl.getDeleteBytesUsed()); + assertEquals(1, w.getFlushDeletesCount()); + } + } + } + + public void testDeletesAppliedOnFlush() throws IOException { + try (Directory dir = newDirectory()) { + try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig())) { + Document doc = new Document(); + doc.add(newField("id", "1", storedTextType)); + w.addDocument(doc); + w.updateDocument(new Term("id", "1"), doc); + long deleteBytesUsed = w.docWriter.flushControl.getDeleteBytesUsed(); + assertTrue("deletedBytesUsed: " + deleteBytesUsed, deleteBytesUsed > 0); + assertEquals(0, w.getFlushDeletesCount()); + assertTrue(w.flushNextBuffer()); + assertEquals(1, w.getFlushDeletesCount()); + assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed()); + w.deleteAll(); + w.commit(); + assertEquals(2, w.getFlushDeletesCount()); + if (random().nextBoolean()) { + w.deleteDocuments(new Term("id", "1")); + } else { + w.updateDocValues(new Term("id", "1"), new NumericDocValuesField("foo", 1l)); + } + deleteBytesUsed = w.docWriter.flushControl.getDeleteBytesUsed(); + assertTrue("deletedBytesUsed: " + deleteBytesUsed, deleteBytesUsed > 0); + doc = new Document(); + doc.add(newField("id", "5", storedTextType)); + w.addDocument(doc); + assertTrue(w.flushNextBuffer()); + assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed()); + assertEquals(3, w.getFlushDeletesCount()); + } + try (RandomIndexWriter w = new RandomIndexWriter(random(), dir, new IndexWriterConfig())) { + int numDocs = random().nextInt(1, 100); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(newField("id", "" + i, storedTextType)); + w.addDocument(doc); + } + for (int i = 0; i < numDocs; i++) { + if (random().nextBoolean()) { + Document doc = new Document(); + doc.add(newField("id", "" + i, storedTextType)); + w.updateDocument(new Term("id", "" + i), doc); + } + } + + long deleteBytesUsed = w.w.docWriter.flushControl.getDeleteBytesUsed(); + if (deleteBytesUsed > 0) 
{ + assertTrue(w.w.flushNextBuffer()); + assertEquals(0, w.w.docWriter.flushControl.getDeleteBytesUsed()); + } + } + } + } + public void testHoldLockOnLargestWriter() throws IOException, InterruptedException { Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());