LUCENE-7302: ensure IW.getMaxCompletedSequenceNumber only reflects a change after NRT reader refresh would also see it

This commit is contained in:
Mike McCandless 2016-06-14 04:09:27 -04:00
parent 816b502025
commit 8ed16fd1f9
5 changed files with 49 additions and 27 deletions

View File

@ -122,7 +122,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterFlushControl flushControl; final DocumentsWriterFlushControl flushControl;
private final IndexWriter writer; private final IndexWriter writer;
private final Queue<Event> events; private final Queue<Event> events;
private long lastSeqNo;
DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory) { DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory) {
this.directoryOrig = directoryOrig; this.directoryOrig = directoryOrig;
@ -144,6 +144,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (applyAllDeletes(deleteQueue)) { if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo; seqNo = -seqNo;
} }
lastSeqNo = Math.max(lastSeqNo, seqNo);
return seqNo; return seqNo;
} }
@ -158,6 +159,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (applyAllDeletes(deleteQueue)) { if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo; seqNo = -seqNo;
} }
lastSeqNo = Math.max(lastSeqNo, seqNo);
return seqNo; return seqNo;
} }
@ -168,7 +170,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (applyAllDeletes(deleteQueue)) { if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo; seqNo = -seqNo;
} }
lastSeqNo = Math.max(lastSeqNo, seqNo);
return seqNo; return seqNo;
} }
@ -317,6 +319,17 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
} }
/** returns the maximum sequence number for all previously completed operations */
public long getMaxCompletedSequenceNumber() {
long value = lastSeqNo;
int limit = perThreadPool.getMaxThreadStates();
for(int i = 0; i < limit; i++) {
ThreadState perThread = perThreadPool.getThreadState(i);
value = Math.max(value, perThread.lastSeqNo);
}
return value;
}
boolean anyChanges() { boolean anyChanges() {
/* /*
* changes are either in a DWPT or in the deleteQueue. * changes are either in a DWPT or in the deleteQueue.
@ -413,7 +426,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final ThreadState perThread = flushControl.obtainAndLock(); final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT; final DocumentsWriterPerThread flushingDWPT;
final long seqNo; long seqNo;
try { try {
// This must happen after we've pulled the ThreadState because IW.close // This must happen after we've pulled the ThreadState because IW.close
@ -437,15 +450,18 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
final boolean isUpdate = delTerm != null; final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
perThread.lastSeqNo = seqNo;
} finally { } finally {
perThreadPool.release(perThread); perThreadPool.release(perThread);
} }
if (postUpdate(flushingDWPT, hasEvents)) { if (postUpdate(flushingDWPT, hasEvents)) {
return -seqNo; seqNo = -seqNo;
} else {
return seqNo;
} }
return seqNo;
} }
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer, long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
@ -456,7 +472,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final ThreadState perThread = flushControl.obtainAndLock(); final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT; final DocumentsWriterPerThread flushingDWPT;
final long seqNo; long seqNo;
try { try {
// This must happen after we've pulled the ThreadState because IW.close // This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released: // waits for all ThreadStates to be released:
@ -479,15 +495,19 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
final boolean isUpdate = delTerm != null; final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
perThread.lastSeqNo = seqNo;
} finally { } finally {
perThreadPool.release(perThread); perThreadPool.release(perThread);
} }
if (postUpdate(flushingDWPT, hasEvents)) { if (postUpdate(flushingDWPT, hasEvents)) {
return -seqNo; seqNo = -seqNo;
} else {
return seqNo;
} }
return seqNo;
} }
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException { private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {

View File

@ -59,6 +59,9 @@ final class DocumentsWriterPerThreadPool {
// write access guarded by DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl
long bytesUsed = 0; long bytesUsed = 0;
// set by DocumentsWriter after each indexing op finishes
volatile long lastSeqNo;
ThreadState(DocumentsWriterPerThread dpwt) { ThreadState(DocumentsWriterPerThread dpwt) {
this.dwpt = dpwt; this.dwpt = dpwt;
} }

View File

@ -1457,7 +1457,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
changed(); changed();
} }
//System.out.println(" yes " + info.info.name + " " + docID); //System.out.println(" yes " + info.info.name + " " + docID);
return docWriter.deleteQueue.getNextSequenceNumber(); return docWriter.deleteQueue.getNextSequenceNumber();
} }
} else { } else {
@ -5049,12 +5048,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}; };
} }
/** Returns the last <a href="#sequence_number">sequence number</a>, or 0 /** Returns the highest <a href="#sequence_number">sequence number</a> across
* if no index-changing operations have completed yet. * all completed operations, or 0 if no operations have finished yet. Still
* in-flight operations (in other threads) are not counted until they finish.
* *
* @lucene.experimental */ * @lucene.experimental */
public long getLastSequenceNumber() { public long getMaxCompletedSequenceNumber() {
ensureOpen(); ensureOpen();
return docWriter.deleteQueue.getLastSequenceNumber(); return docWriter.getMaxCompletedSequenceNumber();
} }
} }

View File

@ -150,7 +150,6 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
* or false if maxMS wait time was exceeded * or false if maxMS wait time was exceeded
*/ */
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException { public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
final long curGen = writer.getLastSequenceNumber();
if (targetGen > searchingGen) { if (targetGen > searchingGen) {
// Notify the reopen thread that the waitingGen has // Notify the reopen thread that the waitingGen has
// changed, so it may wake up and realize it should // changed, so it may wake up and realize it should
@ -232,7 +231,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
// Save the gen as of when we started the reopen; the // Save the gen as of when we started the reopen; the
// listener (HandleRefresh above) copies this to // listener (HandleRefresh above) copies this to
// searchingGen once the reopen completes: // searchingGen once the reopen completes:
refreshStartGen = writer.getLastSequenceNumber(); refreshStartGen = writer.getMaxCompletedSequenceNumber();
try { try {
manager.maybeRefreshBlocking(); manager.maybeRefreshBlocking();
} catch (IOException ioe) { } catch (IOException ioe) {

View File

@ -98,13 +98,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Randomly verify the update "took": // Randomly verify the update "took":
if (random().nextInt(20) == 2) { if (random().nextInt(20) == 2) {
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocuments " + id + " gen=" + gen);
} }
nrtDeletesThread.waitForGeneration(gen); nrtDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtDeletesThread.getSearchingGen()); assertTrue(gen <= nrtDeletesThread.getSearchingGen());
final IndexSearcher s = nrtDeletes.acquire(); final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
} }
try { try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
@ -122,13 +122,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Randomly verify the add "took": // Randomly verify the add "took":
if (random().nextInt(20) == 2) { if (random().nextInt(20) == 2) {
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocuments " + id + " gen=" + gen);
} }
nrtNoDeletesThread.waitForGeneration(gen); nrtNoDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtNoDeletesThread.getSearchingGen()); assertTrue(gen <= nrtNoDeletesThread.getSearchingGen());
final IndexSearcher s = nrtNoDeletes.acquire(); final IndexSearcher s = nrtNoDeletes.acquire();
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
} }
try { try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
@ -146,13 +146,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Randomly verify the add "took": // Randomly verify the add "took":
if (random().nextInt(20) == 2) { if (random().nextInt(20) == 2) {
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocument " + id + " gen=" + gen);
} }
nrtNoDeletesThread.waitForGeneration(gen); nrtNoDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtNoDeletesThread.getSearchingGen()); assertTrue(gen <= nrtNoDeletesThread.getSearchingGen());
final IndexSearcher s = nrtNoDeletes.acquire(); final IndexSearcher s = nrtNoDeletes.acquire();
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
} }
try { try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits); assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
@ -169,13 +169,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Randomly verify the udpate "took": // Randomly verify the udpate "took":
if (random().nextInt(20) == 2) { if (random().nextInt(20) == 2) {
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocument " + id + " gen=" + gen);
} }
nrtDeletesThread.waitForGeneration(gen); nrtDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtDeletesThread.getSearchingGen()); assertTrue(gen <= nrtDeletesThread.getSearchingGen());
final IndexSearcher s = nrtDeletes.acquire(); final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
} }
try { try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits); assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
@ -192,13 +192,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// randomly verify the delete "took": // randomly verify the delete "took":
if (random().nextInt(20) == 7) { if (random().nextInt(20) == 7) {
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id); System.out.println(Thread.currentThread().getName() + ": nrt: verify deleteDocuments " + id + " gen=" + gen);
} }
nrtDeletesThread.waitForGeneration(gen); nrtDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtDeletesThread.getSearchingGen()); assertTrue(gen <= nrtDeletesThread.getSearchingGen());
final IndexSearcher s = nrtDeletes.acquire(); final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) { if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
} }
try { try {
assertEquals(0, s.search(new TermQuery(id), 10).totalHits); assertEquals(0, s.search(new TermQuery(id), 10).totalHits);