diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 191d740ce70..fb93517b96d 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -486,6 +486,10 @@ New features
document IDs and scores encountered during the search, and "replay" them to
another Collector. (Mike McCandless, Shai Erera)
+* LUCENE-3112: Added experimental IndexWriter.add/updateDocuments,
+ enabling a block of documents to be indexed, atomically, with
+ guaranteed sequential docIDs. (Mike McCandless)
+
API Changes
* LUCENE-3061: IndexWriter's getNextMerge() and merge(OneMerge) are now public
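For context, a minimal sketch of how the new LUCENE-3112 API is meant to be called; the field names and helper class below are illustrative only, not part of this patch:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;

    // Sketch: index two child docs plus a parent doc as one atomic block.
    // All three receive adjacent docIDs and become visible to readers together.
    final class AddDocumentsSketch {
      static void indexBlock(IndexWriter writer) throws IOException {
        final List<Document> block = new ArrayList<Document>();
        for (int i = 0; i < 2; i++) {
          Document child = new Document();
          child.add(new Field("type", "child", Field.Store.YES, Field.Index.NOT_ANALYZED));
          block.add(child);
        }
        Document parent = new Document();
        parent.add(new Field("type", "parent", Field.Store.YES, Field.Index.NOT_ANALYZED));
        block.add(parent);
        writer.addDocuments(block); // added by LUCENE-3112
      }
    }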
diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java b/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java
index 2a42390fd82..3daf7aa7dd6 100644
--- a/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java
+++ b/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java
@@ -45,6 +45,11 @@ import org.apache.lucene.store.FSDirectory;
* @lucene.experimental You can easily
* accidentally remove segments from your index so be
* careful!
+ *
+ * <p><b>NOTE</b>: this tool is unaware of documents added
+ * atomically via {@link IndexWriter#addDocuments} or {@link
+ * IndexWriter#updateDocuments}, which means it can easily
+ * break up such document groups.
*/
public class IndexSplitter {
public SegmentInfos infos;
diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java b/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java
index ce42d8bbc99..c41d6dcd272 100644
--- a/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java
+++ b/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java
@@ -40,6 +40,11 @@ import org.apache.lucene.util.Version;
* <p>Note 2: the disadvantage of this tool is that source index needs to be
* read as many times as there are parts to be created, hence the name of this
* tool.
+ *
+ * <p><b>NOTE</b>: this tool is unaware of documents added
+ * atomically via {@link IndexWriter#addDocuments} or {@link
+ * IndexWriter#updateDocuments}, which means it can easily
+ * break up such document groups.
*/
public class MultiPassIndexSplitter {
diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
index b9556ebe73b..3f97ac82272 100644
--- a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -274,11 +274,9 @@ final class DocumentsWriter {
flushControl.setClosed();
}
- boolean updateDocument(final Document doc, final Analyzer analyzer,
- final Term delTerm) throws CorruptIndexException, IOException {
+ private boolean preUpdate() throws CorruptIndexException, IOException {
ensureOpen();
boolean maybeMerge = false;
- final boolean isUpdate = delTerm != null;
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
// Help out flushing any queued DWPTs so we can un-stall:
if (infoStream != null) {
@@ -303,9 +301,59 @@ final class DocumentsWriter {
message("continue indexing after helpling out flushing DocumentsWriter is healthy");
}
}
+ return maybeMerge;
+ }
- final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
- this, doc);
+ private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException {
+ if (flushingDWPT != null) {
+ maybeMerge |= doFlush(flushingDWPT);
+ } else {
+ final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
+ if (nextPendingFlush != null) {
+ maybeMerge |= doFlush(nextPendingFlush);
+ }
+ }
+
+ return maybeMerge;
+ }
+
+ boolean updateDocuments(final Iterable<Document> docs, final Analyzer analyzer,
+ final Term delTerm) throws CorruptIndexException, IOException {
+ boolean maybeMerge = preUpdate();
+
+ final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+ final DocumentsWriterPerThread flushingDWPT;
+
+ try {
+ if (!perThread.isActive()) {
+ ensureOpen();
+ assert false: "perThread is not active but we are still open";
+ }
+
+ final DocumentsWriterPerThread dwpt = perThread.perThread;
+ try {
+ final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
+ numDocsInRAM.addAndGet(docCount);
+ } finally {
+ if (dwpt.checkAndResetHasAborted()) {
+ flushControl.doOnAbort(perThread);
+ }
+ }
+ final boolean isUpdate = delTerm != null;
+ flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
+ } finally {
+ perThread.unlock();
+ }
+
+ return postUpdate(flushingDWPT, maybeMerge);
+ }
+
+ boolean updateDocument(final Document doc, final Analyzer analyzer,
+ final Term delTerm) throws CorruptIndexException, IOException {
+
+ boolean maybeMerge = preUpdate();
+
+ final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
final DocumentsWriterPerThread flushingDWPT;
try {
@@ -324,20 +372,13 @@ final class DocumentsWriter {
flushControl.doOnAbort(perThread);
}
}
+ final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
} finally {
perThread.unlock();
}
-
- if (flushingDWPT != null) {
- maybeMerge |= doFlush(flushingDWPT);
- } else {
- final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
- if (nextPendingFlush != null) {
- maybeMerge |= doFlush(nextPendingFlush);
- }
- }
- return maybeMerge;
+
+ return postUpdate(flushingDWPT, maybeMerge);
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 81d6d9655c9..79b0959ef3b 100644
--- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -68,7 +68,7 @@ public final class DocumentsWriterFlushControl {
this.stallControl = new DocumentsWriterStallControl();
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
- this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;;
+ this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
this.config = config;
this.documentsWriter = documentsWriter;
}
@@ -162,8 +162,6 @@ public final class DocumentsWriterFlushControl {
stallControl.updateStalled(this);
assert assertMemory();
}
-
-
}
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
@@ -217,7 +215,7 @@ public final class DocumentsWriterFlushControl {
assert assertMemory();
// Take it out of the loop this DWPT is stale
perThreadPool.replaceForFlush(state, closed);
- }finally {
+ } finally {
stallControl.updateStalled(this);
}
}
diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 68334663fa0..553aff5f157 100644
--- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -104,7 +104,7 @@ public class DocumentsWriterPerThread {
// largish:
doc = null;
analyzer = null;
- }
+ }
}
static class FlushedSegment {
@@ -253,6 +253,82 @@ public class DocumentsWriterPerThread {
finishDocument(delTerm);
}
+ public int updateDocuments(Iterable<Document> docs, Analyzer analyzer, Term delTerm) throws IOException {
+ assert writer.testPoint("DocumentsWriterPerThread addDocuments start");
+ assert deleteQueue != null;
+ docState.analyzer = analyzer;
+ if (segment == null) {
+ // this call is synchronized on IndexWriter.segmentInfos
+ segment = writer.newSegmentName();
+ assert numDocsInRAM == 0;
+ }
+
+ int docCount = 0;
+ try {
+ for(Document doc : docs) {
+ docState.doc = doc;
+ docState.docID = numDocsInRAM;
+ docCount++;
+
+ boolean success = false;
+ try {
+ consumer.processDocument(fieldInfos);
+ success = true;
+ } finally {
+ if (!success) {
+ // An exc is being thrown...
+
+ if (!aborting) {
+ // One of the documents hit a non-aborting
+ // exception (eg something happened during
+ // analysis). We now go and mark any docs
+ // from this batch that we had already indexed
+ // as deleted:
+ int docID = docState.docID;
+ final int endDocID = docID - docCount;
+ while (docID > endDocID) {
+ deleteDocID(docID);
+ docID--;
+ }
+
+ // Incr here because finishDocument will not
+ // be called (because an exc is being thrown):
+ numDocsInRAM++;
+ fieldInfos.revertUncommitted();
+ } else {
+ abort();
+ }
+ }
+ }
+ success = false;
+ try {
+ consumer.finishDocument();
+ success = true;
+ } finally {
+ if (!success) {
+ abort();
+ }
+ }
+
+ finishDocument(null);
+ }
+
+ // Apply delTerm only after all indexing has
+ // succeeded, but apply it only to docs prior to when
+ // this batch started:
+ if (delTerm != null) {
+ deleteQueue.add(delTerm, deleteSlice);
+ assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
+ deleteSlice.apply(pendingDeletes, numDocsInRAM-docCount);
+ }
+
+ } finally {
+ docState.clear();
+ }
+
+ return docCount;
+ }
+
private void finishDocument(Term delTerm) throws IOException {
/*
* here we actually finish the document in two steps 1. push the delete into
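To make the rollback above concrete: when document number docCount of a block hits a non-aborting exception at in-memory id docID, the docs already buffered for this batch occupy ids (docID - docCount, docID], and each of them is marked deleted so a partial block never becomes visible. A standalone sketch of that range computation (illustrative only, not patch code):

    // Illustrative only: mirrors the while-loop in updateDocuments above.
    final class BlockRollbackSketch {
      // failingDocID = docState.docID of the doc that threw; docCount includes it.
      static int[] docIDsToDelete(int failingDocID, int docCount) {
        final int[] ids = new int[docCount];
        final int endDocID = failingDocID - docCount; // exclusive lower bound
        int i = 0;
        for (int docID = failingDocID; docID > endDocID; docID--) {
          ids[i++] = docID; // the real code calls deleteDocID(docID) here
        }
        return ids;
      }
    }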
diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 45ca99ad5d8..8014db17dcc 100644
--- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -19,7 +19,6 @@ package org.apache.lucene.index;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.lucene.document.Document;
import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder;
import org.apache.lucene.index.codecs.CodecProvider;
@@ -212,7 +211,7 @@ public abstract class DocumentsWriterPerThreadPool {
// don't recycle DWPT by default
}
- public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
+ public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
/**
* Returns an iterator providing access to all {@link ThreadState}
diff --git a/lucene/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/src/java/org/apache/lucene/index/IndexWriter.java
index 305676ec2b6..54f6ad2173e 100644
--- a/lucene/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1227,6 +1227,111 @@ public class IndexWriter implements Closeable {
updateDocument(null, doc, analyzer);
}
+ /**
+ * Atomically adds a block of documents with sequentially
+ * assigned document IDs, such that an external reader
+ * will see all or none of the documents.
+ *
+ * WARNING: the index does not currently record
+ * which documents were added as a block. Today this is
+ * fine, because merging will preserve the block (as long
+ * as none of them were deleted). But it's possible in the
+ * future that Lucene may more aggressively re-order
+ * documents (for example, perhaps to obtain better index
+ * compression), in which case you may need to fully
+ * re-index your documents at that time.
+ *
+ * <p>See {@link #addDocument(Document)} for details on
+ * index and IndexWriter state after an Exception, and
+ * flushing/merging temporary free space requirements.
+ *
+ * NOTE: tools that do offline splitting of an index
+ * (for example, IndexSplitter in contrib) or
+ * re-sorting of documents (for example, IndexSorter in
+ * contrib) are not aware of these atomically added documents
+ * and will likely break them up. Use such tools at your
+ * own risk!
+ *
+ * <p><b>NOTE</b>: if this method hits an OutOfMemoryError
+ * you should immediately close the writer. See above for details.
+ *
+ * @throws CorruptIndexException if the index is corrupt
+ * @throws IOException if there is a low-level IO error
+ *
+ * @lucene.experimental
+ */
+ public void addDocuments(Iterable<Document> docs) throws CorruptIndexException, IOException {
+ addDocuments(docs, analyzer);
+ }
+
+ /**
+ * Atomically adds a block of documents, analyzed using the
+ * provided analyzer, with sequentially assigned document
+ * IDs, such that an external reader will see all or none
+ * of the documents.
+ *
+ * @throws CorruptIndexException if the index is corrupt
+ * @throws IOException if there is a low-level IO error
+ *
+ * @lucene.experimental
+ */
+ public void addDocuments(Iterable<Document> docs, Analyzer analyzer) throws CorruptIndexException, IOException {
+ updateDocuments(null, docs, analyzer);
+ }
+
+ /**
+ * Atomically deletes documents matching the provided
+ * delTerm and adds a block of documents with sequentially
+ * assigned document IDs, such that an external reader
+ * will see all or none of the documents.
+ *
+ * See {@link #addDocuments(Iterable)}.
+ *
+ * @throws CorruptIndexException if the index is corrupt
+ * @throws IOException if there is a low-level IO error
+ *
+ * @lucene.experimental
+ */
+ public void updateDocuments(Term delTerm, Iterable<Document> docs) throws CorruptIndexException, IOException {
+ updateDocuments(delTerm, docs, analyzer);
+ }
+
+ /**
+ * Atomically deletes documents matching the provided
+ * delTerm and adds a block of documents, analyzed using
+ * the provided analyzer, with sequentially
+ * assigned document IDs, such that an external reader
+ * will see all or none of the documents.
+ *
+ * See {@link #addDocuments(Iterable)}.
+ *
+ * @throws CorruptIndexException if the index is corrupt
+ * @throws IOException if there is a low-level IO error
+ *
+ * @lucene.experimental
+ */
+ public void updateDocuments(Term delTerm, Iterable<Document> docs, Analyzer analyzer) throws CorruptIndexException, IOException {
+ ensureOpen();
+ try {
+ boolean success = false;
+ boolean anySegmentFlushed = false;
+ try {
+ anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm);
+ success = true;
+ } finally {
+ if (!success && infoStream != null) {
+ message("hit exception updating document");
+ }
+ }
+ if (anySegmentFlushed) {
+ maybeMerge();
+ }
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "updateDocuments");
+ }
+ }
+
/**
* Deletes the document(s) containing <code>term</code>.
*
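A sketch of how the new update API can atomically replace a previously indexed block keyed by an identifier field; the "packID" field name mirrors the TestNRTThreads changes further below and is not required by the API:

    import java.io.IOException;
    import java.util.List;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.Term;

    // Sketch: delete the old block and add the new one in a single atomic step.
    // Readers see either the old block or the new one, never a partial mix,
    // and the new documents again receive adjacent docIDs.
    final class BlockReplaceSketch {
      static void replaceBlock(IndexWriter writer, String packID, List<Document> newBlock) throws IOException {
        writer.updateDocuments(new Term("packID", packID), newBlock);
      }
    }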
diff --git a/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java b/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
index fa956dda190..dcc7c8da921 100644
--- a/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
+++ b/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
@@ -139,7 +139,7 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
}
}
- assert lastDocID == docState.docID;
+ assert lastDocID == docState.docID: "lastDocID=" + lastDocID + " docState.docID=" + docState.docID;
lastDocID++;
diff --git a/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java b/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
index f478f29df1f..b5f0b6cb20d 100644
--- a/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
+++ b/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
@@ -18,7 +18,6 @@ package org.apache.lucene.index;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.lucene.document.Document;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc
/**
@@ -48,12 +47,10 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT
}
@Override
- public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) {
+ public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
ThreadState threadState = threadBindings.get(requestingThread);
- if (threadState != null) {
- if (threadState.tryLock()) {
- return threadState;
- }
+ if (threadState != null && threadState.tryLock()) {
+ return threadState;
}
ThreadState minThreadState = null;
diff --git a/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java b/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java
index 0712e4104c4..627690bea43 100644
--- a/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java
+++ b/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Random;
import org.apache.lucene.analysis.Analyzer;
@@ -97,8 +98,43 @@ public class RandomIndexWriter implements Closeable {
* Adds a Document.
* @see IndexWriter#addDocument(Document)
*/
- public void addDocument(Document doc) throws IOException {
- w.addDocument(doc);
+ public void addDocument(final Document doc) throws IOException {
+ if (r.nextInt(5) == 3) {
+ // TODO: maybe, we should simply buffer up added docs
+ // (but we need to clone them), and only when
+ // getReader, commit, etc. are called, we do an
+ // addDocuments? Would be better testing.
+ w.addDocuments(new Iterable<Document>() {
+
+ // @Override -- not until Java 1.6
+ public Iterator<Document> iterator() {
+ return new Iterator<Document>() {
+ boolean done;
+
+ // @Override -- not until Java 1.6
+ public boolean hasNext() {
+ return !done;
+ }
+
+ // @Override -- not until Java 1.6
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ // @Override -- not until Java 1.6
+ public Document next() {
+ if (done) {
+ throw new IllegalStateException();
+ }
+ done = true;
+ return doc;
+ }
+ };
+ }
+ });
+ } else {
+ w.addDocument(doc);
+ }
maybeCommit();
}
@@ -116,12 +152,53 @@ public class RandomIndexWriter implements Closeable {
}
}
+ public void addDocuments(Iterable<Document> docs) throws IOException {
+ w.addDocuments(docs);
+ maybeCommit();
+ }
+
+ public void updateDocuments(Term delTerm, Iterable<Document> docs) throws IOException {
+ w.updateDocuments(delTerm, docs);
+ maybeCommit();
+ }
+
/**
* Updates a document.
* @see IndexWriter#updateDocument(Term, Document)
*/
- public void updateDocument(Term t, Document doc) throws IOException {
- w.updateDocument(t, doc);
+ public void updateDocument(Term t, final Document doc) throws IOException {
+ if (r.nextInt(5) == 3) {
+ w.updateDocuments(t, new Iterable<Document>() {
+
+ // @Override -- not until Java 1.6
+ public Iterator<Document> iterator() {
+ return new Iterator<Document>() {
+ boolean done;
+
+ // @Override -- not until Java 1.6
+ public boolean hasNext() {
+ return !done;
+ }
+
+ // @Override -- not until Java 1.6
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ // @Override -- not until Java 1.6
+ public Document next() {
+ if (done) {
+ throw new IllegalStateException();
+ }
+ done = true;
+ return doc;
+ }
+ };
+ }
+ });
+ } else {
+ w.updateDocument(t, doc);
+ }
maybeCommit();
}
diff --git a/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
index bda5bc449e7..412daae2eb7 100644
--- a/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
+++ b/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
@@ -17,24 +17,16 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
-import org.apache.lucene.util.Bits;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util._TestUtil;
-import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.analysis.MockTokenizer;
@@ -43,9 +35,54 @@ import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.PhraseQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util._TestUtil;
public class TestIndexWriterExceptions extends LuceneTestCase {
+ private static class DocCopyIterator implements Iterable<Document> {
+ private final Document doc;
+ private final int count;
+
+ public DocCopyIterator(Document doc, int count) {
+ this.count = count;
+ this.doc = doc;
+ }
+
+ // @Override -- not until Java 1.6
+ public Iterator<Document> iterator() {
+ return new Iterator<Document>() {
+ int upto;
+
+ // @Override -- not until Java 1.6
+ public boolean hasNext() {
+ return upto < count;
+ }
+
+ // @Override -- not until Java 1.6
+ public Document next() {
+ upto++;
+ return doc;
+ }
+
+ // @Override -- not until Java 1.6
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }
+
private class IndexerThread extends Thread {
IndexWriter writer;
@@ -87,7 +124,11 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
idField.setValue(id);
Term idTerm = new Term("id", id);
try {
- writer.updateDocument(idTerm, doc);
+ if (r.nextBoolean()) {
+ writer.updateDocuments(idTerm, new DocCopyIterator(doc, _TestUtil.nextInt(r, 1, 20)));
+ } else {
+ writer.updateDocument(idTerm, doc);
+ }
} catch (RuntimeException re) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": EXC: ");
@@ -136,7 +177,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
@Override
boolean testPoint(String name) {
- if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(20) == 17) {
+ if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(40) == 17) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": NOW FAIL: " + name);
new Throwable().printStackTrace(System.out);
@@ -267,6 +308,8 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
}
}
+ private static String CRASH_FAIL_MESSAGE = "I'm experiencing problems";
+
private class CrashingFilter extends TokenFilter {
String fieldName;
int count;
@@ -279,7 +322,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
@Override
public boolean incrementToken() throws IOException {
if (this.fieldName.equals("crash") && count++ >= 4)
- throw new IOException("I'm experiencing problems");
+ throw new IOException(CRASH_FAIL_MESSAGE);
return input.incrementToken();
}
@@ -1278,4 +1321,141 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
}
}
}
+
+ public void testAddDocsNonAbortingException() throws Exception {
+ final Directory dir = newDirectory();
+ final RandomIndexWriter w = new RandomIndexWriter(random, dir);
+ final int numDocs1 = random.nextInt(25);
+ for(int docCount=0;docCount<numDocs1;docCount++) {
+ Document doc = new Document();
+ doc.add(newField("content", "good content", Field.Index.ANALYZED));
+ w.addDocument(doc);
+ }
+
+ final List<Document> docs = new ArrayList<Document>();
+ for(int docCount=0;docCount<7;docCount++) {
+ Document doc = new Document();
+ docs.add(doc);
+ doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED));
+ doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED));
+ if (docCount == 4) {
+ Field f = newField("crash", "", Field.Index.ANALYZED);
+ doc.add(f);
+ MockTokenizer tokenizer = new MockTokenizer(new StringReader("crash me on the 4th token"), MockTokenizer.WHITESPACE, false);
+ tokenizer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases.
+ f.setTokenStream(new CrashingFilter("crash", tokenizer));
+ }
+ }
+ try {
+ w.addDocuments(docs);
+ // BUG: CrashingFilter didn't
+ fail("did not hit expected exception");
+ } catch (IOException ioe) {
+ // expected
+ assertEquals(CRASH_FAIL_MESSAGE, ioe.getMessage());
+ }
+
+ final int numDocs2 = random.nextInt(25);
+ for(int docCount=0;docCount<numDocs2;docCount++) {
+ Document doc = new Document();
+ doc.add(newField("content", "good content", Field.Index.ANALYZED));
+ w.addDocument(doc);
+ }
diff --git a/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java b/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
--- a/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
+++ b/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
+ private static class SubDocs {
+ public final String packID;
+ public final List<String> subIDs;
+ public boolean deleted;
+
+ public SubDocs(String packID, List<String> subIDs) {
+ this.packID = packID;
+ this.subIDs = subIDs;
+ }
+ }
+
+ // TODO: is there a pre-existing way to do this!!!
+ private Document cloneDoc(Document doc1) {
+ final Document doc2 = new Document();
+ for(Fieldable f : doc1.getFields()) {
+ Field field1 = (Field) f;
+
+ Field field2 = new Field(field1.name(),
+ field1.stringValue(),
+ field1.isStored() ? Field.Store.YES : Field.Store.NO,
+ field1.isIndexed() ? (field1.isTokenized() ? Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO);
+ if (field1.getOmitNorms()) {
+ field2.setOmitNorms(true);
+ }
+ if (field1.getOmitTermFreqAndPositions()) {
+ field2.setOmitTermFreqAndPositions(true);
+ }
+ doc2.add(field2);
+ }
+
+ return doc2;
+ }
+
@Test
public void testNRTThreads() throws Exception {
@@ -121,13 +156,16 @@ public class TestNRTThreads extends LuceneTestCase {
final int NUM_INDEX_THREADS = 2;
final int NUM_SEARCH_THREADS = 3;
+
final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : 5;
final AtomicBoolean failed = new AtomicBoolean();
final AtomicInteger addCount = new AtomicInteger();
final AtomicInteger delCount = new AtomicInteger();
+ final AtomicInteger packCount = new AtomicInteger();
final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
+ final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
Thread[] threads = new Thread[NUM_INDEX_THREADS];
@@ -135,7 +173,9 @@ public class TestNRTThreads extends LuceneTestCase {
threads[thread] = new Thread() {
@Override
public void run() {
+ // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
final List<String> toDeleteIDs = new ArrayList<String>();
+ final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
while(System.currentTimeMillis() < stopTime && !failed.get()) {
try {
Document doc = docs.nextDoc();
@@ -153,7 +193,92 @@ public class TestNRTThreads extends LuceneTestCase {
if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid"));
}
- writer.addDocument(doc);
+
+ if (random.nextBoolean()) {
+ // Add a pack of adjacent sub-docs
+ final String packID;
+ final SubDocs delSubDocs;
+ if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
+ delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
+ assert !delSubDocs.deleted;
+ toDeleteSubDocs.remove(delSubDocs);
+ // reuse prior packID
+ packID = delSubDocs.packID;
+ } else {
+ delSubDocs = null;
+ // make new packID
+ packID = packCount.getAndIncrement() + "";
+ }
+
+ final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
+ final List<String> docIDs = new ArrayList<String>();
+ final SubDocs subDocs = new SubDocs(packID, docIDs);
+ final List<Document> docsList = new ArrayList<Document>();
+
+ allSubDocs.add(subDocs);
+ doc.add(packIDField);
+ docsList.add(cloneDoc(doc));
+ docIDs.add(doc.get("docid"));
+
+ final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
+ while(docsList.size() < maxDocCount) {
+ doc = docs.nextDoc();
+ if (doc == null) {
+ break;
+ }
+ docsList.add(cloneDoc(doc));
+ docIDs.add(doc.get("docid"));
+ }
+ addCount.addAndGet(docsList.size());
+
+ if (delSubDocs != null) {
+ delSubDocs.deleted = true;
+ delIDs.addAll(delSubDocs.subIDs);
+ delCount.addAndGet(delSubDocs.subIDs.size());
+ if (VERBOSE) {
+ System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
+ }
+ writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
+ /*
+ // non-atomic:
+ writer.deleteDocuments(new Term("packID", delSubDocs.packID));
+ for(Document subDoc : docsList) {
+ writer.addDocument(subDoc);
+ }
+ */
+ } else {
+ if (VERBOSE) {
+ System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
+ }
+ writer.addDocuments(docsList);
+
+ /*
+ // non-atomic:
+ for(Document subDoc : docsList) {
+ writer.addDocument(subDoc);
+ }
+ */
+ }
+ doc.removeField("packID");
+
+ if (random.nextInt(5) == 2) {
+ if (VERBOSE) {
+ //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
+ }
+ toDeleteSubDocs.add(subDocs);
+ }
+
+ } else {
+ writer.addDocument(doc);
+ addCount.getAndIncrement();
+
+ if (random.nextInt(5) == 3) {
+ if (VERBOSE) {
+ //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
+ }
+ toDeleteIDs.add(doc.get("docid"));
+ }
+ }
} else {
// we use update but it never replaces a
// prior doc
@@ -161,14 +286,17 @@ public class TestNRTThreads extends LuceneTestCase {
//System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
}
writer.updateDocument(new Term("docid", doc.get("docid")), doc);
- }
- if (random.nextInt(5) == 3) {
- if (VERBOSE) {
- //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
+ addCount.getAndIncrement();
+
+ if (random.nextInt(5) == 3) {
+ if (VERBOSE) {
+ //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
+ }
+ toDeleteIDs.add(doc.get("docid"));
}
- toDeleteIDs.add(doc.get("docid"));
}
- if (random.nextInt(50) == 17) {
+
+ if (random.nextInt(30) == 17) {
if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
}
@@ -184,8 +312,19 @@ public class TestNRTThreads extends LuceneTestCase {
}
delIDs.addAll(toDeleteIDs);
toDeleteIDs.clear();
+
+ for(SubDocs subDocs : toDeleteSubDocs) {
+ assert !subDocs.deleted;
+ writer.deleteDocuments(new Term("packID", subDocs.packID));
+ subDocs.deleted = true;
+ if (VERBOSE) {
+ System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
+ }
+ delIDs.addAll(subDocs.subIDs);
+ delCount.addAndGet(subDocs.subIDs.size());
+ }
+ toDeleteSubDocs.clear();
}
- addCount.getAndIncrement();
if (addedField != null) {
doc.removeField(addedField);
}
@@ -356,7 +495,7 @@ public class TestNRTThreads extends LuceneTestCase {
if (VERBOSE) {
System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
}
-
+
final IndexReader r2 = writer.getReader();
final IndexSearcher s = newSearcher(r2);
boolean doFail = false;
@@ -367,6 +506,43 @@ public class TestNRTThreads extends LuceneTestCase {
doFail = true;
}
}
+
+ // Make sure each group of sub-docs are still in docID order:
+ for(SubDocs subDocs : allSubDocs) {
+ if (!subDocs.deleted) {
+ // We sort by relevance but the scores should be identical so sort falls back to by docID:
+ TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
+ assertEquals(subDocs.subIDs.size(), hits.totalHits);
+ int lastDocID = -1;
+ int startDocID = -1;
+ for(ScoreDoc scoreDoc : hits.scoreDocs) {
+ final int docID = scoreDoc.doc;
+ if (lastDocID != -1) {
+ assertEquals(1+lastDocID, docID);
+ } else {
+ startDocID = docID;
+ }
+ lastDocID = docID;
+ final Document doc = s.doc(docID);
+ assertEquals(subDocs.packID, doc.get("packID"));
+ }
+
+ lastDocID = startDocID - 1;
+ for(String subID : subDocs.subIDs) {
+ hits = s.search(new TermQuery(new Term("docid", subID)), 1);
+ assertEquals(1, hits.totalHits);
+ final int docID = hits.scoreDocs[0].doc;
+ if (lastDocID != -1) {
+ assertEquals(1+lastDocID, docID);
+ }
+ lastDocID = docID;
+ }
+ } else {
+ for(String subID : subDocs.subIDs) {
+ assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
+ }
+ }
+ }
final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
for(int id=0;id<endID;id++) {