sequence numbers: always increment seq no (even for addDocument/s); add tests; add javadocs; make DWDQ's seqNo private

This commit is contained in:
Mike McCandless 2016-05-27 06:11:07 -04:00
parent 7a03c64969
commit 5361d67996
6 changed files with 249 additions and 28 deletions

View File

@ -259,7 +259,7 @@ final class DocumentsWriter implements Closeable, Accountable {
deleteQueue.clear();
// jump over any possible in flight ops:
deleteQueue.seqNo.addAndGet(perThreadPool.getActiveThreadStateCount()+1);
deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount()+1);
flushControl.abortPendingFlushes();
flushControl.waitForFlush();

View File

@ -84,7 +84,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
final long generation;
/** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */
final AtomicLong seqNo;
private final AtomicLong nextSeqNo;
DocumentsWriterDeleteQueue() {
// seqNo must start at 1 because some APIs negate this to also return a boolean
@ -98,7 +98,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
this.globalBufferedUpdates = globalBufferedUpdates;
this.generation = generation;
this.seqNo = new AtomicLong(startSeqNo);
this.nextSeqNo = new AtomicLong(startSeqNo);
/*
* we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted.
@ -168,10 +168,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
/*
* now that we are done we need to advance the tail
*/
long mySeqNo = seqNo.getAndIncrement();
long seqNo = getNextSequenceNumber();
boolean result = tailUpdater.compareAndSet(this, currentTail, newNode);
assert result;
return mySeqNo;
return seqNo;
}
}
}
@ -460,6 +460,16 @@ final class DocumentsWriterDeleteQueue implements Accountable {
public String toString() {
return "DWDQ: [ generation: " + generation + " ]";
}
/**
 * Atomically claims the next sequence number: the counter's current value is
 * returned and the counter is advanced by one, so every call yields a distinct value.
 *
 * @return the sequence number assigned to the caller
 */
public long getNextSequenceNumber() {
  final long assigned = nextSeqNo.getAndIncrement();
  return assigned;
}
/**
 * Returns the value just below the next sequence number to be handed out, without
 * changing the counter.  After a call to {@link #skipSequenceNumbers} this value may
 * lie inside the skipped gap rather than being a number that was actually assigned.
 *
 * @return the next sequence number minus one
 */
public long getLastSequenceNumber() {
  final long upcoming = nextSeqNo.get();
  return upcoming - 1;
}
/**
 * Atomically advances the sequence number counter by {@code jump}, leaving a gap of
 * values that {@link #getNextSequenceNumber} will never return.
 *
 * @param jump how many sequence numbers to skip over
 */
public void skipSequenceNumbers(long jump) {
  // only the side effect matters here; the previous value is intentionally ignored
  nextSeqNo.getAndAdd(jump);
}
}

View File

@ -481,9 +481,9 @@ final class DocumentsWriterFlushControl implements Accountable {
// we do another full flush
//System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
// Insert a gap in seqNo of current active thread count, in the worst case those threads now have one operation in flight. It's fine
// Insert a gap in seqNo of current active thread count, in the worst case each of those threads now has one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);

View File

@ -292,7 +292,7 @@ class DocumentsWriterPerThread {
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
return seqNo;
} else {
seqNo = deleteQueue.seqNo.get();
seqNo = deleteQueue.getNextSequenceNumber();
}
return seqNo;
@ -328,7 +328,7 @@ class DocumentsWriterPerThread {
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
applySlice &= deleteQueue.updateSlice(deleteSlice);
seqNo = deleteQueue.seqNo.get();
seqNo = deleteQueue.getNextSequenceNumber();
}
if (applySlice) {

View File

@ -95,6 +95,14 @@ import org.apache.lucene.util.Version;
and then adds the entire document). When finished adding, deleting
and updating documents, {@link #close() close} should be called.</p>
<a name="sequence_numbers"></a><a name="sequence_number"></a>
<p>Each method that changes the index returns a {@code long} sequence number, which
expresses the effective order in which each change was applied.
{@link #commit} also returns a sequence number, describing which
changes are in the commit point and which are not. Sequence numbers
are transient (not saved into the index in any way) and only valid
within a single {@code IndexWriter} instance.</p>
<a name="flush"></a>
<p>These changes are buffered in memory and periodically
flushed to the {@link Directory} (during the above method
@ -1288,6 +1296,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* replaced with the Unicode replacement character
* U+FFFD.</p>
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
@ -1327,6 +1338,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* and will likely break them up. Use such tools at your
* own risk!
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
@ -1344,6 +1358,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* See {@link #addDocuments(Iterable)}.
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
@ -1441,7 +1458,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
//System.out.println(" yes " + info.info.name + " " + docID);
return docWriter.deleteQueue.seqNo.getAndIncrement();
return docWriter.deleteQueue.getNextSequenceNumber();
}
} else {
//System.out.println(" no rld " + info.info.name + " " + docID);
@ -1458,6 +1475,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* terms. All given deletes are applied and flushed atomically
* at the same time.
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @param terms array of terms to identify the documents
* to be deleted
* @throws CorruptIndexException if the index is corrupt
@ -1484,6 +1504,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* Deletes the document(s) matching any of the provided queries.
* All given deletes are applied and flushed atomically at the same time.
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @param queries array of queries to identify the documents
* to be deleted
* @throws CorruptIndexException if the index is corrupt
@ -1522,6 +1545,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* by a reader on the same index (flush may happen only after
* the add).
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @param term the term to identify the document(s) to be
* deleted
* @param doc the document to be added
@ -1566,6 +1592,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* field name of the {@link NumericDocValues} field
* @param value
* new value for the field
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@ -1606,6 +1636,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* field name of the {@link BinaryDocValues} field
* @param value
* new value for the field
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@ -1642,6 +1676,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* @param updates
* the updates to apply
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@ -2256,6 +2294,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* threads are running {@link #forceMerge}, {@link #addIndexes(CodecReader[])}
* or {@link #forceMergeDeletes} methods, they may receive
* {@link MergePolicy.MergeAbortedException}s.
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*/
public long deleteAll() throws IOException {
ensureOpen();
@ -2304,7 +2345,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
globalFieldNumberMap.clear();
success = true;
return docWriter.deleteQueue.seqNo.get();
return docWriter.deleteQueue.getNextSequenceNumber();
} finally {
docWriter.unlockAllAfterAbortAll(this);
@ -2542,6 +2583,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* <p>This requires this index not be among those to be added.
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
* @throws IllegalArgumentException if addIndexes would cause
@ -2559,6 +2603,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean successTop = false;
long seqNo;
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush at addIndexes(Directory...)");
@ -2630,6 +2676,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Now reserve the docs, just before we update SIS:
reserveDocs(totalMaxDoc);
seqNo = docWriter.deleteQueue.getNextSequenceNumber();
success = true;
} finally {
if (!success) {
@ -2647,6 +2695,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(Directory...)");
// dead code but javac disagrees:
seqNo = -1;
} finally {
if (successTop) {
IOUtils.close(locks);
@ -2656,8 +2706,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
maybeMerge();
// no need to increment:
return docWriter.deleteQueue.seqNo.get();
return seqNo;
}
/**
@ -2682,6 +2731,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* {@code maxMergeAtOnce} parameter, you should pass that many readers in one
* call.
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@ -2697,6 +2749,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
Sort indexSort = config.getIndexSort();
long seqNo;
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush at addIndexes(CodecReader...)");
@ -2731,8 +2785,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
rateLimiters.set(new MergeRateLimiter(null));
if (!merger.shouldMerge()) {
// no need to increment:
return docWriter.deleteQueue.seqNo.get();
return docWriter.deleteQueue.getNextSequenceNumber();
}
merger.merge(); // merge 'em
@ -2751,8 +2804,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
// no need to increment:
return docWriter.deleteQueue.seqNo.get();
return docWriter.deleteQueue.getNextSequenceNumber();
}
ensureOpen();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
@ -2788,8 +2840,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
// no need to increment:
return docWriter.deleteQueue.seqNo.get();
return docWriter.deleteQueue.getNextSequenceNumber();
}
ensureOpen();
@ -2797,15 +2848,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
reserveDocs(numDocs);
segmentInfos.add(infoPerCommit);
seqNo = docWriter.deleteQueue.getNextSequenceNumber();
checkpoint();
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(CodecReader...)");
// dead code but javac disagrees:
seqNo = -1;
}
maybeMerge();
// no need to increment:
return docWriter.deleteQueue.seqNo.get();
return seqNo;
}
/** Copies the segment files as-is into the IndexWriter's directory. */
@ -2873,6 +2926,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* <p>You can also just call {@link #commit()} directly
* without prepareCommit first in which case that method
* will internally call prepareCommit.
*
* @return The <a href="#sequence_number">sequence number</a>
* of the last operation in the commit. All sequence numbers &lt;= this value
* will be reflected in the commit, and all others will not.
*/
@Override
public final long prepareCommit() throws IOException {
@ -3069,6 +3126,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* point, and all other operations will not. </p>
*
* @see #prepareCommit
*
* @return The <a href="#sequence_number">sequence number</a>
* of the last operation in the commit. All sequence numbers &lt;= this value
* will be reflected in the commit, and all others will not.
*/
@Override
public final long commit() throws IOException {
@ -4988,11 +5049,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
};
}
/** Returns the last sequence number.
/** Returns the last <a href="#sequence_number">sequence number</a>, or 0
* if no index-changing operations have completed yet.
*
* @lucene.experimental */
public long getLastSequenceNumber() {
ensureOpen();
return docWriter.deleteQueue.seqNo.get()-1;
return docWriter.deleteQueue.getLastSequenceNumber();
}
}

View File

@ -43,7 +43,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
long b = w.addDocument(new Document());
assertTrue(b >= a);
assertTrue(b > a);
w.close();
dir.close();
}
@ -129,7 +129,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
static class Operation {
// 0 = update, 1 = delete, 2 = commit
// 0 = update, 1 = delete, 2 = commit, 3 = add
byte what;
int id;
int threadID;
@ -248,7 +248,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
}
assertTrue(op.seqNo >= lastSeqNo);
assertTrue(op.seqNo > lastSeqNo);
lastSeqNo = op.seqNo;
}
}
@ -293,4 +293,153 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
dir.close();
}
/**
 * Stress test: several threads concurrently add documents, delete documents (by term or
 * by query) and occasionally commit, recording the sequence number returned by every call.
 * Afterwards every commit point is opened and, for each id, the number of matching documents
 * must equal the number of adds whose sequence number is greater than that id's last delete
 * and less than or equal to the commit's sequence number.
 */
public void testStressConcurrentAddAndDeleteAndCommit() throws Exception {
  final int opCount = atLeast(10000);
  final int idCount = TestUtil.nextInt(random(), 10, 1000);

  Directory dir = newDirectory();
  IndexWriterConfig iwc = newIndexWriterConfig();
  // every commit point is verified below, so none of them may be removed:
  iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);

  // Cannot use RIW since it randomly commits:
  final IndexWriter w = new IndexWriter(dir, iwc);

  final int numThreads = TestUtil.nextInt(random(), 2, 5);
  Thread[] threads = new Thread[numThreads];
  //System.out.println("TEST: opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
  final CountDownLatch startingGun = new CountDownLatch(1);
  List<List<Operation>> threadOps = new ArrayList<>();

  // serializes commit() with recording its seqNo, so "commits" is in commit order:
  final Object commitLock = new Object();
  final List<Operation> commits = new ArrayList<>();

  // multiple threads update the same set of documents, and we randomly commit
  for(int i=0;i<threads.length;i++) {
    final List<Operation> ops = new ArrayList<>();
    threadOps.add(ops);
    final int threadID = i;
    threads[i] = new Thread() {
        @Override
        public void run() {
          try {
            startingGun.await();
            for(int opIndex=0;opIndex<opCount;opIndex++) {
              Operation op = new Operation();
              op.threadID = threadID;
              if (random().nextInt(500) == 17) {
                op.what = 2;
                synchronized(commitLock) {
                  op.seqNo = w.commit();
                  // only commits that returned a real sequence number are recorded
                  if (op.seqNo != -1) {
                    commits.add(op);
                  }
                }
              } else {
                op.id = random().nextInt(idCount);
                Term idTerm = new Term("id", "" + op.id);
                if (random().nextInt(10) == 1) {
                  op.what = 1;
                  if (random().nextBoolean()) {
                    op.seqNo = w.deleteDocuments(idTerm);
                  } else {
                    op.seqNo = w.deleteDocuments(new TermQuery(idTerm));
                  }
                } else {
                  Document doc = new Document();
                  doc.add(new StoredField("thread", threadID));
                  doc.add(new StringField("id", "" + op.id, Field.Store.NO));
                  if (random().nextBoolean()) {
                    List<Document> docs = new ArrayList<>();
                    docs.add(doc);
                    op.seqNo = w.addDocuments(docs);
                  } else {
                    op.seqNo = w.addDocument(doc);
                  }
                  op.what = 3;
                }
                ops.add(op);
              }
            }
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      };
    threads[i].start();
  }
  startingGun.countDown();
  for(Thread thread : threads) {
    thread.join();
  }

  // one final commit after all threads have finished:
  Operation commitOp = new Operation();
  commitOp.seqNo = w.commit();
  if (commitOp.seqNo != -1) {
    commits.add(commitOp);
  }

  // list the commit points while the writer is still open and before it is closed:
  List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
  assertEquals(commits.size(), indexCommits.size());

  // within one thread the seqNos must only increase; this does not depend on the
  // commit being checked, so verify it once up front:
  for(List<Operation> perThreadOps : threadOps) {
    long lastSeqNo = 0;
    for(Operation op : perThreadOps) {
      assertTrue(op.seqNo > lastSeqNo);
      lastSeqNo = op.seqNo;
    }
  }

  // how many docs with this id are expected:
  int[] expectedCounts = new int[idCount];
  long[] lastDelSeqNos = new long[idCount];

  //System.out.println("TEST: " + commits.size() + " commits");
  for(int i=0;i<commits.size();i++) {
    // this commit point should reflect all operations <= this seqNo
    long commitSeqNo = commits.get(i).seqNo;
    //System.out.println("  commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));

    // first find the highest seqNo of the last delete op, for each id, prior to this commit:
    Arrays.fill(lastDelSeqNos, -1);
    for(List<Operation> perThreadOps : threadOps) {
      for(Operation op : perThreadOps) {
        if (op.what == 1 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) {
          lastDelSeqNos[op.id] = op.seqNo;
        }
      }
    }

    // then count how many adds happened since the last delete and before this commit:
    Arrays.fill(expectedCounts, 0);
    for(List<Operation> perThreadOps : threadOps) {
      for(Operation op : perThreadOps) {
        if (op.what == 3 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) {
          expectedCounts[op.id]++;
        }
      }
    }

    // try-with-resources so the reader is released even when an assertion fails:
    try (DirectoryReader r = DirectoryReader.open(indexCommits.get(i))) {
      IndexSearcher s = new IndexSearcher(r);
      for(int id=0;id<idCount;id++) {
        //System.out.println("TEST: check id=" + id + " expectedCount=" + expectedCounts[id]);
        assertEquals(expectedCounts[id], s.count(new TermQuery(new Term("id", ""+id))));
      }
    }
  }

  // close the writer exactly once, after every commit point has been checked
  // (previously this happened inside the loop, and not at all with zero commits):
  w.close();
  dir.close();
}
/**
 * Checks that sequence numbers strictly increase across an add, a following
 * {@code deleteAll}, and a following commit.
 */
public void testDeleteAll() throws Exception {
  Directory dir = newDirectory();
  IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

  long addSeqNo = writer.addDocument(new Document());
  long deleteAllSeqNo = writer.deleteAll();
  // deleteAll must be ordered strictly after the preceding add
  assertTrue(deleteAllSeqNo > addSeqNo);

  long commitSeqNo = writer.commit();
  // and the commit strictly after the deleteAll
  assertTrue(commitSeqNo > deleteAllSeqNo);

  writer.close();
  dir.close();
}
}