sequence numbers: resolve last nocommits

This commit is contained in:
Mike McCandless 2016-05-25 18:41:21 -04:00
parent 38e9822cb4
commit 39de68963b
5 changed files with 31 additions and 58 deletions

View File

@ -150,41 +150,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
return seqNo;
}
// nocommit can we remove the sync'd
synchronized long add(Node<?> newNode) {
/*
* this non-blocking / 'wait-free' linked list add was inspired by Apache
* Harmony's ConcurrentLinkedQueue Implementation.
*/
while (true) {
final Node<?> currentTail = this.tail;
final Node<?> tailNext = currentTail.next;
if (tail == currentTail) {
if (tailNext != null) {
/*
* we are in intermediate state here. the tails next pointer has been
* advanced but the tail itself might not be updated yet. help to
* advance the tail and try again updating it.
*/
tailUpdater.compareAndSet(this, currentTail, tailNext); // can fail
} else {
/*
* we are in quiescent state and can try to insert the new node to the
* current tail if we fail to insert we just retry the operation since
* somebody else has already added its item
*/
if (currentTail.casNext(null, newNode)) {
/*
* now that we are done we need to advance the tail while another
* thread could have advanced it already so we can ignore the return
* type of this CAS call
*/
tailUpdater.compareAndSet(this, currentTail, newNode);
return seqNo.getAndIncrement();
}
}
}
}
tail.next = newNode;
tail = newNode;
return seqNo.getAndIncrement();
}
boolean anyChanges() {

View File

@ -484,8 +484,8 @@ final class DocumentsWriterFlushControl implements Accountable {
// jump over any possible in flight ops:
seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
// nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would
// have to get this next delete queue?
// Insert a gap in seqNo of current active thread count, in the worst case those threads now have one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
documentsWriter.deleteQueue = newQueue;

View File

@ -426,7 +426,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false;
synchronized (fullFlushLock) {
try {
// nocommit should we make this available in the returned NRT reader?
// TODO: should we somehow make this available in the returned NRT reader?
long seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
anyChanges = true;
@ -2984,7 +2984,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
startCommit(toCommit);
success = true;
return seqNo;
if (pendingCommit == null) {
return -1;
} else {
return seqNo;
}
} finally {
if (!success) {
synchronized (this) {
@ -3058,6 +3062,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* loss it may still lose data. Lucene cannot guarantee
* consistency on such devices. </p>
*
* <p> If nothing was committed, because there were no
* pending changes, this returns -1. Otherwise, it returns
* the sequence number such that all indexing operations
* prior to this sequence will be included in the commit
* point, and all other operations will not. </p>
*
* @see #prepareCommit
*/
@Override
@ -4978,9 +4988,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
};
}
// nocommit javadocs
/** Returns the last sequence number.
*
* @lucene.experimental */
public long getLastSequenceNumber() {
ensureOpen();
return docWriter.deleteQueue.seqNo.get();
return docWriter.deleteQueue.seqNo.get()-1;
}
}

View File

@ -151,9 +151,6 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
*/
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
final long curGen = writer.getLastSequenceNumber();
if (targetGen > curGen) {
throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")");
}
if (targetGen > searchingGen) {
// Notify the reopen thread that the waitingGen has
// changed, so it may wake up and realize it should

View File

@ -74,8 +74,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
int iters = atLeast(100);
for(int iter=0;iter<iters;iter++) {
Directory dir = newDirectory();
// nocommit use RandomIndexWriter
final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
Thread[] threads = new Thread[TestUtil.nextInt(random(), 2, 5)];
final CountDownLatch startingGun = new CountDownLatch(1);
final long[] seqNos = new long[threads.length];
@ -117,7 +116,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
// make sure all sequence numbers were different
assertEquals(threads.length, allSeqNos.size());
DirectoryReader r = DirectoryReader.open(w);
DirectoryReader r = w.getReader();
IndexSearcher s = newSearcher(r);
TopDocs hits = s.search(new TermQuery(id), 1);
assertEquals(1, hits.totalHits);
@ -142,10 +141,12 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
final int idCount = TestUtil.nextInt(random(), 10, 1000);
Directory dir = newDirectory();
// nocommit use RandomIndexWriter
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
// Cannot use RIW since it randomly commits:
final IndexWriter w = new IndexWriter(dir, iwc);
final int numThreads = TestUtil.nextInt(random(), 2, 5);
Thread[] threads = new Thread[numThreads];
//System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
@ -171,11 +172,10 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
if (random().nextInt(500) == 17) {
op.what = 2;
synchronized(commitLock) {
if (w.hasUncommittedChanges()) {
op.seqNo = w.commit();
op.seqNo = w.commit();
if (op.seqNo != -1) {
commits.add(op);
}
//System.out.println("done commit seqNo=" + op.seqNo);
}
} else {
op.id = random().nextInt(idCount);
@ -215,14 +215,11 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
thread.join();
}
/*
// nocommit: why does this make the assertEquals angry...?
if (w.hasUncommittedChanges()) {
Operation commitOp = new Operation();
commitOp.seqNo = w.commit();
Operation commitOp = new Operation();
commitOp.seqNo = w.commit();
if (commitOp.seqNo != -1) {
commits.add(commitOp);
}
*/
List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
assertEquals(commits.size(), indexCommits.size());
@ -296,6 +293,4 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
dir.close();
}
// nocommit test that does n ops across threads, then does it again with a single index / single thread, and assert indices are the same
}