diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 625c87bb9cd..f591b3538f5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Locale; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -169,11 +168,11 @@ final class DocumentsWriter implements Closeable, Accountable { final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; if (flushControl.isFullFlush() == false - // never apply deletes during full flush this breaks happens before relationship + // never apply deletes during full flush this breaks happens before relationship. && deleteQueue.isOpen() // if it's closed then it's already fully applied and we have a new delete queue && flushControl.getAndResetApplyAllDeletes()) { - if (ticketQueue.addDeletes(deleteQueue)) { + if (ticketQueue.addTicket(() -> maybeFreezeGlobalBuffer(deleteQueue)) != null) { flushNotifications.onDeletesApplied(); // apply deletes event forces a purge return true; } @@ -241,15 +240,16 @@ final class DocumentsWriter implements Closeable, Accountable { if (infoStream.isEnabled("DW")) { infoStream.message("DW", "startFlushOneDWPT"); } - // first check if there is one pending - DocumentsWriterPerThread documentsWriterPerThread = flushControl.nextPendingFlush(); - if (documentsWriterPerThread == null) { - documentsWriterPerThread = flushControl.checkoutLargestNonPendingWriter(); + if (maybeFlush() == false) { + DocumentsWriterPerThread documentsWriterPerThread = + flushControl.checkoutLargestNonPendingWriter(); + if (documentsWriterPerThread != null) { + doFlush(documentsWriterPerThread); + return true; + } + return false; } - if (documentsWriterPerThread != null) { - return doFlush(documentsWriterPerThread); - } - return false; // we didn't flush anything here + return true; } /** @@ -388,11 +388,8 @@ final class DocumentsWriter implements Closeable, Accountable { || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) { // Help out flushing any queued DWPTs so we can un-stall: // Try pick up pending threads here if possible - DocumentsWriterPerThread flushingDWPT; - while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { - // Don't push the delete here since the update could fail! - hasEvents |= doFlush(flushingDWPT); - } + // no need to loop over the next pending flushes... doFlush will take care of this + hasEvents |= maybeFlush(); flushControl.waitIfStalled(); // block if stalled } return hasEvents; @@ -402,14 +399,11 @@ final class DocumentsWriter implements Closeable, Accountable { throws IOException { hasEvents |= applyAllDeletes(); if (flushingDWPT != null) { - hasEvents |= doFlush(flushingDWPT); + doFlush(flushingDWPT); + hasEvents = true; } else if (config.checkPendingFlushOnUpdate) { - final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); - if (nextPendingFlush != null) { - hasEvents |= doFlush(nextPendingFlush); - } + hasEvents |= maybeFlush(); } - return hasEvents; } @@ -451,11 +445,19 @@ final class DocumentsWriter implements Closeable, Accountable { return seqNo; } - private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { - boolean hasEvents = false; - while (flushingDWPT != null) { + private boolean maybeFlush() throws IOException { + final DocumentsWriterPerThread flushingDWPT = flushControl.nextPendingFlush(); + if (flushingDWPT != null) { + doFlush(flushingDWPT); + return true; + } + return false; + } + + private void doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { + assert flushingDWPT != null : "Flushing DWPT must not be null"; + do { assert flushingDWPT.hasFlushed() == false; - hasEvents = true; boolean success = false; DocumentsWriterFlushQueue.FlushTicket ticket = null; try { @@ -483,8 +485,11 @@ final class DocumentsWriter implements Closeable, Accountable { */ try { assert assertTicketQueueModification(flushingDWPT.deleteQueue); + final DocumentsWriterPerThread dwpt = flushingDWPT; // Each flush is assigned a ticket in the order they acquire the ticketQueue lock - ticket = ticketQueue.addFlushTicket(flushingDWPT); + ticket = + ticketQueue.addTicket( + () -> new DocumentsWriterFlushQueue.FlushTicket(dwpt.prepareFlush(), true)); final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM(); boolean dwptSuccess = false; try { @@ -497,11 +502,9 @@ final class DocumentsWriter implements Closeable, Accountable { if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) { Set files = flushingDWPT.pendingFilesToDelete(); flushNotifications.deleteUnusedFiles(files); - hasEvents = true; } if (dwptSuccess == false) { flushNotifications.flushFailed(flushingDWPT.getSegmentInfo()); - hasEvents = true; } } // flush was successful once we reached this point - new seg. has been assigned to the @@ -525,42 +528,12 @@ final class DocumentsWriter implements Closeable, Accountable { // other threads flushing segments. In this case // we forcefully stall the producers. flushNotifications.onTicketBacklog(); - break; } } finally { flushControl.doAfterFlush(flushingDWPT); } - - flushingDWPT = flushControl.nextPendingFlush(); - } - - if (hasEvents) { - flushNotifications.afterSegmentsFlushed(); - } - - // If deletes alone are consuming > 1/2 our RAM - // buffer, force them all to apply now. This is to - // prevent too-frequent flushing of a long tail of - // tiny segments: - final double ramBufferSizeMB = config.getRAMBufferSizeMB(); - if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH - && flushControl.getDeleteBytesUsed() > (1024 * 1024 * ramBufferSizeMB / 2)) { - hasEvents = true; - if (applyAllDeletes() == false) { - if (infoStream.isEnabled("DW")) { - infoStream.message( - "DW", - String.format( - Locale.ROOT, - "force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB", - flushControl.getDeleteBytesUsed() / (1024. * 1024.), - ramBufferSizeMB)); - } - flushNotifications.onDeletesApplied(); - } - } - - return hasEvents; + } while ((flushingDWPT = flushControl.nextPendingFlush()) != null); + flushNotifications.afterSegmentsFlushed(); } synchronized long getNextSequenceNumber() { @@ -665,11 +638,7 @@ final class DocumentsWriter implements Closeable, Accountable { boolean anythingFlushed = false; try { - DocumentsWriterPerThread flushingDWPT; - // Help out with flushing: - while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { - anythingFlushed |= doFlush(flushingDWPT); - } + anythingFlushed |= maybeFlush(); // If a concurrent flush is still in flight wait for it flushControl.waitForFlush(); if (anythingFlushed == false @@ -679,9 +648,9 @@ final class DocumentsWriter implements Closeable, Accountable { "DW", Thread.currentThread().getName() + ": flush naked frozen global deletes"); } assert assertTicketQueueModification(flushingDeleteQueue); - ticketQueue.addDeletes(flushingDeleteQueue); + ticketQueue.addTicket(() -> maybeFreezeGlobalBuffer(flushingDeleteQueue)); } - // we can't assert that we don't have any tickets in teh queue since we might add a + // we can't assert that we don't have any tickets in the queue since we might add a // DocumentsWriterDeleteQueue // concurrently if we have very small ram buffers this happens quite frequently assert !flushingDeleteQueue.anyChanges(); @@ -698,6 +667,16 @@ final class DocumentsWriter implements Closeable, Accountable { } } + private DocumentsWriterFlushQueue.FlushTicket maybeFreezeGlobalBuffer( + DocumentsWriterDeleteQueue deleteQueue) { + FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer(); + if (frozenBufferedUpdates != null) { + // no need to publish anything if we don't have any frozen updates + return new DocumentsWriterFlushQueue.FlushTicket(frozenBufferedUpdates, false); + } + return null; + } + void finishFullFlush(boolean success) throws IOException { try { if (infoStream.isEnabled("DW")) { diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java index 235dbb1fbd4..d71c1771c2c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java @@ -21,6 +21,7 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; import org.apache.lucene.util.IOConsumer; @@ -34,23 +35,23 @@ final class DocumentsWriterFlushQueue { private final AtomicInteger ticketCount = new AtomicInteger(); private final ReentrantLock purgeLock = new ReentrantLock(); - synchronized boolean addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { + synchronized FlushTicket addTicket(Supplier ticketSupplier) throws IOException { // first inc the ticket count - freeze opens a window for #anyChanges to fail incTickets(); boolean success = false; try { - FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer(); - if (frozenBufferedUpdates != null) { + FlushTicket ticket = ticketSupplier.get(); + if (ticket != null) { // no need to publish anything if we don't have any frozen updates - queue.add(new FlushTicket(frozenBufferedUpdates, false)); + queue.add(ticket); success = true; } + return ticket; } finally { if (!success) { decTickets(); } } - return success; } private void incTickets() { @@ -63,24 +64,6 @@ final class DocumentsWriterFlushQueue { assert numTickets >= 0; } - synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException { - // Each flush is assigned a ticket in the order they acquire the ticketQueue - // lock - incTickets(); - boolean success = false; - try { - // prepare flush freezes the global deletes - do in synced block! - final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true); - queue.add(ticket); - success = true; - return ticket; - } finally { - if (!success) { - decTickets(); - } - } - } - synchronized void addSegment(FlushTicket ticket, FlushedSegment segment) { assert ticket.hasSegment; // the actual flush is done asynchronously and once done the FlushedSegment diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java index d101927f7e0..e226322696b 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -3146,6 +3146,92 @@ public class TestIndexWriter extends LuceneTestCase { dir.close(); } + public void testApplyDeletesWithoutFlushes() throws IOException { + try (Directory dir = newDirectory()) { + IndexWriterConfig indexWriterConfig = new IndexWriterConfig(); + AtomicBoolean flushDeletes = new AtomicBoolean(); + indexWriterConfig.setFlushPolicy( + new FlushPolicy() { + @Override + public void onChange( + DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) { + if (flushDeletes.get()) { + control.setApplyAllDeletes(); + } + } + }); + try (IndexWriter w = new IndexWriter(dir, indexWriterConfig)) { + assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed()); + w.deleteDocuments(new Term("foo", "bar")); + long bytesUsed = w.docWriter.flushControl.getDeleteBytesUsed(); + assertTrue(bytesUsed + " > 0", bytesUsed > 0); + w.deleteDocuments(new Term("foo", "baz")); + bytesUsed = w.docWriter.flushControl.getDeleteBytesUsed(); + assertTrue(bytesUsed + " > 0", bytesUsed > 0); + assertEquals(2, w.getBufferedDeleteTermsSize()); + assertEquals(0, w.getFlushDeletesCount()); + flushDeletes.set(true); + w.deleteDocuments(new Term("foo", "bar")); + assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed()); + assertEquals(1, w.getFlushDeletesCount()); + } + } + } + + public void testDeletesAppliedOnFlush() throws IOException { + try (Directory dir = newDirectory()) { + try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig())) { + Document doc = new Document(); + doc.add(newField("id", "1", storedTextType)); + w.addDocument(doc); + w.updateDocument(new Term("id", "1"), doc); + long deleteBytesUsed = w.docWriter.flushControl.getDeleteBytesUsed(); + assertTrue("deletedBytesUsed: " + deleteBytesUsed, deleteBytesUsed > 0); + assertEquals(0, w.getFlushDeletesCount()); + assertTrue(w.flushNextBuffer()); + assertEquals(1, w.getFlushDeletesCount()); + assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed()); + w.deleteAll(); + w.commit(); + assertEquals(2, w.getFlushDeletesCount()); + if (random().nextBoolean()) { + w.deleteDocuments(new Term("id", "1")); + } else { + w.updateDocValues(new Term("id", "1"), new NumericDocValuesField("foo", 1l)); + } + deleteBytesUsed = w.docWriter.flushControl.getDeleteBytesUsed(); + assertTrue("deletedBytesUsed: " + deleteBytesUsed, deleteBytesUsed > 0); + doc = new Document(); + doc.add(newField("id", "5", storedTextType)); + w.addDocument(doc); + assertTrue(w.flushNextBuffer()); + assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed()); + assertEquals(3, w.getFlushDeletesCount()); + } + try (RandomIndexWriter w = new RandomIndexWriter(random(), dir, new IndexWriterConfig())) { + int numDocs = random().nextInt(1, 100); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(newField("id", "" + i, storedTextType)); + w.addDocument(doc); + } + for (int i = 0; i < numDocs; i++) { + if (random().nextBoolean()) { + Document doc = new Document(); + doc.add(newField("id", "" + i, storedTextType)); + w.updateDocument(new Term("id", "" + i), doc); + } + } + + long deleteBytesUsed = w.w.docWriter.flushControl.getDeleteBytesUsed(); + if (deleteBytesUsed > 0) { + assertTrue(w.w.flushNextBuffer()); + assertEquals(0, w.w.docWriter.flushControl.getDeleteBytesUsed()); + } + } + } + } + public void testHoldLockOnLargestWriter() throws IOException, InterruptedException { Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());