Cleanup flushing logic in DocumentsWriter (#12647)

DocumentsWriter had some duplicate logic for iterating over
segments to be flushed. This change simplifies some of the loops
and moves common code in on place. This also adds tests to ensure
we actually freeze and apply deletes on segment flush.

Relates to #12572
This commit is contained in:
Simon Willnauer 2023-10-12 16:49:39 +02:00 committed by GitHub
parent fad6653495
commit 268dd54a86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 91 deletions

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -169,11 +168,11 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
if (flushControl.isFullFlush() == false
// never apply deletes during full flush this breaks happens before relationship
// never apply deletes during full flush this breaks happens before relationship.
&& deleteQueue.isOpen()
// if it's closed then it's already fully applied and we have a new delete queue
&& flushControl.getAndResetApplyAllDeletes()) {
if (ticketQueue.addDeletes(deleteQueue)) {
if (ticketQueue.addTicket(() -> maybeFreezeGlobalBuffer(deleteQueue)) != null) {
flushNotifications.onDeletesApplied(); // apply deletes event forces a purge
return true;
}
@ -241,15 +240,16 @@ final class DocumentsWriter implements Closeable, Accountable {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "startFlushOneDWPT");
}
// first check if there is one pending
DocumentsWriterPerThread documentsWriterPerThread = flushControl.nextPendingFlush();
if (documentsWriterPerThread == null) {
documentsWriterPerThread = flushControl.checkoutLargestNonPendingWriter();
if (maybeFlush() == false) {
DocumentsWriterPerThread documentsWriterPerThread =
flushControl.checkoutLargestNonPendingWriter();
if (documentsWriterPerThread != null) {
doFlush(documentsWriterPerThread);
return true;
}
return false;
}
if (documentsWriterPerThread != null) {
return doFlush(documentsWriterPerThread);
}
return false; // we didn't flush anything here
return true;
}
/**
@ -388,11 +388,8 @@ final class DocumentsWriter implements Closeable, Accountable {
|| (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
// Help out flushing any queued DWPTs so we can un-stall:
// Try pick up pending threads here if possible
DocumentsWriterPerThread flushingDWPT;
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
// Don't push the delete here since the update could fail!
hasEvents |= doFlush(flushingDWPT);
}
// no need to loop over the next pending flushes... doFlush will take care of this
hasEvents |= maybeFlush();
flushControl.waitIfStalled(); // block if stalled
}
return hasEvents;
@ -402,14 +399,11 @@ final class DocumentsWriter implements Closeable, Accountable {
throws IOException {
hasEvents |= applyAllDeletes();
if (flushingDWPT != null) {
hasEvents |= doFlush(flushingDWPT);
doFlush(flushingDWPT);
hasEvents = true;
} else if (config.checkPendingFlushOnUpdate) {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
hasEvents |= doFlush(nextPendingFlush);
}
hasEvents |= maybeFlush();
}
return hasEvents;
}
@ -451,11 +445,19 @@ final class DocumentsWriter implements Closeable, Accountable {
return seqNo;
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean hasEvents = false;
while (flushingDWPT != null) {
private boolean maybeFlush() throws IOException {
final DocumentsWriterPerThread flushingDWPT = flushControl.nextPendingFlush();
if (flushingDWPT != null) {
doFlush(flushingDWPT);
return true;
}
return false;
}
private void doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
assert flushingDWPT != null : "Flushing DWPT must not be null";
do {
assert flushingDWPT.hasFlushed() == false;
hasEvents = true;
boolean success = false;
DocumentsWriterFlushQueue.FlushTicket ticket = null;
try {
@ -483,8 +485,11 @@ final class DocumentsWriter implements Closeable, Accountable {
*/
try {
assert assertTicketQueueModification(flushingDWPT.deleteQueue);
final DocumentsWriterPerThread dwpt = flushingDWPT;
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = ticketQueue.addFlushTicket(flushingDWPT);
ticket =
ticketQueue.addTicket(
() -> new DocumentsWriterFlushQueue.FlushTicket(dwpt.prepareFlush(), true));
final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
boolean dwptSuccess = false;
try {
@ -497,11 +502,9 @@ final class DocumentsWriter implements Closeable, Accountable {
if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) {
Set<String> files = flushingDWPT.pendingFilesToDelete();
flushNotifications.deleteUnusedFiles(files);
hasEvents = true;
}
if (dwptSuccess == false) {
flushNotifications.flushFailed(flushingDWPT.getSegmentInfo());
hasEvents = true;
}
}
// flush was successful once we reached this point - new seg. has been assigned to the
@ -525,42 +528,12 @@ final class DocumentsWriter implements Closeable, Accountable {
// other threads flushing segments. In this case
// we forcefully stall the producers.
flushNotifications.onTicketBacklog();
break;
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
}
flushingDWPT = flushControl.nextPendingFlush();
}
if (hasEvents) {
flushNotifications.afterSegmentsFlushed();
}
// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
// tiny segments:
final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH
&& flushControl.getDeleteBytesUsed() > (1024 * 1024 * ramBufferSizeMB / 2)) {
hasEvents = true;
if (applyAllDeletes() == false) {
if (infoStream.isEnabled("DW")) {
infoStream.message(
"DW",
String.format(
Locale.ROOT,
"force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
flushControl.getDeleteBytesUsed() / (1024. * 1024.),
ramBufferSizeMB));
}
flushNotifications.onDeletesApplied();
}
}
return hasEvents;
} while ((flushingDWPT = flushControl.nextPendingFlush()) != null);
flushNotifications.afterSegmentsFlushed();
}
synchronized long getNextSequenceNumber() {
@ -665,11 +638,7 @@ final class DocumentsWriter implements Closeable, Accountable {
boolean anythingFlushed = false;
try {
DocumentsWriterPerThread flushingDWPT;
// Help out with flushing:
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
anythingFlushed |= doFlush(flushingDWPT);
}
anythingFlushed |= maybeFlush();
// If a concurrent flush is still in flight wait for it
flushControl.waitForFlush();
if (anythingFlushed == false
@ -679,9 +648,9 @@ final class DocumentsWriter implements Closeable, Accountable {
"DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
assert assertTicketQueueModification(flushingDeleteQueue);
ticketQueue.addDeletes(flushingDeleteQueue);
ticketQueue.addTicket(() -> maybeFreezeGlobalBuffer(flushingDeleteQueue));
}
// we can't assert that we don't have any tickets in teh queue since we might add a
// we can't assert that we don't have any tickets in the queue since we might add a
// DocumentsWriterDeleteQueue
// concurrently if we have very small ram buffers this happens quite frequently
assert !flushingDeleteQueue.anyChanges();
@ -698,6 +667,16 @@ final class DocumentsWriter implements Closeable, Accountable {
}
}
private DocumentsWriterFlushQueue.FlushTicket maybeFreezeGlobalBuffer(
DocumentsWriterDeleteQueue deleteQueue) {
FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer();
if (frozenBufferedUpdates != null) {
// no need to publish anything if we don't have any frozen updates
return new DocumentsWriterFlushQueue.FlushTicket(frozenBufferedUpdates, false);
}
return null;
}
void finishFullFlush(boolean success) throws IOException {
try {
if (infoStream.isEnabled("DW")) {

View File

@ -21,6 +21,7 @@ import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.util.IOConsumer;
@ -34,23 +35,23 @@ final class DocumentsWriterFlushQueue {
private final AtomicInteger ticketCount = new AtomicInteger();
private final ReentrantLock purgeLock = new ReentrantLock();
synchronized boolean addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
synchronized FlushTicket addTicket(Supplier<FlushTicket> ticketSupplier) throws IOException {
// first inc the ticket count - freeze opens a window for #anyChanges to fail
incTickets();
boolean success = false;
try {
FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer();
if (frozenBufferedUpdates != null) {
FlushTicket ticket = ticketSupplier.get();
if (ticket != null) {
// no need to publish anything if we don't have any frozen updates
queue.add(new FlushTicket(frozenBufferedUpdates, false));
queue.add(ticket);
success = true;
}
return ticket;
} finally {
if (!success) {
decTickets();
}
}
return success;
}
private void incTickets() {
@ -63,24 +64,6 @@ final class DocumentsWriterFlushQueue {
assert numTickets >= 0;
}
synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
// Each flush is assigned a ticket in the order they acquire the ticketQueue
// lock
incTickets();
boolean success = false;
try {
// prepare flush freezes the global deletes - do in synced block!
final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);
queue.add(ticket);
success = true;
return ticket;
} finally {
if (!success) {
decTickets();
}
}
}
synchronized void addSegment(FlushTicket ticket, FlushedSegment segment) {
assert ticket.hasSegment;
// the actual flush is done asynchronously and once done the FlushedSegment

View File

@ -3146,6 +3146,92 @@ public class TestIndexWriter extends LuceneTestCase {
dir.close();
}
public void testApplyDeletesWithoutFlushes() throws IOException {
try (Directory dir = newDirectory()) {
IndexWriterConfig indexWriterConfig = new IndexWriterConfig();
AtomicBoolean flushDeletes = new AtomicBoolean();
indexWriterConfig.setFlushPolicy(
new FlushPolicy() {
@Override
public void onChange(
DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
if (flushDeletes.get()) {
control.setApplyAllDeletes();
}
}
});
try (IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed());
w.deleteDocuments(new Term("foo", "bar"));
long bytesUsed = w.docWriter.flushControl.getDeleteBytesUsed();
assertTrue(bytesUsed + " > 0", bytesUsed > 0);
w.deleteDocuments(new Term("foo", "baz"));
bytesUsed = w.docWriter.flushControl.getDeleteBytesUsed();
assertTrue(bytesUsed + " > 0", bytesUsed > 0);
assertEquals(2, w.getBufferedDeleteTermsSize());
assertEquals(0, w.getFlushDeletesCount());
flushDeletes.set(true);
w.deleteDocuments(new Term("foo", "bar"));
assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed());
assertEquals(1, w.getFlushDeletesCount());
}
}
}
public void testDeletesAppliedOnFlush() throws IOException {
try (Directory dir = newDirectory()) {
try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig())) {
Document doc = new Document();
doc.add(newField("id", "1", storedTextType));
w.addDocument(doc);
w.updateDocument(new Term("id", "1"), doc);
long deleteBytesUsed = w.docWriter.flushControl.getDeleteBytesUsed();
assertTrue("deletedBytesUsed: " + deleteBytesUsed, deleteBytesUsed > 0);
assertEquals(0, w.getFlushDeletesCount());
assertTrue(w.flushNextBuffer());
assertEquals(1, w.getFlushDeletesCount());
assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed());
w.deleteAll();
w.commit();
assertEquals(2, w.getFlushDeletesCount());
if (random().nextBoolean()) {
w.deleteDocuments(new Term("id", "1"));
} else {
w.updateDocValues(new Term("id", "1"), new NumericDocValuesField("foo", 1l));
}
deleteBytesUsed = w.docWriter.flushControl.getDeleteBytesUsed();
assertTrue("deletedBytesUsed: " + deleteBytesUsed, deleteBytesUsed > 0);
doc = new Document();
doc.add(newField("id", "5", storedTextType));
w.addDocument(doc);
assertTrue(w.flushNextBuffer());
assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed());
assertEquals(3, w.getFlushDeletesCount());
}
try (RandomIndexWriter w = new RandomIndexWriter(random(), dir, new IndexWriterConfig())) {
int numDocs = random().nextInt(1, 100);
for (int i = 0; i < numDocs; i++) {
Document doc = new Document();
doc.add(newField("id", "" + i, storedTextType));
w.addDocument(doc);
}
for (int i = 0; i < numDocs; i++) {
if (random().nextBoolean()) {
Document doc = new Document();
doc.add(newField("id", "" + i, storedTextType));
w.updateDocument(new Term("id", "" + i), doc);
}
}
long deleteBytesUsed = w.w.docWriter.flushControl.getDeleteBytesUsed();
if (deleteBytesUsed > 0) {
assertTrue(w.w.flushNextBuffer());
assertEquals(0, w.w.docWriter.flushControl.getDeleteBytesUsed());
}
}
}
}
public void testHoldLockOnLargestWriter() throws IOException, InterruptedException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());