LUCENE-9304: Fix IW#getMaxCompletedSequenceNumber() (#1427)

After the recent refactoring in LUCENE-9304, `IW#getMaxCompletedSequenceNumber()` might
return values that belong to non-completed operations if a full flush is running, a new delete
queue is already in place, but not all DWPTs participating in the full flush have finished their
in-flight operations. This caused rare failures in
`TestControlledRealTimeReopenThread#testControlledRealTimeReopenThread` where
documents are not actually visible given the max completed seqNo. This change streamlines
the delete queue advance, adds a dedicated test case, and ensures that a delete queue's
sequence ID space is never exhausted.
Simon Willnauer 2020-04-14 19:39:23 +02:00 committed by GitHub
parent ceeb55c7f5
commit 18af6325ed
7 changed files with 249 additions and 43 deletions
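
The heart of the fix is in DocumentsWriterDeleteQueue below: a queue that was just swapped in by a full flush no longer reports sequence numbers it has merely reserved for in-flight DWPTs; instead it falls back to its predecessor's maximum until it hands out a number of its own. A minimal sketch of that fallback (class and field names are illustrative only, not the actual Lucene class):

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Simplified stand-in for the new DocumentsWriterDeleteQueue bookkeeping.
final class SeqNoFallbackSketch {
  private final AtomicLong nextSeqNo;          // next seqNo this queue will hand out
  private final long startSeqNo;               // first seqNo owned by this queue
  private final LongSupplier previousMaxSeqNo; // max completed seqNo of the queue we replaced

  SeqNoFallbackSketch(long startSeqNo, LongSupplier previousMaxSeqNo) {
    this.nextSeqNo = new AtomicLong(startSeqNo);
    this.startSeqNo = startSeqNo;
    this.previousMaxSeqNo = previousMaxSeqNo;
  }

  long getMaxCompletedSeqNo() {
    if (startSeqNo < nextSeqNo.get()) {
      // this queue has issued at least one seqNo of its own
      return nextSeqNo.get() - 1;
    }
    // freshly swapped in by a full flush: don't report numbers that were only
    // reserved for in-flight DWPTs, fall back to the predecessor instead
    return previousMaxSeqNo.getAsLong();
  }
}

The LongSupplier indirection mirrors the patch: the successor only keeps a supplier of the predecessor's last issued number rather than the predecessor itself, so old queues can still be garbage collected.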

@ -331,7 +331,7 @@ final class DocumentsWriter implements Closeable, Accountable {
/** returns the maximum sequence number for all previously completed operations */
long getMaxCompletedSequenceNumber() {
return deleteQueue.getLastSequenceNumber();
return deleteQueue.getMaxCompletedSeqNo();
}
@ -550,6 +550,20 @@ final class DocumentsWriter implements Closeable, Accountable {
return hasEvents;
}
synchronized long getNextSequenceNumber() {
// this must be synced otherwise the delete queue might change concurrently
return deleteQueue.getNextSequenceNumber();
}
synchronized void resetDeleteQueue(DocumentsWriterDeleteQueue newQueue) {
assert deleteQueue.isAdvanced();
assert newQueue.isAdvanced() == false;
assert deleteQueue.getLastSequenceNumber() <= newQueue.getLastSequenceNumber();
assert deleteQueue.getMaxSeqNo() <= newQueue.getLastSequenceNumber()
: "maxSeqNo: " + deleteQueue.getMaxSeqNo() + " vs. " + newQueue.getLastSequenceNumber();
deleteQueue = newQueue;
}
interface FlushNotifications { // TODO maybe we find a better name for this?
/**
@ -630,7 +644,6 @@ final class DocumentsWriter implements Closeable, Accountable {
}
long seqNo;
synchronized (this) {
pendingChangesInCurrentFullFlush = anyChanges();
flushingDeleteQueue = deleteQueue;
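
The two new synchronized methods above, getNextSequenceNumber and resetDeleteQueue, make drawing a sequence number and swapping the queue mutually exclusive. Roughly the kind of interleaving this appears to rule out (a hedged illustration, not code from the patch):

// thread A (indexing)                 thread B (full flush)
// q = docWriter.deleteQueue
//                                     newQueue = q.advanceQueue(n)        // q's maxSeqNo is now fixed
//                                     docWriter.resetDeleteQueue(newQueue)
// q.getNextSequenceNumber()           // may hand out a number that was only
//                                     // reserved for in-flight DWPTs, tripping
//                                     // the seqNo <= maxSeqNo assertion
//
// Routing thread A through the synchronized DocumentsWriter#getNextSequenceNumber()
// makes it draw from whichever queue is current after the swap instead.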

@ -20,6 +20,7 @@ import java.io.Closeable;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
@ -91,23 +92,26 @@ final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
private final InfoStream infoStream;
// for asserts
long maxSeqNo = Long.MAX_VALUE;
private volatile long maxSeqNo = Long.MAX_VALUE;
private final long startSeqNo;
private final LongSupplier previousMaxSeqId;
private boolean advanced;
DocumentsWriterDeleteQueue(InfoStream infoStream) {
// seqNo must start at 1 because some APIs negate this to also return a boolean
this(infoStream, 0, 1);
}
DocumentsWriterDeleteQueue(InfoStream infoStream, long generation, long startSeqNo) {
this(infoStream, new BufferedUpdates("global"), generation, startSeqNo);
this(infoStream, 0, 1, () -> 0);
}
DocumentsWriterDeleteQueue(InfoStream infoStream, BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
private DocumentsWriterDeleteQueue(InfoStream infoStream, long generation, long startSeqNo, LongSupplier previousMaxSeqId) {
this.infoStream = infoStream;
this.globalBufferedUpdates = globalBufferedUpdates;
this.globalBufferedUpdates = new BufferedUpdates("global");
this.generation = generation;
this.nextSeqNo = new AtomicLong(startSeqNo);
this.startSeqNo = startSeqNo;
this.previousMaxSeqId = previousMaxSeqId;
long value = previousMaxSeqId.getAsLong();
assert value <= startSeqNo : "illegal max sequence ID: " + value + " start was: " + startSeqNo;
/*
* we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted.
@ -311,6 +315,9 @@ final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
throw new IllegalStateException("Can't close queue unless all changes are applied");
}
this.closed = true;
long seqNo = nextSeqNo.get();
assert seqNo <= maxSeqNo : "maxSeqNo must be greater or equal to " + seqNo + " but was " + maxSeqNo;
nextSeqNo.set(maxSeqNo+1);
} finally {
globalBufferLock.unlock();
}
@ -357,7 +364,7 @@ final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
}
/**
* Returns <code>true</code> iff the given node is identical to the the slices tail,
* Returns <code>true</code> iff the given node is identical to the slices tail,
* otherwise <code>false</code>.
*/
boolean isTail(Node<?> node) {
@ -538,17 +545,69 @@ final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
public long getNextSequenceNumber() {
long seqNo = nextSeqNo.getAndIncrement();
assert seqNo < maxSeqNo: "seqNo=" + seqNo + " vs maxSeqNo=" + maxSeqNo;
assert seqNo <= maxSeqNo: "seqNo=" + seqNo + " vs maxSeqNo=" + maxSeqNo;
return seqNo;
}
public long getLastSequenceNumber() {
long getLastSequenceNumber() {
return nextSeqNo.get()-1;
}
/** Inserts a gap in the sequence numbers. This is used by IW during flush or commit to ensure any in-flight threads get sequence numbers
* inside the gap */
public void skipSequenceNumbers(long jump) {
void skipSequenceNumbers(long jump) {
nextSeqNo.addAndGet(jump);
}
}
/**
* Returns the maximum completed seq no for this queue.
*/
long getMaxCompletedSeqNo() {
if (startSeqNo < nextSeqNo.get()) {
return getLastSequenceNumber();
} else {
// if we haven't advanced the seqNo make sure we fall back to the previous queue
long value = previousMaxSeqId.getAsLong();
assert value < startSeqNo : "illegal max sequence ID: " + value + " start was: " + startSeqNo;
return value;
}
}
/**
* Advances the queue to the next queue on flush. This carries over the generation to the next queue and
* sets the {@link #getMaxSeqNo()} based on the given maxNumPendingOps. This method can only be called once; subsequently
* the returned queue should be used.
* @param maxNumPendingOps the max number of possible concurrent operations that will execute on this queue after
* it was advanced. This corresponds to the number of DWPTs that own the current queue at the
* moment when this queue is advanced, since each of these DWPTs can increment the seqId after we
* advanced it.
* @return a new queue as a successor of this queue.
*/
synchronized DocumentsWriterDeleteQueue advanceQueue(int maxNumPendingOps) {
if (advanced) {
throw new IllegalStateException("queue was already advanced");
}
advanced = true;
long seqNo = getLastSequenceNumber() + maxNumPendingOps + 1;
maxSeqNo = seqNo;
return new DocumentsWriterDeleteQueue(infoStream, generation + 1, seqNo + 1,
// don't pass ::getMaxCompletedSeqNo here b/c otherwise we keep a reference to this queue
// and that would be a memory leak since the queues can't be GCed
() -> nextSeqNo.get() - 1);
}
/**
* Returns the maximum sequence number for this queue. This value will change once this queue is advanced.
*/
long getMaxSeqNo() {
return maxSeqNo;
}
/**
* Returns <code>true</code> if it was advanced.
*/
boolean isAdvanced() {
return advanced;
}
}
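
To illustrate the hand-off that advanceQueue and DocumentsWriter#resetDeleteQueue perform together, here is a hypothetical snippet; it assumes package-private access from org.apache.lucene.index (e.g. a test) and made-up numbers, and is not code from the patch:

// assume the current queue has handed out sequence numbers up to 41 and two
// DWPTs may still complete an in-flight operation against it
DocumentsWriterDeleteQueue flushing = documentsWriter.deleteQueue;
DocumentsWriterDeleteQueue next = flushing.advanceQueue(2);
assert flushing.getMaxSeqNo() == 44;       // 41 + 2 pending ops + 1 for the flush itself
assert next.getMaxCompletedSeqNo() == 41;  // falls back to the predecessor, assuming the pending ops haven't drawn numbers yet
documentsWriter.resetDeleteQueue(next);    // asserts isAdvanced() and that no seqNo range is lost

The successor starts at 45, so 42 and 43 stay reserved for the two operations that may still be in flight, and 44 is the full flush's own sequence number.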

@ -476,10 +476,9 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
try {
// Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.size() + 2;
flushingQueue.maxSeqNo = seqNo + 1;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation + 1, seqNo + 1);
documentsWriter.deleteQueue = newQueue;
DocumentsWriterDeleteQueue newQueue = documentsWriter.deleteQueue.advanceQueue(perThreadPool.size());
seqNo = documentsWriter.deleteQueue.getMaxSeqNo();
documentsWriter.resetDeleteQueue(newQueue);
} finally {
perThreadPool.unlockNewWriters();
}
@ -530,6 +529,7 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
fullFlushMarkDone = true; // at this point we must have collected all DWPTs that belong to the old delete queue
}
assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
assert flushingQueue.getLastSequenceNumber() <= flushingQueue.getMaxSeqNo();
return seqNo;
}
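
For intuition, a worked trace of the gap with a single DWPT; the numbers match the new TestIndexWriter#testMaxCompletedSequenceNumber further down:

// last issued seqNo is 4 and perThreadPool.size() == 1
// advanceQueue(1): maxSeqNo of the flushing queue = 4 + 1 + 1 = 6
// the commit reports seqNo 6, i.e. the flushing queue's getMaxSeqNo()
// the successor queue starts at 7, so 5 stays reserved for the one DWPT that
// may still be in flight; if it goes unused the number is simply skipped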

@ -1544,7 +1544,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (rld != null) {
synchronized(bufferedUpdatesStream) {
toApply.run(docID, rld);
return docWriter.deleteQueue.getNextSequenceNumber();
return docWriter.getNextSequenceNumber();
}
}
}
@ -2468,7 +2468,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
segmentInfos.changed();
globalFieldNumberMap.clear();
success = true;
long seqNo = docWriter.deleteQueue.getNextSequenceNumber();
long seqNo = docWriter.getNextSequenceNumber();
return seqNo;
} finally {
if (success == false) {
@ -2858,7 +2858,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Now reserve the docs, just before we update SIS:
reserveDocs(totalMaxDoc);
seqNo = docWriter.deleteQueue.getNextSequenceNumber();
seqNo = docWriter.getNextSequenceNumber();
success = true;
} finally {
@ -2987,7 +2987,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
context);
if (!merger.shouldMerge()) {
return docWriter.deleteQueue.getNextSequenceNumber();
return docWriter.getNextSequenceNumber();
}
synchronized (this) {
@ -3017,7 +3017,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
return docWriter.deleteQueue.getNextSequenceNumber();
return docWriter.getNextSequenceNumber();
}
ensureOpen();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
@ -3053,7 +3053,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
return docWriter.deleteQueue.getNextSequenceNumber();
return docWriter.getNextSequenceNumber();
}
ensureOpen();
@ -3061,7 +3061,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
reserveDocs(numDocs);
segmentInfos.add(infoPerCommit);
seqNo = docWriter.deleteQueue.getNextSequenceNumber();
seqNo = docWriter.getNextSequenceNumber();
checkpoint();
}
} catch (VirtualMachineError tragedy) {
@ -3599,12 +3599,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
infoStream.message("IW", " start flush: applyAllDeletes=" + applyAllDeletes);
infoStream.message("IW", " index before flush " + segString());
}
boolean anyChanges = false;
boolean anyChanges;
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
long seqNo = docWriter.flushAllThreads();
long seqNo = docWriter.flushAllThreads() ;
if (seqNo < 0) {
seqNo = -seqNo;
anyChanges = true;

@ -78,19 +78,21 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
private class HandleRefresh implements ReferenceManager.RefreshListener {
@Override
public void beforeRefresh() {
// Save the gen as of when we started the reopen; the
// listener (HandleRefresh above) copies this to
// searchingGen once the reopen completes:
refreshStartGen = writer.getMaxCompletedSequenceNumber();
}
@Override
public void afterRefresh(boolean didRefresh) {
refreshDone();
synchronized (ControlledRealTimeReopenThread.this) {
searchingGen = refreshStartGen;
ControlledRealTimeReopenThread.this.notifyAll();
}
}
}
private synchronized void refreshDone() {
searchingGen = refreshStartGen;
notifyAll();
}
@Override
public synchronized void close() {
//System.out.println("NRT: set finish");
@ -228,10 +230,6 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
}
lastReopenStartNS = System.nanoTime();
// Save the gen as of when we started the reopen; the
// listener (HandleRefresh above) copies this to
// searchingGen once the reopen completes:
refreshStartGen = writer.getMaxCompletedSequenceNumber();
try {
manager.maybeRefreshBlocking();
} catch (IOException ioe) {
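
The net effect of the ControlledRealTimeReopenThread change is that the generation is captured inside beforeRefresh() rather than in the reopen loop, so no operation can complete between taking the snapshot and starting the refresh. A minimal standalone listener in the same spirit (a sketch only; the class and method names below are made up and it is not the class from the patch):

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.ReferenceManager;

// Sketch: capture the writer's max completed sequence number before a refresh
// starts and publish it as "searchable" only once the refresh has finished.
final class MaxSeqNoRefreshListener implements ReferenceManager.RefreshListener {
  private final IndexWriter writer;
  private volatile long refreshStartGen; // snapshot taken before the refresh
  private volatile long searchingGen;    // highest seqNo known to be searchable

  MaxSeqNoRefreshListener(IndexWriter writer) {
    this.writer = writer;
  }

  @Override
  public void beforeRefresh() {
    // taken before the refresh, so every op up to this number is covered by it
    refreshStartGen = writer.getMaxCompletedSequenceNumber();
  }

  @Override
  public void afterRefresh(boolean didRefresh) {
    searchingGen = refreshStartGen; // publish once the new searcher is visible
  }

  long getSearchingGen() {
    return searchingGen;
  }
}

Registered with a SearcherManager via addListener, callers could wait until getSearchingGen() catches up to the seqNo returned by an indexing call, which is the pattern the reopen thread implements.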

@ -44,6 +44,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -3949,4 +3951,138 @@ public class TestIndexWriter extends LuceneTestCase {
}
}
}
public void testMaxCompletedSequenceNumber() throws IOException, InterruptedException {
try (Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig());) {
assertEquals(1, writer.addDocument(new Document()));
assertEquals(2, writer.updateDocument(new Term("foo", "bar"), new Document()));
writer.flushNextBuffer();
assertEquals(3, writer.commit());
assertEquals(4, writer.addDocument(new Document()));
assertEquals(4, writer.getMaxCompletedSequenceNumber());
// commit moves seqNo by 2 since there is one DWPT that could still be in-flight
assertEquals(6, writer.commit());
assertEquals(6, writer.getMaxCompletedSequenceNumber());
assertEquals(7, writer.addDocument(new Document()));
writer.getReader().close();
// getReader moves seqNo by 2 since there is one DWPT that could still be in-flight
assertEquals(9, writer.getMaxCompletedSequenceNumber());
}
try (Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
SearcherManager manager = new SearcherManager(writer, new SearcherFactory())) {
CountDownLatch start = new CountDownLatch(1);
int numDocs = 100 + random().nextInt(500);
AtomicLong maxCompletedSeqID = new AtomicLong(-1);
Thread[] threads = new Thread[2 + random().nextInt(2)];
for (int i = 0; i < threads.length; i++) {
int idx = i;
threads[i] = new Thread(() -> {
try {
start.await();
for (int j = 0; j < numDocs; j++) {
Document doc = new Document();
String id = idx +"-"+j;
doc.add(new StringField("id", id, Field.Store.NO));
long seqNo = writer.addDocument(doc);
if (maxCompletedSeqID.get() < seqNo) {
long maxCompletedSequenceNumber = writer.getMaxCompletedSequenceNumber();
manager.maybeRefreshBlocking();
maxCompletedSeqID.updateAndGet(oldVal-> Math.max(oldVal, maxCompletedSequenceNumber));
}
IndexSearcher acquire = manager.acquire();
try {
assertEquals(1, acquire.search(new TermQuery(new Term("id", id)), 10).totalHits.value);
} finally {
manager.release(acquire);
}
}
} catch (Exception e) {
throw new AssertionError(e);
}
});
threads[i].start();
}
start.countDown();
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
}
}
public void testEnsureMaxSeqNoIsAccurateDuringFlush() throws IOException, InterruptedException {
AtomicReference<CountDownLatch> waitRef = new AtomicReference<>(new CountDownLatch(0));
AtomicReference<CountDownLatch> arrivedRef = new AtomicReference<>(new CountDownLatch(0));
InfoStream stream = new InfoStream() {
@Override
public void message(String component, String message) {
if ("TP".equals(component) && "DocumentsWriterPerThread addDocuments start".equals(message)) {
try {
arrivedRef.get().countDown();
waitRef.get().await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
@Override
public boolean isEnabled(String component) {
return "TP".equals(component);
}
@Override
public void close() throws IOException {
}
};
IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
indexWriterConfig.setInfoStream(stream);
try (Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, indexWriterConfig) {
@Override
protected boolean isEnableTestPoints() {
return true;
}
}) {
// we produce one DWPT with 1 doc
writer.addDocument(new Document());
assertEquals(1, writer.docWriter.perThreadPool.size());
long maxCompletedSequenceNumber = writer.getMaxCompletedSequenceNumber();
// save the seqNo and use the latches to block this DWPT such that a refresh must wait for it
waitRef.set(new CountDownLatch(1));
arrivedRef.set(new CountDownLatch(1));
Thread waiterThread = new Thread(() -> {
try {
writer.addDocument(new Document());
} catch (IOException e) {
throw new AssertionError(e);
}
});
waiterThread.start();
arrivedRef.get().await();
Thread refreshThread = new Thread(() -> {
try {
writer.getReader().close();
} catch (IOException e) {
throw new AssertionError(e);
}
});
DocumentsWriterDeleteQueue deleteQueue = writer.docWriter.deleteQueue;
refreshThread.start();
// now we wait until the refresh has swapped the delete queue and assert that
// we see an accurate seqId
while (writer.docWriter.deleteQueue == deleteQueue) {
Thread.yield(); // busy wait for refresh to swap the queue
}
try {
assertEquals(maxCompletedSequenceNumber, writer.getMaxCompletedSequenceNumber());
} finally {
waitRef.get().countDown();
waiterThread.join();
refreshThread.join();
}
assertEquals(maxCompletedSequenceNumber+2, writer.getMaxCompletedSequenceNumber());
}
}
}

@ -107,7 +107,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
}
try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits.value);
assertEquals("generation: " + gen, docs.size(), s.search(new TermQuery(id), 10).totalHits.value);
} finally {
nrtDeletes.release(s);
}
@ -131,7 +131,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
}
try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits.value);
assertEquals("generation: " + gen, docs.size(), s.search(new TermQuery(id), 10).totalHits.value);
} finally {
nrtNoDeletes.release(s);
}
@ -155,7 +155,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
}
try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits.value);
assertEquals("generation: " + gen, 1, s.search(new TermQuery(id), 10).totalHits.value);
} finally {
nrtNoDeletes.release(s);
}
@ -178,7 +178,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
}
try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits.value);
assertEquals("generation: " + gen, 1, s.search(new TermQuery(id), 10).totalHits.value);
} finally {
nrtDeletes.release(s);
}