From 8ed16fd1f9a03c66d4ac81ddaa7ab70359410b95 Mon Sep 17 00:00:00 2001 From: Mike McCandless Date: Tue, 14 Jun 2016 04:09:27 -0400 Subject: [PATCH] LUCENE-7302: ensure IW.getMaxCompletedSequenceNumber only reflects a change after NRT reader refresh would also see it --- .../apache/lucene/index/DocumentsWriter.java | 40 ++++++++++++++----- .../index/DocumentsWriterPerThreadPool.java | 3 ++ .../org/apache/lucene/index/IndexWriter.java | 10 ++--- .../ControlledRealTimeReopenThread.java | 3 +- .../TestControlledRealTimeReopenThread.java | 20 +++++----- 5 files changed, 49 insertions(+), 27 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 13800a88e36..a33d64072c9 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -122,7 +122,7 @@ final class DocumentsWriter implements Closeable, Accountable { final DocumentsWriterFlushControl flushControl; private final IndexWriter writer; private final Queue events; - + private long lastSeqNo; DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory) { this.directoryOrig = directoryOrig; @@ -144,6 +144,7 @@ final class DocumentsWriter implements Closeable, Accountable { if (applyAllDeletes(deleteQueue)) { seqNo = -seqNo; } + lastSeqNo = Math.max(lastSeqNo, seqNo); return seqNo; } @@ -158,6 +159,7 @@ final class DocumentsWriter implements Closeable, Accountable { if (applyAllDeletes(deleteQueue)) { seqNo = -seqNo; } + lastSeqNo = Math.max(lastSeqNo, seqNo); return seqNo; } @@ -168,7 +170,7 @@ final class DocumentsWriter implements Closeable, Accountable { if (applyAllDeletes(deleteQueue)) { seqNo = -seqNo; } - + lastSeqNo = Math.max(lastSeqNo, seqNo); return seqNo; } @@ -317,6 +319,17 @@ final class DocumentsWriter implements Closeable, Accountable { } } + /** returns the maximum sequence number for all previously completed operations */ + public long getMaxCompletedSequenceNumber() { + long value = lastSeqNo; + int limit = perThreadPool.getMaxThreadStates(); + for(int i = 0; i < limit; i++) { + ThreadState perThread = perThreadPool.getThreadState(i); + value = Math.max(value, perThread.lastSeqNo); + } + return value; + } + boolean anyChanges() { /* * changes are either in a DWPT or in the deleteQueue. @@ -413,7 +426,7 @@ final class DocumentsWriter implements Closeable, Accountable { final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; - final long seqNo; + long seqNo; try { // This must happen after we've pulled the ThreadState because IW.close @@ -437,15 +450,18 @@ final class DocumentsWriter implements Closeable, Accountable { } final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); + + assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo; + perThread.lastSeqNo = seqNo; + } finally { perThreadPool.release(perThread); } if (postUpdate(flushingDWPT, hasEvents)) { - return -seqNo; - } else { - return seqNo; + seqNo = -seqNo; } + return seqNo; } long updateDocument(final Iterable doc, final Analyzer analyzer, @@ -456,7 +472,7 @@ final class DocumentsWriter implements Closeable, Accountable { final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; - final long seqNo; + long seqNo; try { // This must happen after we've pulled the ThreadState because IW.close // waits for all ThreadStates to be released: @@ -479,15 +495,19 @@ final class DocumentsWriter implements Closeable, Accountable { } final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); + + assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo; + perThread.lastSeqNo = seqNo; + } finally { perThreadPool.release(perThread); } if (postUpdate(flushingDWPT, hasEvents)) { - return -seqNo; - } else { - return seqNo; + seqNo = -seqNo; } + + return seqNo; } private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException { diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index 3802805af2d..cc723424853 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -59,6 +59,9 @@ final class DocumentsWriterPerThreadPool { // write access guarded by DocumentsWriterFlushControl long bytesUsed = 0; + // set by DocumentsWriter after each indexing op finishes + volatile long lastSeqNo; + ThreadState(DocumentsWriterPerThread dpwt) { this.dwpt = dpwt; } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index b5e0c2254b8..5fe164856ee 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -1457,7 +1457,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { changed(); } //System.out.println(" yes " + info.info.name + " " + docID); - return docWriter.deleteQueue.getNextSequenceNumber(); } } else { @@ -5049,12 +5048,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { }; } - /** Returns the last sequence number, or 0 - * if no index-changing operations have completed yet. + /** Returns the highest sequence number across + * all completed operations, or 0 if no operations have finished yet. Still + * in-flight operations (in other threads) are not counted until they finish. * * @lucene.experimental */ - public long getLastSequenceNumber() { + public long getMaxCompletedSequenceNumber() { ensureOpen(); - return docWriter.deleteQueue.getLastSequenceNumber(); + return docWriter.getMaxCompletedSequenceNumber(); } } diff --git a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java index 466d793cc1e..a98a30d8c38 100644 --- a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java +++ b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java @@ -150,7 +150,6 @@ public class ControlledRealTimeReopenThread extends Thread implements Closeab * or false if maxMS wait time was exceeded */ public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException { - final long curGen = writer.getLastSequenceNumber(); if (targetGen > searchingGen) { // Notify the reopen thread that the waitingGen has // changed, so it may wake up and realize it should @@ -232,7 +231,7 @@ public class ControlledRealTimeReopenThread extends Thread implements Closeab // Save the gen as of when we started the reopen; the // listener (HandleRefresh above) copies this to // searchingGen once the reopen completes: - refreshStartGen = writer.getLastSequenceNumber(); + refreshStartGen = writer.getMaxCompletedSequenceNumber(); try { manager.maybeRefreshBlocking(); } catch (IOException ioe) { diff --git a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java index 69822a6d969..779c1f21a1f 100644 --- a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java +++ b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java @@ -98,13 +98,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // Randomly verify the update "took": if (random().nextInt(20) == 2) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocuments " + id + " gen=" + gen); } nrtDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtDeletesThread.getSearchingGen()); final IndexSearcher s = nrtDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s); } try { assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); @@ -122,13 +122,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // Randomly verify the add "took": if (random().nextInt(20) == 2) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocuments " + id + " gen=" + gen); } nrtNoDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtNoDeletesThread.getSearchingGen()); final IndexSearcher s = nrtNoDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s); } try { assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); @@ -146,13 +146,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // Randomly verify the add "took": if (random().nextInt(20) == 2) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocument " + id + " gen=" + gen); } nrtNoDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtNoDeletesThread.getSearchingGen()); final IndexSearcher s = nrtNoDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s); } try { assertEquals(1, s.search(new TermQuery(id), 10).totalHits); @@ -169,13 +169,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // Randomly verify the udpate "took": if (random().nextInt(20) == 2) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocument " + id + " gen=" + gen); } nrtDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtDeletesThread.getSearchingGen()); final IndexSearcher s = nrtDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s); } try { assertEquals(1, s.search(new TermQuery(id), 10).totalHits); @@ -192,13 +192,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // randomly verify the delete "took": if (random().nextInt(20) == 7) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify deleteDocuments " + id + " gen=" + gen); } nrtDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtDeletesThread.getSearchingGen()); final IndexSearcher s = nrtDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s); } try { assertEquals(0, s.search(new TermQuery(id), 10).totalHits);