LUCENE-8813: Ensure we never apply deletes from a closed DWPTDeleteQueue

Today we don't have a strong protection that we add and apply deletes / updates
on or from an already flushed delete queue. DWPTDeleteQueue instances are replaced
once we do a full flush in order to reopen an NRT reader or commit the IndexWriter.

In LUCENE-8813 we tripped an assert that used to protect us from such an situation
but it didn't take all cornercases from concurrent flushing into account. This change
adds a stronger protection and ensures that we neither apply a closed delete queue nor
add any updates or deletes to it.

This change also allows to speculativly freeze the global buffer that might return
null now if the queue has already been closed. This is now possible since we ensure that
we never see modifications to the queue after it's been closed and that happens right after
the last DWPT for the ongoing full flush is done flushing.
This commit is contained in:
Simon Willnauer 2019-05-28 14:35:08 +02:00
parent db334c792b
commit 165d2d5ff5
4 changed files with 138 additions and 42 deletions

View File

@ -168,27 +168,28 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
private synchronized long applyDeleteOrUpdate(ToLongFunction<DocumentsWriterDeleteQueue> function) throws IOException { private synchronized long applyDeleteOrUpdate(ToLongFunction<DocumentsWriterDeleteQueue> function) throws IOException {
// TODO why is this synchronized? // This method is synchronized to make sure we don't replace the deleteQueue while applying this update / delete
// otherwise we might loose an update / delete if this happens concurrently to a full flush.
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
long seqNo = function.applyAsLong(deleteQueue); long seqNo = function.applyAsLong(deleteQueue);
flushControl.doOnDelete(); flushControl.doOnDelete();
lastSeqNo = Math.max(lastSeqNo, seqNo); lastSeqNo = Math.max(lastSeqNo, seqNo);
if (applyAllDeletes(deleteQueue)) { if (applyAllDeletes()) {
seqNo = -seqNo; seqNo = -seqNo;
} }
return seqNo; return seqNo;
} }
/** If buffered deletes are using too much heap, resolve them and write disk and return true. */ /** If buffered deletes are using too much heap, resolve them and write disk and return true. */
private boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { private boolean applyAllDeletes() throws IOException {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
if (flushControl.isFullFlush() == false // never apply deletes during full flush this breaks happens before relationship if (flushControl.isFullFlush() == false // never apply deletes during full flush this breaks happens before relationship
&& deleteQueue.isOpen() // if it's closed then it's already fully applied and we have a new delete queue
&& flushControl.getAndResetApplyAllDeletes()) { && flushControl.getAndResetApplyAllDeletes()) {
if (deleteQueue != null) { if (ticketQueue.addDeletes(deleteQueue)) {
assert assertTicketQueueModification(deleteQueue); flushNotifications.onDeletesApplied(); // apply deletes event forces a purge
ticketQueue.addDeletes(deleteQueue); return true;
} }
flushNotifications.onDeletesApplied(); // apply deletes event forces a purge
return true;
} }
return false; return false;
} }
@ -408,7 +409,7 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException { private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException {
hasEvents |= applyAllDeletes(deleteQueue); hasEvents |= applyAllDeletes();
if (flushingDWPT != null) { if (flushingDWPT != null) {
hasEvents |= doFlush(flushingDWPT); hasEvents |= doFlush(flushingDWPT);
} else if (config.checkPendingFlushOnUpdate) { } else if (config.checkPendingFlushOnUpdate) {
@ -607,7 +608,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH && if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) { flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
hasEvents = true; hasEvents = true;
if (applyAllDeletes(deleteQueue) == false) { if (applyAllDeletes() == false) {
if (infoStream.isEnabled("DW")) { if (infoStream.isEnabled("DW")) {
infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB", infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
flushControl.getDeleteBytesUsed()/(1024.*1024.), flushControl.getDeleteBytesUsed()/(1024.*1024.),
@ -673,6 +674,8 @@ final class DocumentsWriter implements Closeable, Accountable {
// for asserts // for asserts
private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) { private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
assert currentFullFlushDelQueue == null
|| currentFullFlushDelQueue.isOpen() == false : "Can not replace a full flush queue if the queue is not closed";
currentFullFlushDelQueue = session; currentFullFlushDelQueue = session;
return true; return true;
} }
@ -724,7 +727,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (infoStream.isEnabled("DW")) { if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes"); infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
} }
assertTicketQueueModification(flushingDeleteQueue); assert assertTicketQueueModification(flushingDeleteQueue);
ticketQueue.addDeletes(flushingDeleteQueue); ticketQueue.addDeletes(flushingDeleteQueue);
} }
// we can't assert that we don't have any tickets in teh queue since we might add a DocumentsWriterDeleteQueue // we can't assert that we don't have any tickets in teh queue since we might add a DocumentsWriterDeleteQueue
@ -732,6 +735,7 @@ final class DocumentsWriter implements Closeable, Accountable {
assert !flushingDeleteQueue.anyChanges(); assert !flushingDeleteQueue.anyChanges();
} finally { } finally {
assert flushingDeleteQueue == currentFullFlushDelQueue; assert flushingDeleteQueue == currentFullFlushDelQueue;
flushingDeleteQueue.close(); // all DWPT have been processed and this queue has been fully flushed to the ticket-queue
} }
if (anythingFlushed) { if (anythingFlushed) {
return -seqNo; return -seqNo;
@ -754,7 +758,7 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
} finally { } finally {
pendingChangesInCurrentFullFlush = false; pendingChangesInCurrentFullFlush = false;
applyAllDeletes(deleteQueue); // make sure we do execute this since we block applying deletes during full flush applyAllDeletes(); // make sure we do execute this since we block applying deletes during full flush
} }
} }

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.lucene.index; package org.apache.lucene.index;
import java.io.IOException; import java.io.Closeable;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
@ -68,11 +69,13 @@ import org.apache.lucene.util.InfoStream;
* will also not be added to its private deletes neither to the global deletes. * will also not be added to its private deletes neither to the global deletes.
* *
*/ */
final class DocumentsWriterDeleteQueue implements Accountable { final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
// the current end (latest delete operation) in the delete queue: // the current end (latest delete operation) in the delete queue:
private volatile Node<?> tail; private volatile Node<?> tail;
private volatile boolean closed = false;
/** Used to record deletes against all prior (already written to disk) segments. Whenever any segment flushes, we bundle up this set of /** Used to record deletes against all prior (already written to disk) segments. Whenever any segment flushes, we bundle up this set of
* deletes and insert into the buffered updates stream before the newly flushed segment(s). */ * deletes and insert into the buffered updates stream before the newly flushed segment(s). */
private final DeleteSlice globalSlice; private final DeleteSlice globalSlice;
@ -163,6 +166,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
} }
synchronized long add(Node<?> newNode) { synchronized long add(Node<?> newNode) {
ensureOpen();
tail.next = newNode; tail.next = newNode;
this.tail = newNode; this.tail = newNode;
return getNextSequenceNumber(); return getNextSequenceNumber();
@ -183,6 +187,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
} }
void tryApplyGlobalSlice() { void tryApplyGlobalSlice() {
ensureOpen();
if (globalBufferLock.tryLock()) { if (globalBufferLock.tryLock()) {
/* /*
* The global buffer must be locked but we don't need to update them if * The global buffer must be locked but we don't need to update them if
@ -200,31 +205,45 @@ final class DocumentsWriterDeleteQueue implements Accountable {
} }
} }
FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) throws IOException {
globalBufferLock.lock();
/*
* Here we freeze the global buffer so we need to lock it, apply all
* deletes in the queue and reset the global slice to let the GC prune the
* queue.
*/
final Node<?> currentTail = tail; // take the current tail make this local any
// Changes after this call are applied later
// and not relevant here
if (callerSlice != null) {
// Update the callers slices so we are on the same page
callerSlice.sliceTail = currentTail;
}
try {
if (globalSlice.sliceTail != currentTail) {
globalSlice.sliceTail = currentTail;
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
if (globalBufferedUpdates.any()) { FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) {
final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(infoStream, globalBufferedUpdates, null); globalBufferLock.lock();
globalBufferedUpdates.clear(); try {
return packet; ensureOpen();
/*
* Here we freeze the global buffer so we need to lock it, apply all
* deletes in the queue and reset the global slice to let the GC prune the
* queue.
*/
final Node<?> currentTail = tail; // take the current tail make this local any
// Changes after this call are applied later
// and not relevant here
if (callerSlice != null) {
// Update the callers slices so we are on the same page
callerSlice.sliceTail = currentTail;
}
return freezeGlobalBufferInternal(currentTail);
} finally {
globalBufferLock.unlock();
}
}
/**
* This may freeze the global buffer unless the delete queue has already been closed.
* If the queue has been closed this method will return <code>null</code>
*/
FrozenBufferedUpdates maybeFreezeGlobalBuffer() {
globalBufferLock.lock();
try {
if (closed == false) {
/*
* Here we freeze the global buffer so we need to lock it, apply all
* deletes in the queue and reset the global slice to let the GC prune the
* queue.
*/
return freezeGlobalBufferInternal(tail); // take the current tail make this local any
} else { } else {
assert anyChanges() == false : "we are closed but have changes";
return null; return null;
} }
} finally { } finally {
@ -232,12 +251,28 @@ final class DocumentsWriterDeleteQueue implements Accountable {
} }
} }
private FrozenBufferedUpdates freezeGlobalBufferInternal(final Node<?> currentTail ) {
if (globalSlice.sliceTail != currentTail) {
globalSlice.sliceTail = currentTail;
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
if (globalBufferedUpdates.any()) {
final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(infoStream, globalBufferedUpdates, null);
globalBufferedUpdates.clear();
return packet;
} else {
return null;
}
}
DeleteSlice newSlice() { DeleteSlice newSlice() {
return new DeleteSlice(tail); return new DeleteSlice(tail);
} }
/** Negative result means there were new deletes since we last applied */ /** Negative result means there were new deletes since we last applied */
synchronized long updateSlice(DeleteSlice slice) { synchronized long updateSlice(DeleteSlice slice) {
ensureOpen();
long seqNo = getNextSequenceNumber(); long seqNo = getNextSequenceNumber();
if (slice.sliceTail != tail) { if (slice.sliceTail != tail) {
// new deletes arrived since we last checked // new deletes arrived since we last checked
@ -257,6 +292,29 @@ final class DocumentsWriterDeleteQueue implements Accountable {
return false; return false;
} }
private void ensureOpen() {
if (closed) {
throw new AlreadyClosedException("This " + DocumentsWriterDeleteQueue.class.getSimpleName() + " is already closed");
}
}
public boolean isOpen() {
return closed == false;
}
@Override
public synchronized void close() {
globalBufferLock.lock();
try {
if (anyChanges()) {
throw new IllegalStateException("Can't close queue unless all changes are applied");
}
this.closed = true;
} finally {
globalBufferLock.unlock();
}
}
static class DeleteSlice { static class DeleteSlice {
// No need to be volatile, slices are thread captive (only accessed by one thread)! // No need to be volatile, slices are thread captive (only accessed by one thread)!
Node<?> sliceHead; // we don't apply this one Node<?> sliceHead; // we don't apply this one

View File

@ -35,18 +35,22 @@ final class DocumentsWriterFlushQueue {
private final AtomicInteger ticketCount = new AtomicInteger(); private final AtomicInteger ticketCount = new AtomicInteger();
private final ReentrantLock purgeLock = new ReentrantLock(); private final ReentrantLock purgeLock = new ReentrantLock();
synchronized void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { synchronized boolean addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
incTickets();// first inc the ticket count - freeze opens incTickets();// first inc the ticket count - freeze opens
// a window for #anyChanges to fail // a window for #anyChanges to fail
boolean success = false; boolean success = false;
try { try {
queue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false)); FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer();
success = true; if (frozenBufferedUpdates != null) { // no need to publish anything if we don't have any frozen updates
queue.add(new FlushTicket(frozenBufferedUpdates, false));
success = true;
}
} finally { } finally {
if (!success) { if (!success) {
decTickets(); decTickets();
} }
} }
return success;
} }
private void incTickets() { private void incTickets() {

View File

@ -24,7 +24,9 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice; import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
import org.apache.lucene.index.PrefixCodedTerms.TermIterator; import org.apache.lucene.index.PrefixCodedTerms.TermIterator;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
@ -114,7 +116,7 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
} }
public void testAnyChanges() throws Exception { public void testAnyChanges() {
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(null); DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(null);
final int size = 200 + random().nextInt(500) * RANDOM_MULTIPLIER; final int size = 200 + random().nextInt(500) * RANDOM_MULTIPLIER;
int termsSinceFreeze = 0; int termsSinceFreeze = 0;
@ -205,7 +207,35 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
.numGlobalTermDeletes()); .numGlobalTermDeletes());
assertEquals(uniqueValues.size(), frozenSet.size()); assertEquals(uniqueValues.size(), frozenSet.size());
assertEquals(uniqueValues, frozenSet); assertEquals(uniqueValues, frozenSet);
}
public void testClose() {
{
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(null);
assertTrue(queue.isOpen());
queue.close();
if (random().nextBoolean()) {
queue.close(); // double close
}
expectThrows(AlreadyClosedException.class, () -> queue.addDelete(new Term("foo", "bar")));
expectThrows(AlreadyClosedException.class, () -> queue.freezeGlobalBuffer(null));
expectThrows(AlreadyClosedException.class, () -> queue.addDelete(new MatchNoDocsQuery()));
expectThrows(AlreadyClosedException.class,
() -> queue.addDocValuesUpdates(new DocValuesUpdate.NumericDocValuesUpdate(new Term("foo", "bar"), "foo", 1)));
expectThrows(AlreadyClosedException.class, () -> queue.add(null));
assertNull(queue.maybeFreezeGlobalBuffer()); // this is fine
assertFalse(queue.isOpen());
}
{
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(null);
queue.addDelete(new Term("foo", "bar"));
expectThrows(IllegalStateException.class, () -> queue.close());
assertTrue(queue.isOpen());
queue.tryApplyGlobalSlice();
queue.freezeGlobalBuffer(null);
queue.close();
assertFalse(queue.isOpen());
}
} }
private static class UpdateThread extends Thread { private static class UpdateThread extends Thread {