diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 965c10d87d0..276f19500d4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -331,7 +331,7 @@ final class DocumentsWriter implements Closeable, Accountable {
/** returns the maximum sequence number for all previously completed operations */
long getMaxCompletedSequenceNumber() {
- return deleteQueue.getLastSequenceNumber();
+ return deleteQueue.getMaxCompletedSeqNo();
}
@@ -550,6 +550,20 @@ final class DocumentsWriter implements Closeable, Accountable {
return hasEvents;
}
+ synchronized long getNextSequenceNumber() {
+ // this must be synced otherwise the delete queue might change concurrently
+ return deleteQueue.getNextSequenceNumber();
+ }
+
+ synchronized void resetDeleteQueue(DocumentsWriterDeleteQueue newQueue) {
+ assert deleteQueue.isAdvanced();
+ assert newQueue.isAdvanced() == false;
+ assert deleteQueue.getLastSequenceNumber() <= newQueue.getLastSequenceNumber();
+ assert deleteQueue.getMaxSeqNo() <= newQueue.getLastSequenceNumber()
+ : "maxSeqNo: " + deleteQueue.getMaxSeqNo() + " vs. " + newQueue.getLastSequenceNumber();
+ deleteQueue = newQueue;
+ }
+
interface FlushNotifications { // TODO maybe we find a better name for this?
/**
@@ -630,7 +644,6 @@ final class DocumentsWriter implements Closeable, Accountable {
}
long seqNo;
-
synchronized (this) {
pendingChangesInCurrentFullFlush = anyChanges();
flushingDeleteQueue = deleteQueue;
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index 828064797ff..92e7be7d359 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.LongSupplier;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
@@ -91,23 +92,26 @@ final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
private final InfoStream infoStream;
- // for asserts
- long maxSeqNo = Long.MAX_VALUE;
+ private volatile long maxSeqNo = Long.MAX_VALUE;
+
+ private final long startSeqNo;
+ private final LongSupplier previousMaxSeqId;
+ private boolean advanced;
DocumentsWriterDeleteQueue(InfoStream infoStream) {
// seqNo must start at 1 because some APIs negate this to also return a boolean
- this(infoStream, 0, 1);
- }
-
- DocumentsWriterDeleteQueue(InfoStream infoStream, long generation, long startSeqNo) {
- this(infoStream, new BufferedUpdates("global"), generation, startSeqNo);
+ this(infoStream, 0, 1, () -> 0);
}
- DocumentsWriterDeleteQueue(InfoStream infoStream, BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
+ private DocumentsWriterDeleteQueue(InfoStream infoStream, long generation, long startSeqNo, LongSupplier previousMaxSeqId) {
this.infoStream = infoStream;
- this.globalBufferedUpdates = globalBufferedUpdates;
+ this.globalBufferedUpdates = new BufferedUpdates("global");
this.generation = generation;
this.nextSeqNo = new AtomicLong(startSeqNo);
+ this.startSeqNo = startSeqNo;
+ this.previousMaxSeqId = previousMaxSeqId;
+ long value = previousMaxSeqId.getAsLong();
+ assert value <= startSeqNo : "illegal max sequence ID: " + value + " start was: " + startSeqNo;
/*
* we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted.
@@ -311,6 +315,9 @@ final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
throw new IllegalStateException("Can't close queue unless all changes are applied");
}
this.closed = true;
+ long seqNo = nextSeqNo.get();
+ assert seqNo <= maxSeqNo : "maxSeqNo must be greater or equal to " + seqNo + " but was " + maxSeqNo;
+ nextSeqNo.set(maxSeqNo+1);
} finally {
globalBufferLock.unlock();
}
@@ -357,7 +364,7 @@ final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
}
/**
- * Returns true
iff the given node is identical to the the slices tail,
+ * Returns true
iff the given node is identical to the slices tail,
* otherwise false
.
*/
boolean isTail(Node> node) {
@@ -538,17 +545,69 @@ final class DocumentsWriterDeleteQueue implements Accountable, Closeable {
public long getNextSequenceNumber() {
long seqNo = nextSeqNo.getAndIncrement();
- assert seqNo < maxSeqNo: "seqNo=" + seqNo + " vs maxSeqNo=" + maxSeqNo;
+ assert seqNo <= maxSeqNo: "seqNo=" + seqNo + " vs maxSeqNo=" + maxSeqNo;
return seqNo;
}
- public long getLastSequenceNumber() {
+ long getLastSequenceNumber() {
return nextSeqNo.get()-1;
}
/** Inserts a gap in the sequence numbers. This is used by IW during flush or commit to ensure any in-flight threads get sequence numbers
* inside the gap */
- public void skipSequenceNumbers(long jump) {
+ void skipSequenceNumbers(long jump) {
nextSeqNo.addAndGet(jump);
- }
+ }
+
+ /**
+ * Returns the maximum completed seq no for this queue.
+ */
+ long getMaxCompletedSeqNo() {
+ if (startSeqNo < nextSeqNo.get()) {
+ return getLastSequenceNumber();
+ } else {
+ // if we haven't advanced the seqNo make sure we fall back to the previous queue
+ long value = previousMaxSeqId.getAsLong();
+ assert value < startSeqNo : "illegal max sequence ID: " + value + " start was: " + startSeqNo;
+ return value;
+ }
+ }
+
+ /**
+ * Advances the queue to the next queue on flush. This carries over the the generation to the next queue and
+ * set the {@link #getMaxSeqNo()} based on the given maxNumPendingOps. This method can only be called once, subsequently
+ * the returned queue should be used.
+ * @param maxNumPendingOps the max number of possible concurrent operations that will execute on this queue after
+ * it was advanced. This corresponds the the number of DWPTs that own the current queue at the
+ * moment when this queue is advanced since each these DWPTs can increment the seqId after we
+ * advanced it.
+ * @return a new queue as a successor of this queue.
+ */
+ synchronized DocumentsWriterDeleteQueue advanceQueue(int maxNumPendingOps) {
+ if (advanced) {
+ throw new IllegalStateException("queue was already advanced");
+ }
+ advanced = true;
+ long seqNo = getLastSequenceNumber() + maxNumPendingOps + 1;
+ maxSeqNo = seqNo;
+ return new DocumentsWriterDeleteQueue(infoStream, generation + 1, seqNo + 1,
+ // don't pass ::getMaxCompletedSeqNo here b/c otherwise we keep an reference to this queue
+ // and this will be a memory leak since the queues can't be GCed
+ () -> nextSeqNo.get() - 1);
+
+ }
+
+ /**
+ * Returns the maximum sequence number for this queue. This value will change once this queue is advanced.
+ */
+ long getMaxSeqNo() {
+ return maxSeqNo;
+ }
+
+ /**
+ * Returns true
if it was advanced.
+ */
+ boolean isAdvanced() {
+ return advanced;
+ }
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 2c1a9358187..f4a86846567 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -476,10 +476,9 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
try {
// Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
- seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.size() + 2;
- flushingQueue.maxSeqNo = seqNo + 1;
- DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation + 1, seqNo + 1);
- documentsWriter.deleteQueue = newQueue;
+ DocumentsWriterDeleteQueue newQueue = documentsWriter.deleteQueue.advanceQueue(perThreadPool.size());
+ seqNo = documentsWriter.deleteQueue.getMaxSeqNo();
+ documentsWriter.resetDeleteQueue(newQueue);
} finally {
perThreadPool.unlockNewWriters();
}
@@ -530,6 +529,7 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
fullFlushMarkDone = true; // at this point we must have collected all DWPTs that belong to the old delete queue
}
assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
+ assert flushingQueue.getLastSequenceNumber() <= flushingQueue.getMaxSeqNo();
return seqNo;
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index df688376358..99ee5cba615 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1544,7 +1544,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (rld != null) {
synchronized(bufferedUpdatesStream) {
toApply.run(docID, rld);
- return docWriter.deleteQueue.getNextSequenceNumber();
+ return docWriter.getNextSequenceNumber();
}
}
}
@@ -2468,7 +2468,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
segmentInfos.changed();
globalFieldNumberMap.clear();
success = true;
- long seqNo = docWriter.deleteQueue.getNextSequenceNumber();
+ long seqNo = docWriter.getNextSequenceNumber();
return seqNo;
} finally {
if (success == false) {
@@ -2858,7 +2858,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Now reserve the docs, just before we update SIS:
reserveDocs(totalMaxDoc);
- seqNo = docWriter.deleteQueue.getNextSequenceNumber();
+ seqNo = docWriter.getNextSequenceNumber();
success = true;
} finally {
@@ -2987,7 +2987,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
context);
if (!merger.shouldMerge()) {
- return docWriter.deleteQueue.getNextSequenceNumber();
+ return docWriter.getNextSequenceNumber();
}
synchronized (this) {
@@ -3017,7 +3017,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
- return docWriter.deleteQueue.getNextSequenceNumber();
+ return docWriter.getNextSequenceNumber();
}
ensureOpen();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
@@ -3053,7 +3053,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
- return docWriter.deleteQueue.getNextSequenceNumber();
+ return docWriter.getNextSequenceNumber();
}
ensureOpen();
@@ -3061,7 +3061,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
reserveDocs(numDocs);
segmentInfos.add(infoPerCommit);
- seqNo = docWriter.deleteQueue.getNextSequenceNumber();
+ seqNo = docWriter.getNextSequenceNumber();
checkpoint();
}
} catch (VirtualMachineError tragedy) {
@@ -3599,12 +3599,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
infoStream.message("IW", " start flush: applyAllDeletes=" + applyAllDeletes);
infoStream.message("IW", " index before flush " + segString());
}
- boolean anyChanges = false;
+ boolean anyChanges;
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
- long seqNo = docWriter.flushAllThreads();
+ long seqNo = docWriter.flushAllThreads() ;
if (seqNo < 0) {
seqNo = -seqNo;
anyChanges = true;
diff --git a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
index 849978305b1..ccd954d2c2d 100644
--- a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
+++ b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
@@ -78,19 +78,21 @@ public class ControlledRealTimeReopenThread extends Thread implements Closeab
private class HandleRefresh implements ReferenceManager.RefreshListener {
@Override
public void beforeRefresh() {
+ // Save the gen as of when we started the reopen; the
+ // listener (HandleRefresh above) copies this to
+ // searchingGen once the reopen completes:
+ refreshStartGen = writer.getMaxCompletedSequenceNumber();
}
@Override
public void afterRefresh(boolean didRefresh) {
- refreshDone();
+ synchronized (ControlledRealTimeReopenThread.this) {
+ searchingGen = refreshStartGen;
+ ControlledRealTimeReopenThread.this.notifyAll();
+ }
}
}
- private synchronized void refreshDone() {
- searchingGen = refreshStartGen;
- notifyAll();
- }
-
@Override
public synchronized void close() {
//System.out.println("NRT: set finish");
@@ -228,10 +230,6 @@ public class ControlledRealTimeReopenThread extends Thread implements Closeab
}
lastReopenStartNS = System.nanoTime();
- // Save the gen as of when we started the reopen; the
- // listener (HandleRefresh above) copies this to
- // searchingGen once the reopen completes:
- refreshStartGen = writer.getMaxCompletedSequenceNumber();
try {
manager.maybeRefreshBlocking();
} catch (IOException ioe) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 0225f1eed9a..42a85d00285 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -44,6 +44,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -3949,4 +3951,138 @@ public class TestIndexWriter extends LuceneTestCase {
}
}
}
+
+ public void testMaxCompletedSequenceNumber() throws IOException, InterruptedException {
+ try (Directory dir = newDirectory();
+ IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig());) {
+ assertEquals(1, writer.addDocument(new Document()));
+ assertEquals(2, writer.updateDocument(new Term("foo", "bar"), new Document()));
+ writer.flushNextBuffer();
+ assertEquals(3, writer.commit());
+ assertEquals(4, writer.addDocument(new Document()));
+ assertEquals(4, writer.getMaxCompletedSequenceNumber());
+ // commit moves seqNo by 2 since there is one DWPT that could still be in-flight
+ assertEquals(6, writer.commit());
+ assertEquals(6, writer.getMaxCompletedSequenceNumber());
+ assertEquals(7, writer.addDocument(new Document()));
+ writer.getReader().close();
+ // getReader moves seqNo by 2 since there is one DWPT that could still be in-flight
+ assertEquals(9, writer.getMaxCompletedSequenceNumber());
+ }
+ try (Directory dir = newDirectory();
+ IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
+ SearcherManager manager = new SearcherManager(writer, new SearcherFactory())) {
+ CountDownLatch start = new CountDownLatch(1);
+ int numDocs = 100 + random().nextInt(500);
+ AtomicLong maxCompletedSeqID = new AtomicLong(-1);
+ Thread[] threads = new Thread[2 + random().nextInt(2)];
+ for (int i = 0; i < threads.length; i++) {
+ int idx = i;
+ threads[i] = new Thread(() -> {
+ try {
+ start.await();
+ for (int j = 0; j < numDocs; j++) {
+ Document doc = new Document();
+ String id = idx +"-"+j;
+ doc.add(new StringField("id", id, Field.Store.NO));
+ long seqNo = writer.addDocument(doc);
+ if (maxCompletedSeqID.get() < seqNo) {
+ long maxCompletedSequenceNumber = writer.getMaxCompletedSequenceNumber();
+ manager.maybeRefreshBlocking();
+ maxCompletedSeqID.updateAndGet(oldVal-> Math.max(oldVal, maxCompletedSequenceNumber));
+ }
+ IndexSearcher acquire = manager.acquire();
+ try {
+ assertEquals(1, acquire.search(new TermQuery(new Term("id", id)), 10).totalHits.value);
+ } finally {
+ manager.release(acquire);
+ }
+ }
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ });
+ threads[i].start();
+ }
+ start.countDown();
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ }
+ }
+ }
+
+ public void testEnsureMaxSeqNoIsAccurateDuringFlush() throws IOException, InterruptedException {
+ AtomicReference waitRef = new AtomicReference<>(new CountDownLatch(0));
+ AtomicReference arrivedRef = new AtomicReference<>(new CountDownLatch(0));
+ InfoStream stream = new InfoStream() {
+ @Override
+ public void message(String component, String message) {
+ if ("TP".equals(component) && "DocumentsWriterPerThread addDocuments start".equals(message)) {
+ try {
+ arrivedRef.get().countDown();
+ waitRef.get().await();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ }
+ }
+
+ @Override
+ public boolean isEnabled(String component) {
+ return "TP".equals(component);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ };
+ IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
+ indexWriterConfig.setInfoStream(stream);
+ try (Directory dir = newDirectory();
+ IndexWriter writer = new IndexWriter(dir, indexWriterConfig) {
+ @Override
+ protected boolean isEnableTestPoints() {
+ return true;
+ }
+ }) {
+ // we produce once DWPT with 1 doc
+ writer.addDocument(new Document());
+ assertEquals(1, writer.docWriter.perThreadPool.size());
+ long maxCompletedSequenceNumber = writer.getMaxCompletedSequenceNumber();
+ // safe the seqNo and use the latches to block this DWPT such that a refresh must wait for it
+ waitRef.set(new CountDownLatch(1));
+ arrivedRef.set(new CountDownLatch(1));
+ Thread waiterThread = new Thread(() -> {
+ try {
+ writer.addDocument(new Document());
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ });
+ waiterThread.start();
+ arrivedRef.get().await();
+ Thread refreshThread = new Thread(() -> {
+ try {
+ writer.getReader().close();
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ });
+ DocumentsWriterDeleteQueue deleteQueue = writer.docWriter.deleteQueue;
+ refreshThread.start();
+ // now we wait until the refresh has swapped the deleted queue and assert that
+ // we see an accurate seqId
+ while (writer.docWriter.deleteQueue == deleteQueue) {
+ Thread.yield(); // busy wait for refresh to swap the queue
+ }
+ try {
+ assertEquals(maxCompletedSequenceNumber, writer.getMaxCompletedSequenceNumber());
+ } finally {
+ waitRef.get().countDown();
+ waiterThread.join();
+ refreshThread.join();
+ }
+ assertEquals(maxCompletedSequenceNumber+2, writer.getMaxCompletedSequenceNumber());
+ }
+ }
}
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
index c8f9c8489c7..d8cf98cc98f 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
@@ -107,7 +107,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
}
try {
- assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits.value);
+ assertEquals("generation: " + gen, docs.size(), s.search(new TermQuery(id), 10).totalHits.value);
} finally {
nrtDeletes.release(s);
}
@@ -131,7 +131,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
}
try {
- assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits.value);
+ assertEquals("generation: " + gen, docs.size(), s.search(new TermQuery(id), 10).totalHits.value);
} finally {
nrtNoDeletes.release(s);
}
@@ -155,7 +155,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
}
try {
- assertEquals(1, s.search(new TermQuery(id), 10).totalHits.value);
+ assertEquals("generation: " + gen, 1, s.search(new TermQuery(id), 10).totalHits.value);
} finally {
nrtNoDeletes.release(s);
}
@@ -178,7 +178,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
}
try {
- assertEquals(1, s.search(new TermQuery(id), 10).totalHits.value);
+ assertEquals("generation: " + gen, 1, s.search(new TermQuery(id), 10).totalHits.value);
} finally {
nrtDeletes.release(s);
}