From e4a21330a201b41b6fddaa3583e5b2a89ae6d331 Mon Sep 17 00:00:00 2001 From: Mike McCandless Date: Tue, 24 May 2016 19:45:40 -0400 Subject: [PATCH] cutover all IW APIs that change the index to return seq no --- .../apache/lucene/index/DocumentsWriter.java | 37 ++++++---- .../index/DocumentsWriterFlushControl.java | 2 + .../index/DocumentsWriterPerThread.java | 13 ++-- .../org/apache/lucene/index/IndexWriter.java | 70 +++++++++++++------ .../lucene/index/TrackingIndexWriter.java | 18 +---- .../lucene/index/TestIndexWriterDelete.java | 4 +- .../index/TestIndexingSequenceNumbers.java | 30 +++++--- .../lucene/index/TestRollingUpdates.java | 2 +- .../lucene/index/TestTwoPhaseCommitTool.java | 16 +++-- .../TestControlledRealTimeReopenThread.java | 6 +- 10 files changed, 119 insertions(+), 79 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index bbbef718a34..6b698dba7fd 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -136,13 +136,15 @@ final class DocumentsWriter implements Closeable, Accountable { flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedUpdatesStream); } - synchronized boolean deleteQueries(final Query... queries) throws IOException { + synchronized long deleteQueries(final Query... queries) throws IOException { // TODO why is this synchronized? final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; - deleteQueue.addDelete(queries); + long seqNo = deleteQueue.addDelete(queries); flushControl.doOnDelete(); - // nocommit long - return applyAllDeletes(deleteQueue); + if (applyAllDeletes(deleteQueue)) { + seqNo = -seqNo; + } + return seqNo; } // TODO: we could check w/ FreqProxTermsWriter: if the @@ -251,6 +253,10 @@ final class DocumentsWriter implements Closeable, Accountable { abortedDocCount += abortThreadState(perThread); } deleteQueue.clear(); + + // jump over any possible in flight ops: + deleteQueue.seqNo.addAndGet(perThreadPool.getActiveThreadStateCount()+1); + flushControl.abortPendingFlushes(); flushControl.waitForFlush(); success = true; @@ -397,13 +403,14 @@ final class DocumentsWriter implements Closeable, Accountable { } } - boolean updateDocuments(final Iterable> docs, final Analyzer analyzer, - final Term delTerm) throws IOException, AbortingException { + long updateDocuments(final Iterable> docs, final Analyzer analyzer, + final Term delTerm) throws IOException, AbortingException { boolean hasEvents = preUpdate(); final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; - + final long seqNo; + try { // This must happen after we've pulled the ThreadState because IW.close // waits for all ThreadStates to be released: @@ -413,7 +420,7 @@ final class DocumentsWriter implements Closeable, Accountable { final DocumentsWriterPerThread dwpt = perThread.dwpt; final int dwptNumDocs = dwpt.getNumDocsInRAM(); try { - dwpt.updateDocuments(docs, analyzer, delTerm); + seqNo = dwpt.updateDocuments(docs, analyzer, delTerm); } catch (AbortingException ae) { flushControl.doOnAbort(perThread); dwpt.abort(); @@ -430,7 +437,11 @@ final class DocumentsWriter implements Closeable, Accountable { perThreadPool.release(perThread); } - return postUpdate(flushingDWPT, hasEvents); + if (postUpdate(flushingDWPT, hasEvents)) { + return -seqNo; + } else { + return seqNo; + } } long updateDocument(final Iterable doc, final Analyzer analyzer, @@ -441,7 +452,7 @@ final class DocumentsWriter implements Closeable, Accountable { final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; - final long seqno; + final long seqNo; try { // This must happen after we've pulled the ThreadState because IW.close // waits for all ThreadStates to be released: @@ -451,7 +462,7 @@ final class DocumentsWriter implements Closeable, Accountable { final DocumentsWriterPerThread dwpt = perThread.dwpt; final int dwptNumDocs = dwpt.getNumDocsInRAM(); try { - seqno = dwpt.updateDocument(doc, analyzer, delTerm); + seqNo = dwpt.updateDocument(doc, analyzer, delTerm); } catch (AbortingException ae) { flushControl.doOnAbort(perThread); dwpt.abort(); @@ -469,9 +480,9 @@ final class DocumentsWriter implements Closeable, Accountable { } if (postUpdate(flushingDWPT, hasEvents)) { - return -seqno; + return -seqNo; } else { - return seqno; + return seqNo; } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index ec390d9f935..f388f46853a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -480,6 +480,8 @@ final class DocumentsWriterFlushControl implements Accountable { // Set a new delete queue - all subsequent DWPT will use this queue until // we do another full flush //System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount()); + + // jump over any possible in flight ops: seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount(); // nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index ab00662b0c4..5b1afa06e59 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -244,7 +244,7 @@ class DocumentsWriterPerThread { return finishDocument(delTerm); } - public int updateDocuments(Iterable> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException { + public long updateDocuments(Iterable> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException { testPoint("DocumentsWriterPerThread addDocuments start"); assert deleteQueue != null; docState.analyzer = analyzer; @@ -285,13 +285,17 @@ class DocumentsWriterPerThread { // Apply delTerm only after all indexing has // succeeded, but apply it only to docs prior to when // this batch started: + long seqNo; if (delTerm != null) { - deleteQueue.add(delTerm, deleteSlice); + seqNo = deleteQueue.add(delTerm, deleteSlice); assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount); + return seqNo; + } else { + seqNo = deleteQueue.seqNo.get(); } - // nocommit return seqNo here + return seqNo; } finally { if (!allDocsIndexed && !aborted) { @@ -306,8 +310,6 @@ class DocumentsWriterPerThread { } docState.clear(); } - - return docCount; } private long finishDocument(Term delTerm) { @@ -326,7 +328,6 @@ class DocumentsWriterPerThread { assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; } else { applySlice &= deleteQueue.updateSlice(deleteSlice); - // nocommit we don't need to increment here? seqNo = deleteQueue.seqNo.get(); } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 16821357bb6..860936510e9 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -1332,8 +1332,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * * @lucene.experimental */ - public void addDocuments(Iterable> docs) throws IOException { - updateDocuments(null, docs); + public long addDocuments(Iterable> docs) throws IOException { + return updateDocuments(null, docs); } /** @@ -1349,15 +1349,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * * @lucene.experimental */ - public void updateDocuments(Term delTerm, Iterable> docs) throws IOException { + public long updateDocuments(Term delTerm, Iterable> docs) throws IOException { ensureOpen(); try { boolean success = false; try { - if (docWriter.updateDocuments(docs, analyzer, delTerm)) { + long seqNo = docWriter.updateDocuments(docs, analyzer, delTerm); + if (seqNo < 0) { + seqNo = -seqNo; processEvents(true, false); } success = true; + return seqNo; } finally { if (!success) { if (infoStream.isEnabled("IW")) { @@ -1367,6 +1370,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } } catch (AbortingException | VirtualMachineError tragedy) { tragicEvent(tragedy, "updateDocuments"); + + // dead code but javac disagrees + return -1; } } @@ -1375,15 +1381,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * DirectoryReader#open(IndexWriter)}). If the * provided reader is an NRT reader obtained from this * writer, and its segment has not been merged away, then - * the delete succeeds and this method returns true; else, it - * returns false the caller must then separately delete by - * Term or Query. + * the delete succeeds and this method returns a valid (> 0) sequence + * number; else, it returns -1 and the caller must then + * separately delete by Term or Query. * * NOTE: this method can only delete documents * visible to the currently open NRT reader. If you need * to delete documents indexed after opening the NRT * reader you must use {@link #deleteDocuments(Term...)}). */ - public synchronized boolean tryDeleteDocument(IndexReader readerIn, int docID) throws IOException { + public synchronized long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException { final LeafReader reader; if (readerIn instanceof LeafReader) { @@ -1434,7 +1440,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { changed(); } //System.out.println(" yes " + info.info.name + " " + docID); - return true; + + return docWriter.deleteQueue.seqNo.getAndIncrement(); } } else { //System.out.println(" no rld " + info.info.name + " " + docID); @@ -1442,7 +1449,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } else { //System.out.println(" no seg " + info.info.name + " " + docID); } - return false; + + return -1; } /** @@ -1481,23 +1489,29 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public void deleteDocuments(Query... queries) throws IOException { + public long deleteDocuments(Query... queries) throws IOException { ensureOpen(); // LUCENE-6379: Specialize MatchAllDocsQuery for(Query query : queries) { if (query.getClass() == MatchAllDocsQuery.class) { - deleteAll(); - return; + return deleteAll(); } } try { - if (docWriter.deleteQueries(queries)) { + long seqNo = docWriter.deleteQueries(queries); + if (seqNo < 0) { + seqNo = -seqNo; processEvents(true, false); } + + return seqNo; } catch (VirtualMachineError tragedy) { tragicEvent(tragedy, "deleteDocuments(Query..)"); + + // dead code but javac disagrees: + return -1; } } @@ -2225,7 +2239,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * or {@link #forceMergeDeletes} methods, they may receive * {@link MergePolicy.MergeAbortedException}s. */ - public void deleteAll() throws IOException { + public long deleteAll() throws IOException { ensureOpen(); // Remove any buffered docs boolean success = false; @@ -2272,6 +2286,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { globalFieldNumberMap.clear(); success = true; + return docWriter.deleteQueue.seqNo.get(); + } finally { docWriter.unlockAllAfterAbortAll(this); if (!success) { @@ -2284,6 +2300,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } } catch (VirtualMachineError tragedy) { tragicEvent(tragedy, "deleteAll"); + + // dead code but javac disagrees + return -1; } } @@ -2511,7 +2530,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * the index to exceed {@link #MAX_DOCS}, or if the indoming * index sort does not match this index's index sort */ - public void addIndexes(Directory... dirs) throws IOException { + public long addIndexes(Directory... dirs) throws IOException { ensureOpen(); noDupDirs(dirs); @@ -2618,6 +2637,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } } maybeMerge(); + + // no need to increment: + return docWriter.deleteQueue.seqNo.get(); } /** @@ -2649,7 +2671,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * @throws IllegalArgumentException * if addIndexes would cause the index to exceed {@link #MAX_DOCS} */ - public void addIndexes(CodecReader... readers) throws IOException { + public long addIndexes(CodecReader... readers) throws IOException { ensureOpen(); // long so we can detect int overflow: @@ -2691,7 +2713,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { rateLimiters.set(new MergeRateLimiter(null)); if (!merger.shouldMerge()) { - return; + // no need to increment: + return docWriter.deleteQueue.seqNo.get(); } merger.merge(); // merge 'em @@ -2709,7 +2732,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (stopMerges) { // Safe: these files must exist deleteNewFiles(infoPerCommit.files()); - return; + + // no need to increment: + return docWriter.deleteQueue.seqNo.get(); } ensureOpen(); useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this); @@ -2744,7 +2769,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (stopMerges) { // Safe: these files must exist deleteNewFiles(infoPerCommit.files()); - return; + + // no need to increment: + return docWriter.deleteQueue.seqNo.get(); } ensureOpen(); @@ -2758,6 +2785,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { tragicEvent(tragedy, "addIndexes(CodecReader...)"); } maybeMerge(); + + // no need to increment: + return docWriter.deleteQueue.seqNo.get(); } /** Copies the segment files as-is into the IndexWriter's directory. */ diff --git a/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java index b50f53c0eb9..33c193b91c8 100644 --- a/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java @@ -66,14 +66,6 @@ public class TrackingIndexWriter { return indexingGen.get(); } - /** Calls {@link IndexWriter#deleteDocuments(Term...)} and - * returns the generation that reflects this change. */ - public long deleteDocuments(Term t) throws IOException { - writer.deleteDocuments(t); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - /** Calls {@link IndexWriter#deleteDocuments(Term...)} and * returns the generation that reflects this change. */ public long deleteDocuments(Term... terms) throws IOException { @@ -82,14 +74,6 @@ public class TrackingIndexWriter { return indexingGen.get(); } - /** Calls {@link IndexWriter#deleteDocuments(Query...)} and - * returns the generation that reflects this change. */ - public long deleteDocuments(Query q) throws IOException { - writer.deleteDocuments(q); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - /** Calls {@link IndexWriter#deleteDocuments(Query...)} * and returns the generation that reflects this change. */ public long deleteDocuments(Query... queries) throws IOException { @@ -159,7 +143,7 @@ public class TrackingIndexWriter { * IndexWriter#tryDeleteDocument(IndexReader,int)} and * returns the generation that reflects this change. */ public long tryDeleteDocument(IndexReader reader, int docID) throws IOException { - if (writer.tryDeleteDocument(reader, docID)) { + if (writer.tryDeleteDocument(reader, docID) != -1) { return indexingGen.get(); } else { return -1; diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java index 14dac59ed3f..ccfe5c65130 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java @@ -1238,8 +1238,8 @@ public class TestIndexWriterDelete extends LuceneTestCase { iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); w = new IndexWriter(d, iwc); IndexReader r = DirectoryReader.open(w, false, false); - assertTrue(w.tryDeleteDocument(r, 1)); - assertTrue(w.tryDeleteDocument(r.leaves().get(0).reader(), 0)); + assertTrue(w.tryDeleteDocument(r, 1) != -1); + assertTrue(w.tryDeleteDocument(r.leaves().get(0).reader(), 0) != -1); r.close(); w.close(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java index 198ff527162..f2e96362675 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java @@ -43,7 +43,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase { IndexWriter w = new IndexWriter(dir, newIndexWriterConfig()); long a = w.addDocument(new Document()); long b = w.addDocument(new Document()); - assertTrue(b > a); + assertTrue(b >= a); w.close(); dir.close(); } @@ -154,7 +154,6 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase { Object commitLock = new Object(); final List commits = new ArrayList<>(); - final AtomicInteger opsSinceCommit = new AtomicInteger(); // multiple threads update the same set of documents, and we randomly commit for(int i=0;i numThreads) { + if (w.hasUncommittedChanges()) { op.seqNo = w.commit(); commits.add(op); - opsSinceCommit.set(0); } //System.out.println("done commit seqNo=" + op.seqNo); } @@ -186,16 +182,25 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase { Term idTerm = new Term("id", "" + op.id); if (random().nextInt(10) == 1) { op.what = 1; - op.seqNo = w.deleteDocuments(idTerm); + if (random().nextBoolean()) { + op.seqNo = w.deleteDocuments(idTerm); + } else { + op.seqNo = w.deleteDocuments(new TermQuery(idTerm)); + } } else { Document doc = new Document(); doc.add(new StoredField("thread", threadID)); doc.add(new StringField("id", "" + op.id, Field.Store.NO)); - op.seqNo = w.updateDocument(idTerm, doc); + if (random().nextBoolean()) { + List docs = new ArrayList<>(); + docs.add(doc); + op.seqNo = w.updateDocuments(idTerm, docs); + } else { + op.seqNo = w.updateDocument(idTerm, doc); + } op.what = 2; } ops.add(op); - opsSinceCommit.getAndIncrement(); } } } catch (Exception e) { @@ -210,11 +215,14 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase { thread.join(); } - Operation commitOp = new Operation(); - synchronized(commitLock) { + /* + // nocommit: why does this make the assertEquals angry...? + if (w.hasUncommittedChanges()) { + Operation commitOp = new Operation(); commitOp.seqNo = w.commit(); commits.add(commitOp); } + */ List indexCommits = DirectoryReader.listCommits(dir); assertEquals(commits.size(), indexCommits.size()); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java index 23be40b0ad9..5a2c82fec4a 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java @@ -80,7 +80,7 @@ public class TestRollingUpdates extends LuceneTestCase { if (s != null && updateCount < SIZE) { TopDocs hits = s.search(new TermQuery(idTerm), 1); assertEquals(1, hits.totalHits); - doUpdate = !w.tryDeleteDocument(r, hits.scoreDocs[0].doc); + doUpdate = w.tryDeleteDocument(r, hits.scoreDocs[0].doc) == -1; if (VERBOSE) { if (doUpdate) { System.out.println(" tryDeleteDocument failed"); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java b/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java index be6e4f81b92..def90f2bde4 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java @@ -40,29 +40,33 @@ public class TestTwoPhaseCommitTool extends LuceneTestCase { } @Override - public void prepareCommit() throws IOException { - prepareCommit(null); + public long prepareCommit() throws IOException { + return prepareCommit(null); } - public void prepareCommit(Map commitData) throws IOException { + public long prepareCommit(Map commitData) throws IOException { this.prepareCommitData = commitData; assertFalse("commit should not have been called before all prepareCommit were", commitCalled); if (failOnPrepare) { throw new IOException("failOnPrepare"); } + // nocommit hmm + return -1; } @Override - public void commit() throws IOException { - commit(null); + public long commit() throws IOException { + return commit(null); } - public void commit(Map commitData) throws IOException { + public long commit(Map commitData) throws IOException { this.commitData = commitData; commitCalled = true; if (failOnCommit) { throw new RuntimeException("failOnCommit"); } + // nocommit hmm + return -1; } @Override diff --git a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java index 8217eb1c132..2ef954cef86 100644 --- a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java +++ b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java @@ -389,14 +389,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc super(d, conf); this.latch = latch; this.signal = signal; - } @Override - public void updateDocument(Term term, + public long updateDocument(Term term, Iterable doc) throws IOException { - super.updateDocument(term, doc); + long result = super.updateDocument(term, doc); try { if (waitAfterUpdate) { signal.countDown(); @@ -405,6 +404,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc } catch (InterruptedException e) { throw new ThreadInterruptedException(e); } + return result; } }