cutover all IW APIs that change the index to return seq no

This commit is contained in:
Mike McCandless 2016-05-24 19:45:40 -04:00
parent 058970e72b
commit e4a21330a2
10 changed files with 119 additions and 79 deletions

View File

@ -136,13 +136,15 @@ final class DocumentsWriter implements Closeable, Accountable {
flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedUpdatesStream); flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedUpdatesStream);
} }
synchronized boolean deleteQueries(final Query... queries) throws IOException { synchronized long deleteQueries(final Query... queries) throws IOException {
// TODO why is this synchronized? // TODO why is this synchronized?
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(queries); long seqNo = deleteQueue.addDelete(queries);
flushControl.doOnDelete(); flushControl.doOnDelete();
// nocommit long if (applyAllDeletes(deleteQueue)) {
return applyAllDeletes(deleteQueue); seqNo = -seqNo;
}
return seqNo;
} }
// TODO: we could check w/ FreqProxTermsWriter: if the // TODO: we could check w/ FreqProxTermsWriter: if the
@ -251,6 +253,10 @@ final class DocumentsWriter implements Closeable, Accountable {
abortedDocCount += abortThreadState(perThread); abortedDocCount += abortThreadState(perThread);
} }
deleteQueue.clear(); deleteQueue.clear();
// jump over any possible in flight ops:
deleteQueue.seqNo.addAndGet(perThreadPool.getActiveThreadStateCount()+1);
flushControl.abortPendingFlushes(); flushControl.abortPendingFlushes();
flushControl.waitForFlush(); flushControl.waitForFlush();
success = true; success = true;
@ -397,12 +403,13 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
} }
boolean updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer, long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
final Term delTerm) throws IOException, AbortingException { final Term delTerm) throws IOException, AbortingException {
boolean hasEvents = preUpdate(); boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock(); final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT; final DocumentsWriterPerThread flushingDWPT;
final long seqNo;
try { try {
// This must happen after we've pulled the ThreadState because IW.close // This must happen after we've pulled the ThreadState because IW.close
@ -413,7 +420,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread dwpt = perThread.dwpt; final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM(); final int dwptNumDocs = dwpt.getNumDocsInRAM();
try { try {
dwpt.updateDocuments(docs, analyzer, delTerm); seqNo = dwpt.updateDocuments(docs, analyzer, delTerm);
} catch (AbortingException ae) { } catch (AbortingException ae) {
flushControl.doOnAbort(perThread); flushControl.doOnAbort(perThread);
dwpt.abort(); dwpt.abort();
@ -430,7 +437,11 @@ final class DocumentsWriter implements Closeable, Accountable {
perThreadPool.release(perThread); perThreadPool.release(perThread);
} }
return postUpdate(flushingDWPT, hasEvents); if (postUpdate(flushingDWPT, hasEvents)) {
return -seqNo;
} else {
return seqNo;
}
} }
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer, long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
@ -441,7 +452,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final ThreadState perThread = flushControl.obtainAndLock(); final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT; final DocumentsWriterPerThread flushingDWPT;
final long seqno; final long seqNo;
try { try {
// This must happen after we've pulled the ThreadState because IW.close // This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released: // waits for all ThreadStates to be released:
@ -451,7 +462,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread dwpt = perThread.dwpt; final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM(); final int dwptNumDocs = dwpt.getNumDocsInRAM();
try { try {
seqno = dwpt.updateDocument(doc, analyzer, delTerm); seqNo = dwpt.updateDocument(doc, analyzer, delTerm);
} catch (AbortingException ae) { } catch (AbortingException ae) {
flushControl.doOnAbort(perThread); flushControl.doOnAbort(perThread);
dwpt.abort(); dwpt.abort();
@ -469,9 +480,9 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
if (postUpdate(flushingDWPT, hasEvents)) { if (postUpdate(flushingDWPT, hasEvents)) {
return -seqno; return -seqNo;
} else { } else {
return seqno; return seqNo;
} }
} }

View File

@ -480,6 +480,8 @@ final class DocumentsWriterFlushControl implements Accountable {
// Set a new delete queue - all subsequent DWPT will use this queue until // Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush // we do another full flush
//System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount()); //System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
// jump over any possible in flight ops:
seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount(); seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
// nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would // nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would

View File

@ -244,7 +244,7 @@ class DocumentsWriterPerThread {
return finishDocument(delTerm); return finishDocument(delTerm);
} }
public int updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException { public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
testPoint("DocumentsWriterPerThread addDocuments start"); testPoint("DocumentsWriterPerThread addDocuments start");
assert deleteQueue != null; assert deleteQueue != null;
docState.analyzer = analyzer; docState.analyzer = analyzer;
@ -285,13 +285,17 @@ class DocumentsWriterPerThread {
// Apply delTerm only after all indexing has // Apply delTerm only after all indexing has
// succeeded, but apply it only to docs prior to when // succeeded, but apply it only to docs prior to when
// this batch started: // this batch started:
long seqNo;
if (delTerm != null) { if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice); seqNo = deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount); deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
return seqNo;
} else {
seqNo = deleteQueue.seqNo.get();
} }
// nocommit return seqNo here return seqNo;
} finally { } finally {
if (!allDocsIndexed && !aborted) { if (!allDocsIndexed && !aborted) {
@ -306,8 +310,6 @@ class DocumentsWriterPerThread {
} }
docState.clear(); docState.clear();
} }
return docCount;
} }
private long finishDocument(Term delTerm) { private long finishDocument(Term delTerm) {
@ -326,7 +328,6 @@ class DocumentsWriterPerThread {
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else { } else {
applySlice &= deleteQueue.updateSlice(deleteSlice); applySlice &= deleteQueue.updateSlice(deleteSlice);
// nocommit we don't need to increment here?
seqNo = deleteQueue.seqNo.get(); seqNo = deleteQueue.seqNo.get();
} }

View File

@ -1332,8 +1332,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* *
* @lucene.experimental * @lucene.experimental
*/ */
public void addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException { public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
updateDocuments(null, docs); return updateDocuments(null, docs);
} }
/** /**
@ -1349,15 +1349,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* *
* @lucene.experimental * @lucene.experimental
*/ */
public void updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException { public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
ensureOpen(); ensureOpen();
try { try {
boolean success = false; boolean success = false;
try { try {
if (docWriter.updateDocuments(docs, analyzer, delTerm)) { long seqNo = docWriter.updateDocuments(docs, analyzer, delTerm);
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true, false); processEvents(true, false);
} }
success = true; success = true;
return seqNo;
} finally { } finally {
if (!success) { if (!success) {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
@ -1367,6 +1370,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
} catch (AbortingException | VirtualMachineError tragedy) { } catch (AbortingException | VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocuments"); tragicEvent(tragedy, "updateDocuments");
// dead code but javac disagrees
return -1;
} }
} }
@ -1375,15 +1381,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* DirectoryReader#open(IndexWriter)}). If the * DirectoryReader#open(IndexWriter)}). If the
* provided reader is an NRT reader obtained from this * provided reader is an NRT reader obtained from this
* writer, and its segment has not been merged away, then * writer, and its segment has not been merged away, then
* the delete succeeds and this method returns true; else, it * the delete succeeds and this method returns a valid (&gt; 0) sequence
* returns false the caller must then separately delete by * number; else, it returns -1 and the caller must then
* Term or Query. * separately delete by Term or Query.
* *
* <b>NOTE</b>: this method can only delete documents * <b>NOTE</b>: this method can only delete documents
* visible to the currently open NRT reader. If you need * visible to the currently open NRT reader. If you need
* to delete documents indexed after opening the NRT * to delete documents indexed after opening the NRT
* reader you must use {@link #deleteDocuments(Term...)}). */ * reader you must use {@link #deleteDocuments(Term...)}). */
public synchronized boolean tryDeleteDocument(IndexReader readerIn, int docID) throws IOException { public synchronized long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
final LeafReader reader; final LeafReader reader;
if (readerIn instanceof LeafReader) { if (readerIn instanceof LeafReader) {
@ -1434,7 +1440,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
changed(); changed();
} }
//System.out.println(" yes " + info.info.name + " " + docID); //System.out.println(" yes " + info.info.name + " " + docID);
return true;
return docWriter.deleteQueue.seqNo.getAndIncrement();
} }
} else { } else {
//System.out.println(" no rld " + info.info.name + " " + docID); //System.out.println(" no rld " + info.info.name + " " + docID);
@ -1442,7 +1449,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} else { } else {
//System.out.println(" no seg " + info.info.name + " " + docID); //System.out.println(" no seg " + info.info.name + " " + docID);
} }
return false;
return -1;
} }
/** /**
@ -1481,23 +1489,29 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws CorruptIndexException if the index is corrupt * @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error * @throws IOException if there is a low-level IO error
*/ */
public void deleteDocuments(Query... queries) throws IOException { public long deleteDocuments(Query... queries) throws IOException {
ensureOpen(); ensureOpen();
// LUCENE-6379: Specialize MatchAllDocsQuery // LUCENE-6379: Specialize MatchAllDocsQuery
for(Query query : queries) { for(Query query : queries) {
if (query.getClass() == MatchAllDocsQuery.class) { if (query.getClass() == MatchAllDocsQuery.class) {
deleteAll(); return deleteAll();
return;
} }
} }
try { try {
if (docWriter.deleteQueries(queries)) { long seqNo = docWriter.deleteQueries(queries);
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true, false); processEvents(true, false);
} }
return seqNo;
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteDocuments(Query..)"); tragicEvent(tragedy, "deleteDocuments(Query..)");
// dead code but javac disagrees:
return -1;
} }
} }
@ -2225,7 +2239,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* or {@link #forceMergeDeletes} methods, they may receive * or {@link #forceMergeDeletes} methods, they may receive
* {@link MergePolicy.MergeAbortedException}s. * {@link MergePolicy.MergeAbortedException}s.
*/ */
public void deleteAll() throws IOException { public long deleteAll() throws IOException {
ensureOpen(); ensureOpen();
// Remove any buffered docs // Remove any buffered docs
boolean success = false; boolean success = false;
@ -2272,6 +2286,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
globalFieldNumberMap.clear(); globalFieldNumberMap.clear();
success = true; success = true;
return docWriter.deleteQueue.seqNo.get();
} finally { } finally {
docWriter.unlockAllAfterAbortAll(this); docWriter.unlockAllAfterAbortAll(this);
if (!success) { if (!success) {
@ -2284,6 +2300,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteAll"); tragicEvent(tragedy, "deleteAll");
// dead code but javac disagrees
return -1;
} }
} }
@ -2511,7 +2530,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* the index to exceed {@link #MAX_DOCS}, or if the indoming * the index to exceed {@link #MAX_DOCS}, or if the indoming
* index sort does not match this index's index sort * index sort does not match this index's index sort
*/ */
public void addIndexes(Directory... dirs) throws IOException { public long addIndexes(Directory... dirs) throws IOException {
ensureOpen(); ensureOpen();
noDupDirs(dirs); noDupDirs(dirs);
@ -2618,6 +2637,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
} }
maybeMerge(); maybeMerge();
// no need to increment:
return docWriter.deleteQueue.seqNo.get();
} }
/** /**
@ -2649,7 +2671,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws IllegalArgumentException * @throws IllegalArgumentException
* if addIndexes would cause the index to exceed {@link #MAX_DOCS} * if addIndexes would cause the index to exceed {@link #MAX_DOCS}
*/ */
public void addIndexes(CodecReader... readers) throws IOException { public long addIndexes(CodecReader... readers) throws IOException {
ensureOpen(); ensureOpen();
// long so we can detect int overflow: // long so we can detect int overflow:
@ -2691,7 +2713,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
rateLimiters.set(new MergeRateLimiter(null)); rateLimiters.set(new MergeRateLimiter(null));
if (!merger.shouldMerge()) { if (!merger.shouldMerge()) {
return; // no need to increment:
return docWriter.deleteQueue.seqNo.get();
} }
merger.merge(); // merge 'em merger.merge(); // merge 'em
@ -2709,7 +2732,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (stopMerges) { if (stopMerges) {
// Safe: these files must exist // Safe: these files must exist
deleteNewFiles(infoPerCommit.files()); deleteNewFiles(infoPerCommit.files());
return;
// no need to increment:
return docWriter.deleteQueue.seqNo.get();
} }
ensureOpen(); ensureOpen();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this); useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
@ -2744,7 +2769,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (stopMerges) { if (stopMerges) {
// Safe: these files must exist // Safe: these files must exist
deleteNewFiles(infoPerCommit.files()); deleteNewFiles(infoPerCommit.files());
return;
// no need to increment:
return docWriter.deleteQueue.seqNo.get();
} }
ensureOpen(); ensureOpen();
@ -2758,6 +2785,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
tragicEvent(tragedy, "addIndexes(CodecReader...)"); tragicEvent(tragedy, "addIndexes(CodecReader...)");
} }
maybeMerge(); maybeMerge();
// no need to increment:
return docWriter.deleteQueue.seqNo.get();
} }
/** Copies the segment files as-is into the IndexWriter's directory. */ /** Copies the segment files as-is into the IndexWriter's directory. */

View File

@ -66,14 +66,6 @@ public class TrackingIndexWriter {
return indexingGen.get(); return indexingGen.get();
} }
/** Calls {@link IndexWriter#deleteDocuments(Term...)} and
* returns the generation that reflects this change. */
public long deleteDocuments(Term t) throws IOException {
writer.deleteDocuments(t);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Term...)} and /** Calls {@link IndexWriter#deleteDocuments(Term...)} and
* returns the generation that reflects this change. */ * returns the generation that reflects this change. */
public long deleteDocuments(Term... terms) throws IOException { public long deleteDocuments(Term... terms) throws IOException {
@ -82,14 +74,6 @@ public class TrackingIndexWriter {
return indexingGen.get(); return indexingGen.get();
} }
/** Calls {@link IndexWriter#deleteDocuments(Query...)} and
* returns the generation that reflects this change. */
public long deleteDocuments(Query q) throws IOException {
writer.deleteDocuments(q);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Query...)} /** Calls {@link IndexWriter#deleteDocuments(Query...)}
* and returns the generation that reflects this change. */ * and returns the generation that reflects this change. */
public long deleteDocuments(Query... queries) throws IOException { public long deleteDocuments(Query... queries) throws IOException {
@ -159,7 +143,7 @@ public class TrackingIndexWriter {
* IndexWriter#tryDeleteDocument(IndexReader,int)} and * IndexWriter#tryDeleteDocument(IndexReader,int)} and
* returns the generation that reflects this change. */ * returns the generation that reflects this change. */
public long tryDeleteDocument(IndexReader reader, int docID) throws IOException { public long tryDeleteDocument(IndexReader reader, int docID) throws IOException {
if (writer.tryDeleteDocument(reader, docID)) { if (writer.tryDeleteDocument(reader, docID) != -1) {
return indexingGen.get(); return indexingGen.get();
} else { } else {
return -1; return -1;

View File

@ -1238,8 +1238,8 @@ public class TestIndexWriterDelete extends LuceneTestCase {
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
w = new IndexWriter(d, iwc); w = new IndexWriter(d, iwc);
IndexReader r = DirectoryReader.open(w, false, false); IndexReader r = DirectoryReader.open(w, false, false);
assertTrue(w.tryDeleteDocument(r, 1)); assertTrue(w.tryDeleteDocument(r, 1) != -1);
assertTrue(w.tryDeleteDocument(r.leaves().get(0).reader(), 0)); assertTrue(w.tryDeleteDocument(r.leaves().get(0).reader(), 0) != -1);
r.close(); r.close();
w.close(); w.close();

View File

@ -43,7 +43,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig()); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document()); long a = w.addDocument(new Document());
long b = w.addDocument(new Document()); long b = w.addDocument(new Document());
assertTrue(b > a); assertTrue(b >= a);
w.close(); w.close();
dir.close(); dir.close();
} }
@ -154,7 +154,6 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
Object commitLock = new Object(); Object commitLock = new Object();
final List<Operation> commits = new ArrayList<>(); final List<Operation> commits = new ArrayList<>();
final AtomicInteger opsSinceCommit = new AtomicInteger();
// multiple threads update the same set of documents, and we randomly commit // multiple threads update the same set of documents, and we randomly commit
for(int i=0;i<threads.length;i++) { for(int i=0;i<threads.length;i++) {
@ -172,12 +171,9 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
if (random().nextInt(500) == 17) { if (random().nextInt(500) == 17) {
op.what = 2; op.what = 2;
synchronized(commitLock) { synchronized(commitLock) {
// nocommit why does this sometimes fail :) if (w.hasUncommittedChanges()) {
//if (w.hasUncommittedChanges()) {
if (opsSinceCommit.get() > numThreads) {
op.seqNo = w.commit(); op.seqNo = w.commit();
commits.add(op); commits.add(op);
opsSinceCommit.set(0);
} }
//System.out.println("done commit seqNo=" + op.seqNo); //System.out.println("done commit seqNo=" + op.seqNo);
} }
@ -186,16 +182,25 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
Term idTerm = new Term("id", "" + op.id); Term idTerm = new Term("id", "" + op.id);
if (random().nextInt(10) == 1) { if (random().nextInt(10) == 1) {
op.what = 1; op.what = 1;
if (random().nextBoolean()) {
op.seqNo = w.deleteDocuments(idTerm); op.seqNo = w.deleteDocuments(idTerm);
} else {
op.seqNo = w.deleteDocuments(new TermQuery(idTerm));
}
} else { } else {
Document doc = new Document(); Document doc = new Document();
doc.add(new StoredField("thread", threadID)); doc.add(new StoredField("thread", threadID));
doc.add(new StringField("id", "" + op.id, Field.Store.NO)); doc.add(new StringField("id", "" + op.id, Field.Store.NO));
if (random().nextBoolean()) {
List<Document> docs = new ArrayList<>();
docs.add(doc);
op.seqNo = w.updateDocuments(idTerm, docs);
} else {
op.seqNo = w.updateDocument(idTerm, doc); op.seqNo = w.updateDocument(idTerm, doc);
}
op.what = 2; op.what = 2;
} }
ops.add(op); ops.add(op);
opsSinceCommit.getAndIncrement();
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -210,11 +215,14 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
thread.join(); thread.join();
} }
/*
// nocommit: why does this make the assertEquals angry...?
if (w.hasUncommittedChanges()) {
Operation commitOp = new Operation(); Operation commitOp = new Operation();
synchronized(commitLock) {
commitOp.seqNo = w.commit(); commitOp.seqNo = w.commit();
commits.add(commitOp); commits.add(commitOp);
} }
*/
List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir); List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
assertEquals(commits.size(), indexCommits.size()); assertEquals(commits.size(), indexCommits.size());

View File

@ -80,7 +80,7 @@ public class TestRollingUpdates extends LuceneTestCase {
if (s != null && updateCount < SIZE) { if (s != null && updateCount < SIZE) {
TopDocs hits = s.search(new TermQuery(idTerm), 1); TopDocs hits = s.search(new TermQuery(idTerm), 1);
assertEquals(1, hits.totalHits); assertEquals(1, hits.totalHits);
doUpdate = !w.tryDeleteDocument(r, hits.scoreDocs[0].doc); doUpdate = w.tryDeleteDocument(r, hits.scoreDocs[0].doc) == -1;
if (VERBOSE) { if (VERBOSE) {
if (doUpdate) { if (doUpdate) {
System.out.println(" tryDeleteDocument failed"); System.out.println(" tryDeleteDocument failed");

View File

@ -40,29 +40,33 @@ public class TestTwoPhaseCommitTool extends LuceneTestCase {
} }
@Override @Override
public void prepareCommit() throws IOException { public long prepareCommit() throws IOException {
prepareCommit(null); return prepareCommit(null);
} }
public void prepareCommit(Map<String, String> commitData) throws IOException { public long prepareCommit(Map<String, String> commitData) throws IOException {
this.prepareCommitData = commitData; this.prepareCommitData = commitData;
assertFalse("commit should not have been called before all prepareCommit were", commitCalled); assertFalse("commit should not have been called before all prepareCommit were", commitCalled);
if (failOnPrepare) { if (failOnPrepare) {
throw new IOException("failOnPrepare"); throw new IOException("failOnPrepare");
} }
// nocommit hmm
return -1;
} }
@Override @Override
public void commit() throws IOException { public long commit() throws IOException {
commit(null); return commit(null);
} }
public void commit(Map<String, String> commitData) throws IOException { public long commit(Map<String, String> commitData) throws IOException {
this.commitData = commitData; this.commitData = commitData;
commitCalled = true; commitCalled = true;
if (failOnCommit) { if (failOnCommit) {
throw new RuntimeException("failOnCommit"); throw new RuntimeException("failOnCommit");
} }
// nocommit hmm
return -1;
} }
@Override @Override

View File

@ -389,14 +389,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
super(d, conf); super(d, conf);
this.latch = latch; this.latch = latch;
this.signal = signal; this.signal = signal;
} }
@Override @Override
public void updateDocument(Term term, public long updateDocument(Term term,
Iterable<? extends IndexableField> doc) Iterable<? extends IndexableField> doc)
throws IOException { throws IOException {
super.updateDocument(term, doc); long result = super.updateDocument(term, doc);
try { try {
if (waitAfterUpdate) { if (waitAfterUpdate) {
signal.countDown(); signal.countDown();
@ -405,6 +404,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new ThreadInterruptedException(e); throw new ThreadInterruptedException(e);
} }
return result;
} }
} }