From 4666e84ee1eb673271266ec92a7113531a00179a Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Mon, 23 May 2011 16:02:25 +0000 Subject: [PATCH] LUCENE-3112: add IW.add/updateDocuments, to atomically add a block of docs with guaranteed sequential docIDs git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1126560 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/CHANGES.txt | 4 + .../apache/lucene/index/IndexSplitter.java | 5 + .../lucene/index/MultiPassIndexSplitter.java | 5 + .../apache/lucene/index/DocumentsWriter.java | 71 ++++-- .../index/DocumentsWriterFlushControl.java | 6 +- .../index/DocumentsWriterPerThread.java | 78 ++++++- .../index/DocumentsWriterPerThreadPool.java | 3 +- .../org/apache/lucene/index/IndexWriter.java | 105 +++++++++ .../lucene/index/TermVectorsTermsWriter.java | 2 +- ...readAffinityDocumentsWriterThreadPool.java | 9 +- .../lucene/index/RandomIndexWriter.java | 85 ++++++- .../index/TestIndexWriterExceptions.java | 212 ++++++++++++++++-- .../apache/lucene/index/TestNRTThreads.java | 206 +++++++++++++++-- 13 files changed, 727 insertions(+), 64 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 191d740ce70..fb93517b96d 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -486,6 +486,10 @@ New features document IDs and scores encountered during the search, and "replay" them to another Collector. (Mike McCandless, Shai Erera) +* LUCENE-3112: Added experimental IndexWriter.add/updateDocuments, + enabling a block of documents to be indexed, atomically, with + guaranteed sequential docIDs. (Mike McCandless) + API Changes * LUCENE-3061: IndexWriter's getNextMerge() and merge(OneMerge) are now public diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java b/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java index 2a42390fd82..3daf7aa7dd6 100644 --- a/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java +++ b/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java @@ -45,6 +45,11 @@ import org.apache.lucene.store.FSDirectory; * @lucene.experimental You can easily * accidentally remove segments from your index so be * careful! + * + *
NOTE: this tool is unaware of documents added + * atomically via {@link IndexWriter#addDocuments} or {@link + * IndexWriter#updateDocuments}, which means it can easily + * break up such document groups. */ public class IndexSplitter { public SegmentInfos infos; diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java b/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java index ce42d8bbc99..c41d6dcd272 100644 --- a/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java +++ b/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java @@ -40,6 +40,11 @@ import org.apache.lucene.util.Version; *
Note 2: the disadvantage of this tool is that the source index needs to be * read as many times as there are parts to be created, hence the name of this * tool. + + *
NOTE: this tool is unaware of documents added + * atomically via {@link IndexWriter#addDocuments} or {@link + * IndexWriter#updateDocuments}, which means it can easily + * break up such document groups. */ public class MultiPassIndexSplitter { diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java index b9556ebe73b..3f97ac82272 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -274,11 +274,9 @@ final class DocumentsWriter { flushControl.setClosed(); } - boolean updateDocument(final Document doc, final Analyzer analyzer, - final Term delTerm) throws CorruptIndexException, IOException { + private boolean preUpdate() throws CorruptIndexException, IOException { ensureOpen(); boolean maybeMerge = false; - final boolean isUpdate = delTerm != null; if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) { // Help out flushing any queued DWPTs so we can un-stall: if (infoStream != null) { @@ -303,9 +301,59 @@ final class DocumentsWriter { message("continue indexing after helpling out flushing DocumentsWriter is healthy"); } } + return maybeMerge; + } - final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), - this, doc); + private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException { + if (flushingDWPT != null) { + maybeMerge |= doFlush(flushingDWPT); + } else { + final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); + if (nextPendingFlush != null) { + maybeMerge |= doFlush(nextPendingFlush); + } + } + + return maybeMerge; + } + + boolean updateDocuments(final Iterable docs, final Analyzer analyzer, + final Term delTerm) throws CorruptIndexException, IOException { + boolean maybeMerge = preUpdate(); + + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this); + final DocumentsWriterPerThread flushingDWPT; + + try { + if (!perThread.isActive()) { + ensureOpen(); + assert false: "perThread is not active but we are still open"; + } + + final DocumentsWriterPerThread dwpt = perThread.perThread; + try { + final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm); + numDocsInRAM.addAndGet(docCount); + } finally { + if (dwpt.checkAndResetHasAborted()) { + flushControl.doOnAbort(perThread); + } + } + final boolean isUpdate = delTerm != null; + flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); + } finally { + perThread.unlock(); + } + + return postUpdate(flushingDWPT, maybeMerge); + } + + boolean updateDocument(final Document doc, final Analyzer analyzer, + final Term delTerm) throws CorruptIndexException, IOException { + + boolean maybeMerge = preUpdate(); + + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this); final DocumentsWriterPerThread flushingDWPT; try { @@ -324,20 +372,13 @@ final class DocumentsWriter { flushControl.doOnAbort(perThread); } } + final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); } finally { perThread.unlock(); } - - if (flushingDWPT != null) { - maybeMerge |= doFlush(flushingDWPT); - } else { - final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); - if (nextPendingFlush != null) { - maybeMerge |= doFlush(nextPendingFlush); - } - } - return maybeMerge; + + return postUpdate(flushingDWPT, maybeMerge); } private boolean 
doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index 81d6d9655c9..79b0959ef3b 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -68,7 +68,7 @@ public final class DocumentsWriterFlushControl { this.stallControl = new DocumentsWriterStallControl(); this.perThreadPool = documentsWriter.perThreadPool; this.flushPolicy = documentsWriter.flushPolicy; - this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;; + this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024; this.config = config; this.documentsWriter = documentsWriter; } @@ -162,8 +162,6 @@ public final class DocumentsWriterFlushControl { stallControl.updateStalled(this); assert assertMemory(); } - - } synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { @@ -217,7 +215,7 @@ public final class DocumentsWriterFlushControl { assert assertMemory(); // Take it out of the loop this DWPT is stale perThreadPool.replaceForFlush(state, closed); - }finally { + } finally { stallControl.updateStalled(this); } } diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index 68334663fa0..553aff5f157 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -104,7 +104,7 @@ public class DocumentsWriterPerThread { // largish: doc = null; analyzer = null; - } + } } static class FlushedSegment { @@ -253,6 +253,82 @@ public class DocumentsWriterPerThread { finishDocument(delTerm); } + public int updateDocuments(Iterable docs, Analyzer analyzer, Term delTerm) throws IOException { + assert writer.testPoint("DocumentsWriterPerThread addDocuments start"); + assert deleteQueue != null; + docState.analyzer = analyzer; + if (segment == null) { + // this call is synchronized on IndexWriter.segmentInfos + segment = writer.newSegmentName(); + assert numDocsInRAM == 0; + } + + int docCount = 0; + try { + for(Document doc : docs) { + docState.doc = doc; + docState.docID = numDocsInRAM; + docCount++; + + boolean success = false; + try { + consumer.processDocument(fieldInfos); + success = true; + } finally { + if (!success) { + // An exc is being thrown... + + if (!aborting) { + // One of the documents hit a non-aborting + // exception (eg something happened during + // analysis). 
We now go and mark any docs + // from this batch that we had already indexed + // as deleted: + int docID = docState.docID; + final int endDocID = docID - docCount; + while (docID > endDocID) { + deleteDocID(docID); + docID--; + } + + // Incr here because finishDocument will not + // be called (because an exc is being thrown): + numDocsInRAM++; + fieldInfos.revertUncommitted(); + } else { + abort(); + } + } + } + success = false; + try { + consumer.finishDocument(); + success = true; + } finally { + if (!success) { + abort(); + } + } + + finishDocument(null); + } + + // Apply delTerm only after all indexing has + // succeeded, but apply it only to docs prior to when + // this batch started: + if (delTerm != null) { + deleteQueue.add(delTerm, deleteSlice); + assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; + deleteSlice.apply(pendingDeletes, numDocsInRAM-docCount); + } + + } finally { + docState.clear(); + } + + return docCount; + } + private void finishDocument(Term delTerm) throws IOException { /* * here we actually finish the document in two steps 1. push the delete into diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index 45ca99ad5d8..8014db17dcc 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -19,7 +19,6 @@ package org.apache.lucene.index; import java.util.Iterator; import java.util.concurrent.locks.ReentrantLock; -import org.apache.lucene.document.Document; import org.apache.lucene.index.FieldInfos.FieldNumberBiMap; import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder; import org.apache.lucene.index.codecs.CodecProvider; @@ -212,7 +211,7 @@ public abstract class DocumentsWriterPerThreadPool { // don't recycle DWPT by default } - public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc); + public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter); /** * Returns an iterator providing access to all {@link ThreadState} diff --git a/lucene/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/src/java/org/apache/lucene/index/IndexWriter.java index 305676ec2b6..54f6ad2173e 100644 --- a/lucene/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/src/java/org/apache/lucene/index/IndexWriter.java @@ -1227,6 +1227,111 @@ public class IndexWriter implements Closeable { updateDocument(null, doc, analyzer); } + /** + * Atomically adds a block of documents with sequentially + * assigned document IDs, such that an external reader + * will see all or none of the documents. + * + *
WARNING: the index does not currently record + * which documents were added as a block. Today this is + * fine, because merging will preserve the block (as long + * as none of them were deleted). But it's possible in the + * future that Lucene may more aggressively re-order + * documents (for example, perhaps to obtain better index + * compression), in which case you may need to fully + * re-index your documents at that time. + + *
See {@link #addDocument(Document)} for details on + * index and IndexWriter state after an Exception, and + * flushing/merging temporary free space requirements.
+ + *
NOTE: tools that do offline splitting of an index + * (for example, IndexSplitter in contrib) or + * re-sorting of documents (for example, IndexSorter in + * contrib) are not aware of these atomically added documents + * and will likely break them up. Use such tools at your + * own risk! + + *
NOTE: if this method hits an OutOfMemoryError + * you should immediately close the writer. See above for details.
+ * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void addDocuments(Iterable docs) throws CorruptIndexException, IOException { + addDocuments(docs, analyzer); + } + + /** + * Atomically adds a block of documents, analyzed using the + * provided analyzer, with sequentially assigned document + * IDs, such that an external reader will see all or none + * of the documents. + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void addDocuments(Iterable docs, Analyzer analyzer) throws CorruptIndexException, IOException { + updateDocuments(null, docs, analyzer); + } + + /** + * Atomically deletes documents matching the provided + * delTerm and adds a block of documents with sequentially + * assigned document IDs, such that an external reader + * will see all or none of the documents. + * + * See {@link #addDocuments(Iterable)}. + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void updateDocuments(Term delTerm, Iterable docs) throws CorruptIndexException, IOException { + updateDocuments(delTerm, docs, analyzer); + } + + /** + * Atomically deletes documents matching the provided + * delTerm and adds a block of documents, analyzed using + * the provided analyzer, with sequentially + * assigned document IDs, such that an external reader + * will see all or none of the documents. + * + * See {@link #addDocuments(Iterable)}. + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void updateDocuments(Term delTerm, Iterable docs, Analyzer analyzer) throws CorruptIndexException, IOException { + ensureOpen(); + try { + boolean success = false; + boolean anySegmentFlushed = false; + try { + anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm); + success = true; + } finally { + if (!success && infoStream != null) { + message("hit exception updating document"); + } + } + if (anySegmentFlushed) { + maybeMerge(); + } + } catch (OutOfMemoryError oom) { + handleOOM(oom, "updateDocuments"); + } + } + /** * Deletes the document(s) containing term. 
* diff --git a/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java b/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java index fa956dda190..dcc7c8da921 100644 --- a/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java +++ b/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java @@ -139,7 +139,7 @@ final class TermVectorsTermsWriter extends TermsHashConsumer { } } - assert lastDocID == docState.docID; + assert lastDocID == docState.docID: "lastDocID=" + lastDocID + " docState.docID=" + docState.docID; lastDocID++; diff --git a/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java b/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java index f478f29df1f..b5f0b6cb20d 100644 --- a/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java +++ b/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java @@ -18,7 +18,6 @@ package org.apache.lucene.index; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.lucene.document.Document; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc /** @@ -48,12 +47,10 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT } @Override - public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) { + public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) { ThreadState threadState = threadBindings.get(requestingThread); - if (threadState != null) { - if (threadState.tryLock()) { - return threadState; - } + if (threadState != null && threadState.tryLock()) { + return threadState; } ThreadState minThreadState = null; diff --git a/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java b/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java index 0712e4104c4..627690bea43 100644 --- a/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java +++ b/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java @@ -19,6 +19,7 @@ package org.apache.lucene.index; import java.io.Closeable; import java.io.IOException; +import java.util.Iterator; import java.util.Random; import org.apache.lucene.analysis.Analyzer; @@ -97,8 +98,43 @@ public class RandomIndexWriter implements Closeable { * Adds a Document. * @see IndexWriter#addDocument(Document) */ - public void addDocument(Document doc) throws IOException { - w.addDocument(doc); + public void addDocument(final Document doc) throws IOException { + if (r.nextInt(5) == 3) { + // TODO: maybe, we should simply buffer up added docs + // (but we need to clone them), and only when + // getReader, commit, etc. are called, we do an + // addDocuments? Would be better testing. 
+ w.addDocuments(new Iterable() { + + // @Override -- not until Java 1.6 + public Iterator iterator() { + return new Iterator() { + boolean done; + + // @Override -- not until Java 1.6 + public boolean hasNext() { + return !done; + } + + // @Override -- not until Java 1.6 + public void remove() { + throw new UnsupportedOperationException(); + } + + // @Override -- not until Java 1.6 + public Document next() { + if (done) { + throw new IllegalStateException(); + } + done = true; + return doc; + } + }; + } + }); + } else { + w.addDocument(doc); + } maybeCommit(); } @@ -116,12 +152,53 @@ public class RandomIndexWriter implements Closeable { } } + public void addDocuments(Iterable docs) throws IOException { + w.addDocuments(docs); + maybeCommit(); + } + + public void updateDocuments(Term delTerm, Iterable docs) throws IOException { + w.updateDocuments(delTerm, docs); + maybeCommit(); + } + /** * Updates a document. * @see IndexWriter#updateDocument(Term, Document) */ - public void updateDocument(Term t, Document doc) throws IOException { - w.updateDocument(t, doc); + public void updateDocument(Term t, final Document doc) throws IOException { + if (r.nextInt(5) == 3) { + w.updateDocuments(t, new Iterable() { + + // @Override -- not until Java 1.6 + public Iterator iterator() { + return new Iterator() { + boolean done; + + // @Override -- not until Java 1.6 + public boolean hasNext() { + return !done; + } + + // @Override -- not until Java 1.6 + public void remove() { + throw new UnsupportedOperationException(); + } + + // @Override -- not until Java 1.6 + public Document next() { + if (done) { + throw new IllegalStateException(); + } + done = true; + return doc; + } + }; + } + }); + } else { + w.updateDocument(t, doc); + } maybeCommit(); } diff --git a/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java index bda5bc449e7..412daae2eb7 100644 --- a/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java +++ b/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java @@ -17,24 +17,16 @@ package org.apache.lucene.index; * limitations under the License. 
*/ -import java.util.ArrayList; -import java.util.List; -import java.util.Random; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; import java.io.Reader; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util._TestUtil; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.MockDirectoryWrapper; -import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockTokenizer; @@ -43,9 +35,54 @@ import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.PhraseQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util._TestUtil; public class TestIndexWriterExceptions extends LuceneTestCase { + private static class DocCopyIterator implements Iterable { + private final Document doc; + private final int count; + + public DocCopyIterator(Document doc, int count) { + this.count = count; + this.doc = doc; + } + + // @Override -- not until Java 1.6 + public Iterator iterator() { + return new Iterator() { + int upto; + + // @Override -- not until Java 1.6 + public boolean hasNext() { + return upto < count; + } + + // @Override -- not until Java 1.6 + public Document next() { + upto++; + return doc; + } + + // @Override -- not until Java 1.6 + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + private class IndexerThread extends Thread { IndexWriter writer; @@ -87,7 +124,11 @@ public class TestIndexWriterExceptions extends LuceneTestCase { idField.setValue(id); Term idTerm = new Term("id", id); try { - writer.updateDocument(idTerm, doc); + if (r.nextBoolean()) { + writer.updateDocuments(idTerm, new DocCopyIterator(doc, _TestUtil.nextInt(r, 1, 20))); + } else { + writer.updateDocument(idTerm, doc); + } } catch (RuntimeException re) { if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": EXC: "); @@ -136,7 +177,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase { @Override boolean testPoint(String name) { - if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(20) == 17) { + if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(40) == 17) { if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": NOW FAIL: " + name); new Throwable().printStackTrace(System.out); @@ -267,6 +308,8 @@ public class TestIndexWriterExceptions extends LuceneTestCase { } } + private static String CRASH_FAIL_MESSAGE = "I'm experiencing problems"; + private class CrashingFilter extends 
TokenFilter { String fieldName; int count; @@ -279,7 +322,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase { @Override public boolean incrementToken() throws IOException { if (this.fieldName.equals("crash") && count++ >= 4) - throw new IOException("I'm experiencing problems"); + throw new IOException(CRASH_FAIL_MESSAGE); return input.incrementToken(); } @@ -1278,4 +1321,141 @@ public class TestIndexWriterExceptions extends LuceneTestCase { } } } + + public void testAddDocsNonAbortingException() throws Exception { + final Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random, dir); + final int numDocs1 = random.nextInt(25); + for(int docCount=0;docCount docs = new ArrayList(); + for(int docCount=0;docCount<7;docCount++) { + Document doc = new Document(); + docs.add(doc); + doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED)); + doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED)); + if (docCount == 4) { + Field f = newField("crash", "", Field.Index.ANALYZED); + doc.add(f); + MockTokenizer tokenizer = new MockTokenizer(new StringReader("crash me on the 4th token"), MockTokenizer.WHITESPACE, false); + tokenizer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases. + f.setTokenStream(new CrashingFilter("crash", tokenizer)); + } + } + try { + w.addDocuments(docs); + // BUG: CrashingFilter didn't + fail("did not hit expected exception"); + } catch (IOException ioe) { + // expected + assertEquals(CRASH_FAIL_MESSAGE, ioe.getMessage()); + } + + final int numDocs2 = random.nextInt(25); + for(int docCount=0;docCount docs = new ArrayList(); + final int numDocs2 = random.nextInt(25); + for(int docCount=0;docCount subIDs; + public boolean deleted; + + public SubDocs(String packID, List subIDs) { + this.packID = packID; + this.subIDs = subIDs; + } + } + + // TODO: is there a pre-existing way to do this!!! + private Document cloneDoc(Document doc1) { + final Document doc2 = new Document(); + for(Fieldable f : doc1.getFields()) { + Field field1 = (Field) f; + + Field field2 = new Field(field1.name(), + field1.stringValue(), + field1.isStored() ? Field.Store.YES : Field.Store.NO, + field1.isIndexed() ? (field1.isTokenized() ? Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO); + if (field1.getOmitNorms()) { + field2.setOmitNorms(true); + } + if (field1.getOmitTermFreqAndPositions()) { + field2.setOmitTermFreqAndPositions(true); + } + doc2.add(field2); + } + + return doc2; + } + @Test public void testNRTThreads() throws Exception { @@ -121,13 +156,16 @@ public class TestNRTThreads extends LuceneTestCase { final int NUM_INDEX_THREADS = 2; final int NUM_SEARCH_THREADS = 3; + final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 
300 : 5; final AtomicBoolean failed = new AtomicBoolean(); final AtomicInteger addCount = new AtomicInteger(); final AtomicInteger delCount = new AtomicInteger(); + final AtomicInteger packCount = new AtomicInteger(); final Set delIDs = Collections.synchronizedSet(new HashSet()); + final List allSubDocs = Collections.synchronizedList(new ArrayList()); final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000; Thread[] threads = new Thread[NUM_INDEX_THREADS]; @@ -135,7 +173,9 @@ public class TestNRTThreads extends LuceneTestCase { threads[thread] = new Thread() { @Override public void run() { + // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works: final List toDeleteIDs = new ArrayList(); + final List toDeleteSubDocs = new ArrayList(); while(System.currentTimeMillis() < stopTime && !failed.get()) { try { Document doc = docs.nextDoc(); @@ -153,7 +193,92 @@ public class TestNRTThreads extends LuceneTestCase { if (VERBOSE) { //System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid")); } - writer.addDocument(doc); + + if (random.nextBoolean()) { + // Add a pack of adjacent sub-docs + final String packID; + final SubDocs delSubDocs; + if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) { + delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size())); + assert !delSubDocs.deleted; + toDeleteSubDocs.remove(delSubDocs); + // reuse prior packID + packID = delSubDocs.packID; + } else { + delSubDocs = null; + // make new packID + packID = packCount.getAndIncrement() + ""; + } + + final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED); + final List docIDs = new ArrayList(); + final SubDocs subDocs = new SubDocs(packID, docIDs); + final List docsList = new ArrayList(); + + allSubDocs.add(subDocs); + doc.add(packIDField); + docsList.add(cloneDoc(doc)); + docIDs.add(doc.get("docid")); + + final int maxDocCount = _TestUtil.nextInt(random, 1, 10); + while(docsList.size() < maxDocCount) { + doc = docs.nextDoc(); + if (doc == null) { + break; + } + docsList.add(cloneDoc(doc)); + docIDs.add(doc.get("docid")); + } + addCount.addAndGet(docsList.size()); + + if (delSubDocs != null) { + delSubDocs.deleted = true; + delIDs.addAll(delSubDocs.subIDs); + delCount.addAndGet(delSubDocs.subIDs.size()); + if (VERBOSE) { + System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs); + } + writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList); + /* + // non-atomic: + writer.deleteDocuments(new Term("packID", delSubDocs.packID)); + for(Document subDoc : docsList) { + writer.addDocument(subDoc); + } + */ + } else { + if (VERBOSE) { + System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs); + } + writer.addDocuments(docsList); + + /* + // non-atomic: + for(Document subDoc : docsList) { + writer.addDocument(subDoc); + } + */ + } + doc.removeField("packID"); + + if (random.nextInt(5) == 2) { + if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID); + } + toDeleteSubDocs.add(subDocs); + } + + } else { + writer.addDocument(doc); + addCount.getAndIncrement(); + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(doc.get("docid")); + } + } } else { // we use update but it never replaces 
a // prior doc @@ -161,14 +286,17 @@ public class TestNRTThreads extends LuceneTestCase { //System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid")); } writer.updateDocument(new Term("docid", doc.get("docid")), doc); - } - if (random.nextInt(5) == 3) { - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + addCount.getAndIncrement(); + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(doc.get("docid")); } - toDeleteIDs.add(doc.get("docid")); } - if (random.nextInt(50) == 17) { + + if (random.nextInt(30) == 17) { if (VERBOSE) { //System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes"); } @@ -184,8 +312,19 @@ public class TestNRTThreads extends LuceneTestCase { } delIDs.addAll(toDeleteIDs); toDeleteIDs.clear(); + + for(SubDocs subDocs : toDeleteSubDocs) { + assert !subDocs.deleted; + writer.deleteDocuments(new Term("packID", subDocs.packID)); + subDocs.deleted = true; + if (VERBOSE) { + System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID); + } + delIDs.addAll(subDocs.subIDs); + delCount.addAndGet(subDocs.subIDs.size()); + } + toDeleteSubDocs.clear(); } - addCount.getAndIncrement(); if (addedField != null) { doc.removeField(addedField); } @@ -356,7 +495,7 @@ public class TestNRTThreads extends LuceneTestCase { if (VERBOSE) { System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount); } - + final IndexReader r2 = writer.getReader(); final IndexSearcher s = newSearcher(r2); boolean doFail = false; @@ -367,6 +506,43 @@ public class TestNRTThreads extends LuceneTestCase { doFail = true; } } + + // Make sure each group of sub-docs are still in docID order: + for(SubDocs subDocs : allSubDocs) { + if (!subDocs.deleted) { + // We sort by relevance but the scores should be identical so sort falls back to by docID: + TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20); + assertEquals(subDocs.subIDs.size(), hits.totalHits); + int lastDocID = -1; + int startDocID = -1; + for(ScoreDoc scoreDoc : hits.scoreDocs) { + final int docID = scoreDoc.doc; + if (lastDocID != -1) { + assertEquals(1+lastDocID, docID); + } else { + startDocID = docID; + } + lastDocID = docID; + final Document doc = s.doc(docID); + assertEquals(subDocs.packID, doc.get("packID")); + } + + lastDocID = startDocID - 1; + for(String subID : subDocs.subIDs) { + hits = s.search(new TermQuery(new Term("docid", subID)), 1); + assertEquals(1, hits.totalHits); + final int docID = hits.scoreDocs[0].doc; + if (lastDocID != -1) { + assertEquals(1+lastDocID, docID); + } + lastDocID = docID; + } + } else { + for(String subID : subDocs.subIDs) { + assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits); + } + } + } final int endID = Integer.parseInt(docs.nextDoc().get("docid")); for(int id=0;id
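
For reference, below is a minimal usage sketch (not part of this commit) of the API the patch introduces. It mirrors the packID pattern exercised in TestNRTThreads above; the BlockIndexer class and the "packID" field name are illustrative assumptions, and the IndexWriter is assumed to be configured elsewhere against a build that includes this patch.

import java.io.IOException;
import java.util.List;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;

/** Illustrative sketch only: indexes a group of documents as one atomic block. */
public class BlockIndexer {

  private final IndexWriter writer;

  public BlockIndexer(IndexWriter writer) {
    this.writer = writer;
  }

  /**
   * Adds the documents as a single block: IndexWriter.addDocuments
   * assigns them sequential docIDs, and an external/NRT reader sees
   * either all of them or none.
   */
  public void addBlock(String packID, List<Document> docs) throws IOException {
    for (Document doc : docs) {
      // Tag each member of the block so the whole group can later be
      // replaced or deleted as a unit ("packID" is an illustrative
      // field name, borrowed from TestNRTThreads):
      doc.add(new Field("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED));
    }
    writer.addDocuments(docs);
  }

  /**
   * Atomically replaces a previously added block: documents matching
   * the packID term are deleted and the new block is added in the same
   * operation, so a reader never sees a partially updated group.
   */
  public void updateBlock(String packID, List<Document> docs) throws IOException {
    for (Document doc : docs) {
      doc.add(new Field("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED));
    }
    writer.updateDocuments(new Term("packID", packID), docs);
  }
}

TestNRTThreads above verifies the invariant this relies on: after concurrent updates and merges, the documents of each pack stay adjacent and in the order they were added.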