From 5361d67996a35c34f467e25f339e7c6a655db0d5 Mon Sep 17 00:00:00 2001 From: Mike McCandless Date: Fri, 27 May 2016 06:11:07 -0400 Subject: [PATCH] sequence numbers: always increment seq no (even for addDocument/s); add tests; add javadocs; make DWDQ's seqNo private --- .../apache/lucene/index/DocumentsWriter.java | 2 +- .../index/DocumentsWriterDeleteQueue.java | 22 ++- .../index/DocumentsWriterFlushControl.java | 4 +- .../index/DocumentsWriterPerThread.java | 4 +- .../org/apache/lucene/index/IndexWriter.java | 90 ++++++++-- .../index/TestIndexingSequenceNumbers.java | 155 +++++++++++++++++- 6 files changed, 249 insertions(+), 28 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 9f1bdd342b6..5630fbb5f86 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -259,7 +259,7 @@ final class DocumentsWriter implements Closeable, Accountable { deleteQueue.clear(); // jump over any possible in flight ops: - deleteQueue.seqNo.addAndGet(perThreadPool.getActiveThreadStateCount()+1); + deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount()+1); flushControl.abortPendingFlushes(); flushControl.waitForFlush(); diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java index 80d2c85873d..4a11599e461 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java @@ -84,7 +84,7 @@ final class DocumentsWriterDeleteQueue implements Accountable { final long generation; /** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. 
*/ - final AtomicLong seqNo; + private final AtomicLong nextSeqNo; DocumentsWriterDeleteQueue() { // seqNo must start at 1 because some APIs negate this to also return a boolean @@ -98,7 +98,7 @@ final class DocumentsWriterDeleteQueue implements Accountable { DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) { this.globalBufferedUpdates = globalBufferedUpdates; this.generation = generation; - this.seqNo = new AtomicLong(startSeqNo); + this.nextSeqNo = new AtomicLong(startSeqNo); /* * we use a sentinel instance as our initial tail. No slice will ever try to * apply this tail since the head is always omitted. @@ -168,10 +168,10 @@ final class DocumentsWriterDeleteQueue implements Accountable { /* * now that we are done we need to advance the tail */ - long mySeqNo = seqNo.getAndIncrement(); + long seqNo = getNextSequenceNumber(); boolean result = tailUpdater.compareAndSet(this, currentTail, newNode); assert result; - return mySeqNo; + return seqNo; } } } @@ -460,6 +460,16 @@ final class DocumentsWriterDeleteQueue implements Accountable { public String toString() { return "DWDQ: [ generation: " + generation + " ]"; } - - + + public long getNextSequenceNumber() { + return nextSeqNo.getAndIncrement(); + } + + public long getLastSequenceNumber() { + return nextSeqNo.get()-1; + } + + public void skipSequenceNumbers(long jump) { + nextSeqNo.addAndGet(jump); + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index ffcb7dcdf09..99bf8d8aa13 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -481,9 +481,9 @@ final class DocumentsWriterFlushControl implements Accountable { // we do another full flush //System.out.println("DWFC: fullFLush old seqNo=" + 
documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount()); - // Insert a gap in seqNo of current active thread count, in the worst case those threads now have one operation in flight. It's fine + // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now has one operation in flight. It's fine // if we have some sequence numbers that were never assigned: - seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount(); + seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2; DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1); diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index 5b1afa06e59..cf5694d4166 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -292,7 +292,7 @@ class DocumentsWriterPerThread { deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount); return seqNo; } else { - seqNo = deleteQueue.seqNo.get(); + seqNo = deleteQueue.getNextSequenceNumber(); } return seqNo; @@ -328,7 +328,7 @@ class DocumentsWriterPerThread { assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; } else { applySlice &= deleteQueue.updateSlice(deleteSlice); - seqNo = deleteQueue.seqNo.get(); + seqNo = deleteQueue.getNextSequenceNumber(); } if (applySlice) { diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 945399c1371..2bdfea72c3a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -95,6 +95,14 @@
import org.apache.lucene.util.Version; and then adds the entire document). When finished adding, deleting and updating documents, {@link #close() close} should be called.

+ +

Each method that changes the index returns a {@code long} sequence number, which + expresses the effective order in which each change was applied. + {@link #commit} also returns a sequence number, describing which + changes are in the commit point and which are not. Sequence numbers + are transient (not saved into the index in any way) and only valid + within a single {@code IndexWriter} instance.

+

These changes are buffered in memory and periodically flushed to the {@link Directory} (during the above method @@ -1288,6 +1296,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * replaced with the Unicode replacement character * U+FFFD.

* + * @return The sequence number + * for this operation + * * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ @@ -1327,6 +1338,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * and will likely break them up. Use such tools at your * own risk! * + * @return The sequence number + * for this operation + * * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error * @@ -1344,6 +1358,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * * See {@link #addDocuments(Iterable)}. * + * @return The sequence number + * for this operation + * * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error * @@ -1441,7 +1458,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } //System.out.println(" yes " + info.info.name + " " + docID); - return docWriter.deleteQueue.seqNo.getAndIncrement(); + return docWriter.deleteQueue.getNextSequenceNumber(); } } else { //System.out.println(" no rld " + info.info.name + " " + docID); @@ -1458,6 +1475,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * terms. All given deletes are applied and flushed atomically * at the same time. * + * @return The sequence number + * for this operation + * * @param terms array of terms to identify the documents * to be deleted * @throws CorruptIndexException if the index is corrupt @@ -1484,6 +1504,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * Deletes the document(s) matching any of the provided queries. * All given deletes are applied and flushed atomically at the same time. 
* + * @return The sequence number + * for this operation + * * @param queries array of queries to identify the documents * to be deleted * @throws CorruptIndexException if the index is corrupt @@ -1522,6 +1545,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * by a reader on the same index (flush may happen only after * the add). * + * @return The sequence number + * for this operation + * * @param term the term to identify the document(s) to be * deleted * @param doc the document to be added @@ -1566,6 +1592,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * field name of the {@link NumericDocValues} field * @param value * new value for the field + * + * @return The sequence number + * for this operation + * * @throws CorruptIndexException * if the index is corrupt * @throws IOException @@ -1606,6 +1636,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * field name of the {@link BinaryDocValues} field * @param value * new value for the field + * + * @return The sequence number + * for this operation + * * @throws CorruptIndexException * if the index is corrupt * @throws IOException @@ -1642,6 +1676,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * * @param updates * the updates to apply + * + * @return The sequence number + * for this operation + * * @throws CorruptIndexException * if the index is corrupt * @throws IOException @@ -2256,6 +2294,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * threads are running {@link #forceMerge}, {@link #addIndexes(CodecReader[])} * or {@link #forceMergeDeletes} methods, they may receive * {@link MergePolicy.MergeAbortedException}s. 
+ * + * @return The sequence number + * for this operation */ public long deleteAll() throws IOException { ensureOpen(); @@ -2304,7 +2345,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { globalFieldNumberMap.clear(); success = true; - return docWriter.deleteQueue.seqNo.get(); + return docWriter.deleteQueue.getNextSequenceNumber(); } finally { docWriter.unlockAllAfterAbortAll(this); @@ -2542,6 +2583,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * *

This requires this index not be among those to be added. * + * @return The sequence number + * for this operation + * * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error * @throws IllegalArgumentException if addIndexes would cause @@ -2559,6 +2603,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { boolean successTop = false; + long seqNo; + try { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "flush at addIndexes(Directory...)"); @@ -2630,6 +2676,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // Now reserve the docs, just before we update SIS: reserveDocs(totalMaxDoc); + seqNo = docWriter.deleteQueue.getNextSequenceNumber(); + success = true; } finally { if (!success) { @@ -2647,6 +2695,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } catch (VirtualMachineError tragedy) { tragicEvent(tragedy, "addIndexes(Directory...)"); + // dead code but javac disagrees: + seqNo = -1; } finally { if (successTop) { IOUtils.close(locks); @@ -2656,8 +2706,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } maybeMerge(); - // no need to increment: - return docWriter.deleteQueue.seqNo.get(); + return seqNo; } /** @@ -2682,6 +2731,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * {@code maxMergeAtOnce} parameter, you should pass that many readers in one * call. 
* + * @return The sequence number + * for this operation + * * @throws CorruptIndexException * if the index is corrupt * @throws IOException @@ -2697,6 +2749,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { Sort indexSort = config.getIndexSort(); + long seqNo; + try { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "flush at addIndexes(CodecReader...)"); @@ -2731,8 +2785,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { rateLimiters.set(new MergeRateLimiter(null)); if (!merger.shouldMerge()) { - // no need to increment: - return docWriter.deleteQueue.seqNo.get(); + return docWriter.deleteQueue.getNextSequenceNumber(); } merger.merge(); // merge 'em @@ -2751,8 +2804,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // Safe: these files must exist deleteNewFiles(infoPerCommit.files()); - // no need to increment: - return docWriter.deleteQueue.seqNo.get(); + return docWriter.deleteQueue.getNextSequenceNumber(); } ensureOpen(); useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this); @@ -2788,8 +2840,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // Safe: these files must exist deleteNewFiles(infoPerCommit.files()); - // no need to increment: - return docWriter.deleteQueue.seqNo.get(); + return docWriter.deleteQueue.getNextSequenceNumber(); } ensureOpen(); @@ -2797,15 +2848,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { reserveDocs(numDocs); segmentInfos.add(infoPerCommit); + seqNo = docWriter.deleteQueue.getNextSequenceNumber(); checkpoint(); } } catch (VirtualMachineError tragedy) { tragicEvent(tragedy, "addIndexes(CodecReader...)"); + // dead code but javac disagrees: + seqNo = -1; } maybeMerge(); - // no need to increment: - return docWriter.deleteQueue.seqNo.get(); + return seqNo; } /** Copies the segment files as-is into the IndexWriter's directory. 
*/ @@ -2873,6 +2926,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { *

You can also just call {@link #commit()} directly * without prepareCommit first in which case that method * will internally call prepareCommit. + * + * @return The sequence number + * of the last operation in the commit. All sequence numbers &lt;= this value + * will be reflected in the commit, and all others will not. */ @Override public final long prepareCommit() throws IOException { @@ -3069,6 +3126,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * point, and all other operations will not.

* * @see #prepareCommit + * + * @return The sequence number + * of the last operation in the commit. All sequence numbers &lt;= this value + * will be reflected in the commit, and all others will not. */ @Override public final long commit() throws IOException { @@ -4988,11 +5049,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { }; } - /** Returns the last sequence number. + /** Returns the last sequence number, or 0 + * if no index-changing operations have completed yet. * * @lucene.experimental */ public long getLastSequenceNumber() { ensureOpen(); - return docWriter.deleteQueue.seqNo.get()-1; + return docWriter.deleteQueue.getLastSequenceNumber(); } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java index fb9b9abb7a0..002292cb445 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java @@ -43,7 +43,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase { IndexWriter w = new IndexWriter(dir, newIndexWriterConfig()); long a = w.addDocument(new Document()); long b = w.addDocument(new Document()); - assertTrue(b >= a); + assertTrue(b > a); w.close(); dir.close(); } @@ -129,7 +129,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase { } static class Operation { - // 0 = update, 1 = delete, 2 = commit + // 0 = update, 1 = delete, 2 = commit, 3 = add byte what; int id; int threadID; @@ -248,7 +248,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase { } } - assertTrue(op.seqNo >= lastSeqNo); + assertTrue(op.seqNo > lastSeqNo); lastSeqNo = op.seqNo; } } @@ -293,4 +293,153 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase { dir.close(); } + + public void testStressConcurrentAddAndDeleteAndCommit() throws Exception { + final int opCount =
atLeast(10000); + final int idCount = TestUtil.nextInt(random(), 10, 1000); + + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); + + // Cannot use RIW since it randomly commits: + final IndexWriter w = new IndexWriter(dir, iwc); + + final int numThreads = TestUtil.nextInt(random(), 2, 5); + Thread[] threads = new Thread[numThreads]; + //System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length); + final CountDownLatch startingGun = new CountDownLatch(1); + List> threadOps = new ArrayList<>(); + + Object commitLock = new Object(); + final List commits = new ArrayList<>(); + + // multiple threads update the same set of documents, and we randomly commit + for(int i=0;i ops = new ArrayList<>(); + threadOps.add(ops); + final int threadID = i; + threads[i] = new Thread() { + @Override + public void run() { + try { + startingGun.await(); + for(int i=0;i docs = new ArrayList<>(); + docs.add(doc); + op.seqNo = w.addDocuments(docs); + } else { + op.seqNo = w.addDocument(doc); + } + op.what = 3; + } + ops.add(op); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + threads[i].start(); + } + startingGun.countDown(); + for(Thread thread : threads) { + thread.join(); + } + + Operation commitOp = new Operation(); + commitOp.seqNo = w.commit(); + if (commitOp.seqNo != -1) { + commits.add(commitOp); + } + + List indexCommits = DirectoryReader.listCommits(dir); + assertEquals(commits.size(), indexCommits.size()); + + // how many docs with this id are expected: + int[] expectedCounts = new int[idCount]; + long[] lastDelSeqNos = new long[idCount]; + + //System.out.println("TEST: " + commits.size() + " commits"); + for(int i=0;i