LUCENE-3112: add IW.add/updateDocuments, to atomically add a block of docs with guaranteed sequential docIDs

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1126560 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2011-05-23 16:02:25 +00:00
parent 0974e2768a
commit 4666e84ee1
13 changed files with 727 additions and 64 deletions

CHANGES.txt

@ -486,6 +486,10 @@ New features
document IDs and scores encountered during the search, and "replay" them to
another Collector. (Mike McCandless, Shai Erera)
* LUCENE-3112: Added experimental IndexWriter.add/updateDocuments,
enabling a block of documents to be indexed, atomically, with
guaranteed sequential docIDs. (Mike McCandless)
API Changes
* LUCENE-3061: IndexWriter's getNextMerge() and merge(OneMerge) are now public

IndexSplitter.java

@ -45,6 +45,11 @@ import org.apache.lucene.store.FSDirectory;
* @lucene.experimental You can easily
* accidentally remove segments from your index so be
* careful!
*
* <p><b>NOTE</b>: this tool is unaware of documents added
* atomically via {@link IndexWriter#addDocuments} or {@link
* IndexWriter#updateDocuments}, which means it can easily
* break up such document groups.
*/
public class IndexSplitter {
public SegmentInfos infos;

MultiPassIndexSplitter.java

@ -40,6 +40,11 @@ import org.apache.lucene.util.Version;
* <p>Note 2: the disadvantage of this tool is that source index needs to be
* read as many times as there are parts to be created, hence the name of this
* tool.
*
* <p><b>NOTE</b>: this tool is unaware of documents added
* atomically via {@link IndexWriter#addDocuments} or {@link
* IndexWriter#updateDocuments}, which means it can easily
* break up such document groups.
*/
public class MultiPassIndexSplitter {

DocumentsWriter.java

@ -274,11 +274,9 @@ final class DocumentsWriter {
flushControl.setClosed();
}
boolean updateDocument(final Document doc, final Analyzer analyzer,
final Term delTerm) throws CorruptIndexException, IOException {
private boolean preUpdate() throws CorruptIndexException, IOException {
ensureOpen();
boolean maybeMerge = false;
final boolean isUpdate = delTerm != null;
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
// Help out flushing any queued DWPTs so we can un-stall:
if (infoStream != null) {
@ -303,9 +301,59 @@ final class DocumentsWriter {
message("continue indexing after helpling out flushing DocumentsWriter is healthy");
}
}
return maybeMerge;
}
final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
this, doc);
private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException {
if (flushingDWPT != null) {
maybeMerge |= doFlush(flushingDWPT);
} else {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
maybeMerge |= doFlush(nextPendingFlush);
}
}
return maybeMerge;
}
boolean updateDocuments(final Iterable<Document> docs, final Analyzer analyzer,
final Term delTerm) throws CorruptIndexException, IOException {
boolean maybeMerge = preUpdate();
final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
final DocumentsWriterPerThread flushingDWPT;
try {
if (!perThread.isActive()) {
ensureOpen();
assert false: "perThread is not active but we are still open";
}
final DocumentsWriterPerThread dwpt = perThread.perThread;
try {
final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
numDocsInRAM.addAndGet(docCount);
} finally {
if (dwpt.checkAndResetHasAborted()) {
flushControl.doOnAbort(perThread);
}
}
final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
} finally {
perThread.unlock();
}
return postUpdate(flushingDWPT, maybeMerge);
}
boolean updateDocument(final Document doc, final Analyzer analyzer,
final Term delTerm) throws CorruptIndexException, IOException {
boolean maybeMerge = preUpdate();
final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
final DocumentsWriterPerThread flushingDWPT;
try {
@ -324,20 +372,13 @@ final class DocumentsWriter {
flushControl.doOnAbort(perThread);
}
}
final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
} finally {
perThread.unlock();
}
if (flushingDWPT != null) {
maybeMerge |= doFlush(flushingDWPT);
} else {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
maybeMerge |= doFlush(nextPendingFlush);
}
}
return maybeMerge;
return postUpdate(flushingDWPT, maybeMerge);
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {

DocumentsWriterFlushControl.java

@ -68,7 +68,7 @@ public final class DocumentsWriterFlushControl {
this.stallControl = new DocumentsWriterStallControl();
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;;
this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
this.config = config;
this.documentsWriter = documentsWriter;
}
@ -162,8 +162,6 @@ public final class DocumentsWriterFlushControl {
stallControl.updateStalled(this);
assert assertMemory();
}
}
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
@ -217,7 +215,7 @@ public final class DocumentsWriterFlushControl {
assert assertMemory();
// Take it out of the loop this DWPT is stale
perThreadPool.replaceForFlush(state, closed);
}finally {
} finally {
stallControl.updateStalled(this);
}
}

DocumentsWriterPerThread.java

@ -253,6 +253,82 @@ public class DocumentsWriterPerThread {
finishDocument(delTerm);
}
public int updateDocuments(Iterable<Document> docs, Analyzer analyzer, Term delTerm) throws IOException {
assert writer.testPoint("DocumentsWriterPerThread addDocuments start");
assert deleteQueue != null;
docState.analyzer = analyzer;
if (segment == null) {
// this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName();
assert numDocsInRAM == 0;
}
int docCount = 0;
try {
for(Document doc : docs) {
docState.doc = doc;
docState.docID = numDocsInRAM;
docCount++;
boolean success = false;
try {
consumer.processDocument(fieldInfos);
success = true;
} finally {
if (!success) {
// An exc is being thrown...
if (!aborting) {
// One of the documents hit a non-aborting
// exception (eg something happened during
// analysis). We now go and mark any docs
// from this batch that we had already indexed
// as deleted:
int docID = docState.docID;
final int endDocID = docID - docCount;
while (docID > endDocID) {
deleteDocID(docID);
docID--;
}
// Incr here because finishDocument will not
// be called (because an exc is being thrown):
numDocsInRAM++;
fieldInfos.revertUncommitted();
} else {
abort();
}
}
}
success = false;
try {
consumer.finishDocument();
success = true;
} finally {
if (!success) {
abort();
}
}
finishDocument(null);
}
// Apply delTerm only after all indexing has
// succeeded, but apply it only to docs prior to when
// this batch started:
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
deleteSlice.apply(pendingDeletes, numDocsInRAM-docCount);
}
} finally {
docState.clear();
}
return docCount;
}
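To make the non-aborting rollback above concrete (a worked example, not text from the commit): suppose this DWPT already holds 10 documents (numDocsInRAM == 10) and the third document of a block throws a non-aborting exception during analysis. At that point docCount == 3 and docState.docID == 12, so endDocID == 12 - 3 == 9, and the loop calls deleteDocID(12), deleteDocID(11), deleteDocID(10), marking the two fully indexed documents plus the partially indexed one as deleted. numDocsInRAM is then incremented for the failed document (since finishDocument will not run) and the exception propagates, leaving none of the block's documents visible.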
private void finishDocument(Term delTerm) throws IOException {
/*
* here we actually finish the document in two steps 1. push the delete into

DocumentsWriterPerThreadPool.java

@ -19,7 +19,6 @@ package org.apache.lucene.index;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder;
import org.apache.lucene.index.codecs.CodecProvider;
@ -212,7 +211,7 @@ public abstract class DocumentsWriterPerThreadPool {
// don't recycle DWPT by default
}
public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
/**
* Returns an iterator providing access to all {@link ThreadState}

IndexWriter.java

@ -1227,6 +1227,111 @@ public class IndexWriter implements Closeable {
updateDocument(null, doc, analyzer);
}
/**
* Atomically adds a block of documents with sequentially
* assigned document IDs, such that an external reader
* will see all or none of the documents.
*
* <p><b>WARNING</b>: the index does not currently record
* which documents were added as a block. Today this is
* fine, because merging will preserve the block (as long
* as none of them were deleted). But it's possible in the
* future that Lucene may more aggressively re-order
* documents (for example, perhaps to obtain better index
* compression), in which case you may need to fully
* re-index your documents at that time.
*
* <p>See {@link #addDocument(Document)} for details on
* index and IndexWriter state after an Exception, and
* flushing/merging temporary free space requirements.</p>
*
* <p><b>NOTE</b>: tools that do offline splitting of an index
* (for example, IndexSplitter in contrib) or
* re-sorting of documents (for example, IndexSorter in
* contrib) are not aware of these atomically added documents
* and will likely break them up. Use such tools at your
* own risk!
*
* <p><b>NOTE</b>: if this method hits an OutOfMemoryError
* you should immediately close the writer. See <a
* href="#OOME">above</a> for details.</p>
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
* @lucene.experimental
*/
public void addDocuments(Iterable<Document> docs) throws CorruptIndexException, IOException {
addDocuments(docs, analyzer);
}
/**
* Atomically adds a block of documents, analyzed using the
* provided analyzer, with sequentially assigned document
* IDs, such that an external reader will see all or none
* of the documents.
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
* @lucene.experimental
*/
public void addDocuments(Iterable<Document> docs, Analyzer analyzer) throws CorruptIndexException, IOException {
updateDocuments(null, docs, analyzer);
}
/**
* Atomically deletes documents matching the provided
* delTerm and adds a block of documents with sequentially
* assigned document IDs, such that an external reader
* will see all or none of the documents.
*
* See {@link #addDocuments(Iterable)}.
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
* @lucene.experimental
*/
public void updateDocuments(Term delTerm, Iterable<Document> docs) throws CorruptIndexException, IOException {
updateDocuments(delTerm, docs, analyzer);
}
/**
* Atomically deletes documents matching the provided
* delTerm and adds a block of documents, analyzed using
* the provided analyzer, with sequentially
* assigned document IDs, such that an external reader
* will see all or none of the documents.
*
* See {@link #addDocuments(Iterable)}.
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
* @lucene.experimental
*/
public void updateDocuments(Term delTerm, Iterable<Document> docs, Analyzer analyzer) throws CorruptIndexException, IOException {
ensureOpen();
try {
boolean success = false;
boolean anySegmentFlushed = false;
try {
anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm);
success = true;
} finally {
if (!success && infoStream != null) {
message("hit exception updating document");
}
}
if (anySegmentFlushed) {
maybeMerge();
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocuments");
}
}
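A brief usage sketch of the new block API (an editorial illustration, not part of this commit; the already-opened writer, the field names, and the "blockID" delete term are assumptions):

// Assumes an open IndexWriter "writer", plus imports of java.util.*,
// org.apache.lucene.document.*, and org.apache.lucene.index.Term.
List<Document> block = new ArrayList<Document>();
for (int i = 0; i < 3; i++) {
  Document child = new Document();
  child.add(new Field("blockID", "42", Field.Store.YES, Field.Index.NOT_ANALYZED));
  child.add(new Field("childID", "" + i, Field.Store.YES, Field.Index.NOT_ANALYZED));
  block.add(child);
}
Document parent = new Document();
parent.add(new Field("blockID", "42", Field.Store.YES, Field.Index.NOT_ANALYZED));
parent.add(new Field("type", "parent", Field.Store.YES, Field.Index.NOT_ANALYZED));
block.add(parent);

// All four documents become visible to readers together, with
// sequential docIDs (children first, parent last):
writer.addDocuments(block);

// Later, atomically replace the whole block: docs matching the delete
// term are removed and the new block is added in a single operation.
writer.updateDocuments(new Term("blockID", "42"), block);

Because the delete term is applied only to documents indexed before the batch started (see DocumentsWriterPerThread.updateDocuments above), the replacement block never deletes itself.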
/**
* Deletes the document(s) containing <code>term</code>.
*

TermVectorsTermsWriter.java

@ -139,7 +139,7 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
}
}
assert lastDocID == docState.docID;
assert lastDocID == docState.docID: "lastDocID=" + lastDocID + " docState.docID=" + docState.docID;
lastDocID++;

ThreadAffinityDocumentsWriterThreadPool.java

@ -18,7 +18,6 @@ package org.apache.lucene.index;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc
/**
@ -48,13 +47,11 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT
}
@Override
public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) {
public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
ThreadState threadState = threadBindings.get(requestingThread);
if (threadState != null) {
if (threadState.tryLock()) {
if (threadState != null && threadState.tryLock()) {
return threadState;
}
}
ThreadState minThreadState = null;

RandomIndexWriter.java

@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import org.apache.lucene.analysis.Analyzer;
@ -97,8 +98,43 @@ public class RandomIndexWriter implements Closeable {
* Adds a Document.
* @see IndexWriter#addDocument(Document)
*/
public void addDocument(Document doc) throws IOException {
public void addDocument(final Document doc) throws IOException {
if (r.nextInt(5) == 3) {
// TODO: maybe, we should simply buffer up added docs
// (but we need to clone them), and only when
// getReader, commit, etc. are called, we do an
// addDocuments? Would be better testing.
w.addDocuments(new Iterable<Document>() {
// @Override -- not until Java 1.6
public Iterator<Document> iterator() {
return new Iterator<Document>() {
boolean done;
// @Override -- not until Java 1.6
public boolean hasNext() {
return !done;
}
// @Override -- not until Java 1.6
public void remove() {
throw new UnsupportedOperationException();
}
// @Override -- not until Java 1.6
public Document next() {
if (done) {
throw new IllegalStateException();
}
done = true;
return doc;
}
};
}
});
} else {
w.addDocument(doc);
}
maybeCommit();
}
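The anonymous one-shot Iterator above wraps a single document so the block API gets exercised randomly; a functionally equivalent, shorter form (an editorial sketch, not what the commit uses; java.util.Collections.singletonList returns a one-element List, which is an Iterable<Document>) would be:

if (r.nextInt(5) == 3) {
  // Occasionally route single-document adds through the block API:
  w.addDocuments(Collections.singletonList(doc));
} else {
  w.addDocument(doc);
}
maybeCommit();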
@ -116,12 +152,53 @@ public class RandomIndexWriter implements Closeable {
}
}
public void addDocuments(Iterable<Document> docs) throws IOException {
w.addDocuments(docs);
maybeCommit();
}
public void updateDocuments(Term delTerm, Iterable<Document> docs) throws IOException {
w.updateDocuments(delTerm, docs);
maybeCommit();
}
/**
* Updates a document.
* @see IndexWriter#updateDocument(Term, Document)
*/
public void updateDocument(Term t, Document doc) throws IOException {
public void updateDocument(Term t, final Document doc) throws IOException {
if (r.nextInt(5) == 3) {
w.updateDocuments(t, new Iterable<Document>() {
// @Override -- not until Java 1.6
public Iterator<Document> iterator() {
return new Iterator<Document>() {
boolean done;
// @Override -- not until Java 1.6
public boolean hasNext() {
return !done;
}
// @Override -- not until Java 1.6
public void remove() {
throw new UnsupportedOperationException();
}
// @Override -- not until Java 1.6
public Document next() {
if (done) {
throw new IllegalStateException();
}
done = true;
return doc;
}
};
}
});
} else {
w.updateDocument(t, doc);
}
maybeCommit();
}

TestIndexWriterExceptions.java

@ -17,24 +17,16 @@ package org.apache.lucene.index;
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.analysis.MockTokenizer;
@ -43,9 +35,54 @@ import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
public class TestIndexWriterExceptions extends LuceneTestCase {
private static class DocCopyIterator implements Iterable<Document> {
private final Document doc;
private final int count;
public DocCopyIterator(Document doc, int count) {
this.count = count;
this.doc = doc;
}
// @Override -- not until Java 1.6
public Iterator<Document> iterator() {
return new Iterator<Document>() {
int upto;
// @Override -- not until Java 1.6
public boolean hasNext() {
return upto < count;
}
// @Override -- not until Java 1.6
public Document next() {
upto++;
return doc;
}
// @Override -- not until Java 1.6
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
private class IndexerThread extends Thread {
IndexWriter writer;
@ -87,7 +124,11 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
idField.setValue(id);
Term idTerm = new Term("id", id);
try {
if (r.nextBoolean()) {
writer.updateDocuments(idTerm, new DocCopyIterator(doc, _TestUtil.nextInt(r, 1, 20)));
} else {
writer.updateDocument(idTerm, doc);
}
} catch (RuntimeException re) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": EXC: ");
@ -136,7 +177,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
@Override
boolean testPoint(String name) {
if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(20) == 17) {
if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(40) == 17) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": NOW FAIL: " + name);
new Throwable().printStackTrace(System.out);
@ -267,6 +308,8 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
}
}
private static String CRASH_FAIL_MESSAGE = "I'm experiencing problems";
private class CrashingFilter extends TokenFilter {
String fieldName;
int count;
@ -279,7 +322,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
@Override
public boolean incrementToken() throws IOException {
if (this.fieldName.equals("crash") && count++ >= 4)
throw new IOException("I'm experiencing problems");
throw new IOException(CRASH_FAIL_MESSAGE);
return input.incrementToken();
}
@ -1278,4 +1321,141 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
}
}
}
public void testAddDocsNonAbortingException() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random, dir);
final int numDocs1 = random.nextInt(25);
for(int docCount=0;docCount<numDocs1;docCount++) {
Document doc = new Document();
doc.add(newField("content", "good content", Field.Index.ANALYZED));
w.addDocument(doc);
}
final List<Document> docs = new ArrayList<Document>();
for(int docCount=0;docCount<7;docCount++) {
Document doc = new Document();
docs.add(doc);
doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED));
doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED));
if (docCount == 4) {
Field f = newField("crash", "", Field.Index.ANALYZED);
doc.add(f);
MockTokenizer tokenizer = new MockTokenizer(new StringReader("crash me on the 4th token"), MockTokenizer.WHITESPACE, false);
tokenizer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases.
f.setTokenStream(new CrashingFilter("crash", tokenizer));
}
}
try {
w.addDocuments(docs);
// BUG: CrashingFilter didn't
fail("did not hit expected exception");
} catch (IOException ioe) {
// expected
assertEquals(CRASH_FAIL_MESSAGE, ioe.getMessage());
}
final int numDocs2 = random.nextInt(25);
for(int docCount=0;docCount<numDocs2;docCount++) {
Document doc = new Document();
doc.add(newField("content", "good content", Field.Index.ANALYZED));
w.addDocument(doc);
}
final IndexReader r = w.getReader();
w.close();
final IndexSearcher s = new IndexSearcher(r);
PhraseQuery pq = new PhraseQuery();
pq.add(new Term("content", "silly"));
pq.add(new Term("content", "content"));
assertEquals(0, s.search(pq, 1).totalHits);
pq = new PhraseQuery();
pq.add(new Term("content", "good"));
pq.add(new Term("content", "content"));
assertEquals(numDocs1+numDocs2, s.search(pq, 1).totalHits);
r.close();
dir.close();
}
public void testUpdateDocsNonAbortingException() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random, dir);
final int numDocs1 = random.nextInt(25);
for(int docCount=0;docCount<numDocs1;docCount++) {
Document doc = new Document();
doc.add(newField("content", "good content", Field.Index.ANALYZED));
w.addDocument(doc);
}
// Use addDocs (no exception) to get docs in the index:
final List<Document> docs = new ArrayList<Document>();
final int numDocs2 = random.nextInt(25);
for(int docCount=0;docCount<numDocs2;docCount++) {
Document doc = new Document();
docs.add(doc);
doc.add(newField("subid", "subs", Field.Index.NOT_ANALYZED));
doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED));
doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED));
}
w.addDocuments(docs);
final int numDocs3 = random.nextInt(25);
for(int docCount=0;docCount<numDocs3;docCount++) {
Document doc = new Document();
doc.add(newField("content", "good content", Field.Index.ANALYZED));
w.addDocument(doc);
}
docs.clear();
final int limit = _TestUtil.nextInt(random, 2, 25);
final int crashAt = random.nextInt(limit);
for(int docCount=0;docCount<limit;docCount++) {
Document doc = new Document();
docs.add(doc);
doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED));
doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED));
if (docCount == crashAt) {
Field f = newField("crash", "", Field.Index.ANALYZED);
doc.add(f);
MockTokenizer tokenizer = new MockTokenizer(new StringReader("crash me on the 4th token"), MockTokenizer.WHITESPACE, false);
tokenizer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases.
f.setTokenStream(new CrashingFilter("crash", tokenizer));
}
}
try {
w.updateDocuments(new Term("subid", "subs"), docs);
// BUG: CrashingFilter didn't
fail("did not hit expected exception");
} catch (IOException ioe) {
// expected
assertEquals(CRASH_FAIL_MESSAGE, ioe.getMessage());
}
final int numDocs4 = random.nextInt(25);
for(int docCount=0;docCount<numDocs4;docCount++) {
Document doc = new Document();
doc.add(newField("content", "good content", Field.Index.ANALYZED));
w.addDocument(doc);
}
final IndexReader r = w.getReader();
w.close();
final IndexSearcher s = new IndexSearcher(r);
PhraseQuery pq = new PhraseQuery();
pq.add(new Term("content", "silly"));
pq.add(new Term("content", "content"));
assertEquals(numDocs2, s.search(pq, 1).totalHits);
pq = new PhraseQuery();
pq.add(new Term("content", "good"));
pq.add(new Term("content", "content"));
assertEquals(numDocs1+numDocs3+numDocs4, s.search(pq, 1).totalHits);
r.close();
dir.close();
}
}

TestNRTThreads.java

@ -21,33 +21,35 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.index.codecs.CodecProvider;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util._TestUtil;
import org.junit.Test;
@ -57,6 +59,39 @@ import org.junit.Test;
public class TestNRTThreads extends LuceneTestCase {
private static class SubDocs {
public final String packID;
public final List<String> subIDs;
public boolean deleted;
public SubDocs(String packID, List<String> subIDs) {
this.packID = packID;
this.subIDs = subIDs;
}
}
// TODO: is there a pre-existing way to do this!!!
private Document cloneDoc(Document doc1) {
final Document doc2 = new Document();
for(Fieldable f : doc1.getFields()) {
Field field1 = (Field) f;
Field field2 = new Field(field1.name(),
field1.stringValue(),
field1.isStored() ? Field.Store.YES : Field.Store.NO,
field1.isIndexed() ? (field1.isTokenized() ? Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO);
if (field1.getOmitNorms()) {
field2.setOmitNorms(true);
}
if (field1.getOmitTermFreqAndPositions()) {
field2.setOmitTermFreqAndPositions(true);
}
doc2.add(field2);
}
return doc2;
}
@Test
public void testNRTThreads() throws Exception {
@ -121,13 +156,16 @@ public class TestNRTThreads extends LuceneTestCase {
final int NUM_INDEX_THREADS = 2;
final int NUM_SEARCH_THREADS = 3;
final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : 5;
final AtomicBoolean failed = new AtomicBoolean();
final AtomicInteger addCount = new AtomicInteger();
final AtomicInteger delCount = new AtomicInteger();
final AtomicInteger packCount = new AtomicInteger();
final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
Thread[] threads = new Thread[NUM_INDEX_THREADS];
@ -135,7 +173,9 @@ public class TestNRTThreads extends LuceneTestCase {
threads[thread] = new Thread() {
@Override
public void run() {
// TODO: would be better if this were cross thread, so that we make sure one thread deleting another thread's added docs works:
final List<String> toDeleteIDs = new ArrayList<String>();
final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
while(System.currentTimeMillis() < stopTime && !failed.get()) {
try {
Document doc = docs.nextDoc();
@ -153,7 +193,92 @@ public class TestNRTThreads extends LuceneTestCase {
if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid"));
}
if (random.nextBoolean()) {
// Add a pack of adjacent sub-docs
final String packID;
final SubDocs delSubDocs;
if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
assert !delSubDocs.deleted;
toDeleteSubDocs.remove(delSubDocs);
// reuse prior packID
packID = delSubDocs.packID;
} else {
delSubDocs = null;
// make new packID
packID = packCount.getAndIncrement() + "";
}
final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
final List<String> docIDs = new ArrayList<String>();
final SubDocs subDocs = new SubDocs(packID, docIDs);
final List<Document> docsList = new ArrayList<Document>();
allSubDocs.add(subDocs);
doc.add(packIDField);
docsList.add(cloneDoc(doc));
docIDs.add(doc.get("docid"));
final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
while(docsList.size() < maxDocCount) {
doc = docs.nextDoc();
if (doc == null) {
break;
}
docsList.add(cloneDoc(doc));
docIDs.add(doc.get("docid"));
}
addCount.addAndGet(docsList.size());
if (delSubDocs != null) {
delSubDocs.deleted = true;
delIDs.addAll(delSubDocs.subIDs);
delCount.addAndGet(delSubDocs.subIDs.size());
if (VERBOSE) {
System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
}
writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
/*
// non-atomic:
writer.deleteDocuments(new Term("packID", delSubDocs.packID));
for(Document subDoc : docsList) {
writer.addDocument(subDoc);
}
*/
} else {
if (VERBOSE) {
System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
}
writer.addDocuments(docsList);
/*
// non-atomic:
for(Document subDoc : docsList) {
writer.addDocument(subDoc);
}
*/
}
doc.removeField("packID");
if (random.nextInt(5) == 2) {
if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
}
toDeleteSubDocs.add(subDocs);
}
} else {
writer.addDocument(doc);
addCount.getAndIncrement();
if (random.nextInt(5) == 3) {
if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
}
toDeleteIDs.add(doc.get("docid"));
}
}
} else {
// we use update but it never replaces a
// prior doc
@ -161,14 +286,17 @@ public class TestNRTThreads extends LuceneTestCase {
//System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
}
writer.updateDocument(new Term("docid", doc.get("docid")), doc);
}
addCount.getAndIncrement();
if (random.nextInt(5) == 3) {
if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
}
toDeleteIDs.add(doc.get("docid"));
}
if (random.nextInt(50) == 17) {
}
if (random.nextInt(30) == 17) {
if (VERBOSE) {
//System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
}
@ -184,8 +312,19 @@ public class TestNRTThreads extends LuceneTestCase {
}
delIDs.addAll(toDeleteIDs);
toDeleteIDs.clear();
for(SubDocs subDocs : toDeleteSubDocs) {
assert !subDocs.deleted;
writer.deleteDocuments(new Term("packID", subDocs.packID));
subDocs.deleted = true;
if (VERBOSE) {
System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
}
delIDs.addAll(subDocs.subIDs);
delCount.addAndGet(subDocs.subIDs.size());
}
toDeleteSubDocs.clear();
}
addCount.getAndIncrement();
if (addedField != null) {
doc.removeField(addedField);
}
@ -368,6 +507,43 @@ public class TestNRTThreads extends LuceneTestCase {
}
}
// Make sure each group of sub-docs are still in docID order:
for(SubDocs subDocs : allSubDocs) {
if (!subDocs.deleted) {
// We sort by relevance but the scores should be identical so sort falls back to by docID:
TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
assertEquals(subDocs.subIDs.size(), hits.totalHits);
int lastDocID = -1;
int startDocID = -1;
for(ScoreDoc scoreDoc : hits.scoreDocs) {
final int docID = scoreDoc.doc;
if (lastDocID != -1) {
assertEquals(1+lastDocID, docID);
} else {
startDocID = docID;
}
lastDocID = docID;
final Document doc = s.doc(docID);
assertEquals(subDocs.packID, doc.get("packID"));
}
lastDocID = startDocID - 1;
for(String subID : subDocs.subIDs) {
hits = s.search(new TermQuery(new Term("docid", subID)), 1);
assertEquals(1, hits.totalHits);
final int docID = hits.scoreDocs[0].doc;
if (lastDocID != -1) {
assertEquals(1+lastDocID, docID);
}
lastDocID = docID;
}
} else {
for(String subID : subDocs.subIDs) {
assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
}
}
}
final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
for(int id=0;id<endID;id++) {
String stringID = ""+id;