mirror of https://github.com/apache/lucene.git
LUCENE-565: added NewIndexModifier (IndexWriter subclass) to more efficiently handle updating documents (delete then add)
git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@502191 13f79535-47bb-0310-9956-ffa450edef68
parent 4e79f49f23
commit 7d583030ce
CHANGES.txt:
@@ -102,6 +102,13 @@ New features
     their passing unit tests.
     (Otis Gospodnetic)
 
+13. LUCENE-565: Added NewIndexModifier (subclass of IndexWriter) to
+    more efficiently handle updating documents (the "delete then add"
+    use case). This is intended to be an eventual replacement for the
+    existing IndexModifier. Added IndexWriter.flush() (renamed from
+    flushRamSegments()) to flush all pending updates (held in RAM), to
+    the Directory. (Ning Li via Mike McCandless)
+
 API Changes
 
  1. LUCENE-438: Remove "final" from Token, implement Cloneable, allow
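Before the diff itself, a minimal usage sketch of the API this commit introduces. All calls below (the constructors, addDocument, updateDocument, flush, close) appear in this commit; the analyzer choice, field names, and in-memory directory are illustrative assumptions, not taken from it:

    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.NewIndexModifier;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.store.RAMDirectory;

    public class UpdateExample {
      public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        NewIndexModifier modifier =
            new NewIndexModifier(dir, new WhitespaceAnalyzer(), true);

        Document doc = new Document();
        doc.add(new Field("id", "1", Field.Store.YES, Field.Index.UN_TOKENIZED));
        doc.add(new Field("body", "first version", Field.Store.NO, Field.Index.TOKENIZED));
        modifier.addDocument(doc);

        // "delete then add" through one call: buffers a delete for the term,
        // then buffers the replacement document
        Document doc2 = new Document();
        doc2.add(new Field("id", "1", Field.Store.YES, Field.Index.UN_TOKENIZED));
        doc2.add(new Field("body", "second version", Field.Store.NO, Field.Index.TOKENIZED));
        modifier.updateDocument(new Term("id", "1"), doc2);

        modifier.flush(); // push buffered inserts and deletes to the Directory
        modifier.close();
      }
    }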
org/apache/lucene/index/IndexWriter.java:
@@ -108,8 +108,8 @@ public class IndexWriter {
   private HashSet protectedSegments; // segment names that should not be deleted until commit
   private SegmentInfos rollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails
 
-  private SegmentInfos segmentInfos = new SegmentInfos(); // the segments
-  private SegmentInfos ramSegmentInfos = new SegmentInfos(); // the segments in ramDirectory
+  protected SegmentInfos segmentInfos = new SegmentInfos(); // the segments
+  protected SegmentInfos ramSegmentInfos = new SegmentInfos(); // the segments in ramDirectory
   private final RAMDirectory ramDirectory = new RAMDirectory(); // for temp segs
   private IndexFileDeleter deleter;
@@ -125,6 +125,10 @@ public class IndexWriter {
   private boolean closeDir;
 
+  protected IndexFileDeleter getDeleter() {
+    return deleter;
+  }
+
   /** Get the current setting of whether to use the compound file format.
    * Note that this just returns the value you set with setUseCompoundFile(boolean)
    * or the default. You cannot use this to query the status of an existing index.
@@ -642,15 +646,20 @@ public class IndexWriter {
    * flushing/merging temporary free space requirements.</p>
    */
   public void addDocument(Document doc, Analyzer analyzer) throws IOException {
-    DocumentWriter dw =
-      new DocumentWriter(ramDirectory, analyzer, this);
+    SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer);
+    synchronized (this) {
+      ramSegmentInfos.addElement(newSegmentInfo);
+      maybeFlushRamSegments();
+    }
+  }
+
+  final SegmentInfo buildSingleDocSegment(Document doc, Analyzer analyzer)
+      throws IOException {
+    DocumentWriter dw = new DocumentWriter(ramDirectory, analyzer, this);
     dw.setInfoStream(infoStream);
     String segmentName = newRAMSegmentName();
     dw.addDocument(segmentName, doc);
-    synchronized (this) {
-      ramSegmentInfos.addElement(new SegmentInfo(segmentName, 1, ramDirectory, false, false));
-      maybeFlushRamSegments();
-    }
+    return new SegmentInfo(segmentName, 1, ramDirectory, false, false);
   }
 
   // for test purpose
@@ -658,7 +667,7 @@ public class IndexWriter {
     return ramSegmentInfos.size();
   }
 
-  private final synchronized String newRAMSegmentName() {
+  final synchronized String newRAMSegmentName() {
     return "_ram_" + Integer.toString(ramSegmentInfos.counter++, Character.MAX_RADIX);
   }
@@ -676,7 +685,7 @@ public class IndexWriter {
     }
   }
 
-  private final synchronized String newSegmentName() {
+  final synchronized String newSegmentName() {
     return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX);
   }
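As a side note on the two naming methods above: segment names are just the segmentInfos counter rendered in base 36 (Character.MAX_RADIX is 36), prefixed with "_" for disk segments or "_ram_" for the buffered in-memory segments. A small worked example:

    // counter 10 -> "_a", counter 35 -> "_z", counter 36 -> "_10"
    String name = "_" + Integer.toString(10, Character.MAX_RADIX); // "_a"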
@@ -1219,20 +1228,46 @@ public class IndexWriter {
   // counts x and y, then f(x) >= f(y).
   // 2: The number of committed segments on the same level (f(n)) <= M.
 
-  private final void maybeFlushRamSegments() throws IOException {
-    if (ramSegmentInfos.size() >= minMergeDocs) {
+  protected boolean timeToFlushRam() {
+    return ramSegmentInfos.size() >= minMergeDocs;
+  }
+
+  protected boolean anythingToFlushRam() {
+    return ramSegmentInfos.size() > 0;
+  }
+
+  // true if only buffered inserts, no buffered deletes
+  protected boolean onlyRamDocsToFlush() {
+    return true;
+  }
+
+  // whether the latest segment is the flushed merge of ram segments
+  protected void doAfterFlushRamSegments(boolean flushedRamSegments)
+      throws IOException {
+  }
+
+  protected final void maybeFlushRamSegments() throws IOException {
+    if (timeToFlushRam()) {
       flushRamSegments();
     }
   }
 
-  /** Expert: Flushes all RAM-resident segments (buffered documents), then may merge segments. */
-  public final synchronized void flushRamSegments() throws IOException {
-    if (ramSegmentInfos.size() > 0) {
+  private final synchronized void flushRamSegments() throws IOException {
+    if (anythingToFlushRam()) {
       mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size());
       maybeMergeSegments(minMergeDocs);
     }
   }
 
+  /**
+   * Flush all in-memory buffered updates to the Directory.
+   * @throws IOException
+   */
+  public final synchronized void flush() throws IOException {
+    flushRamSegments();
+  }
+
   /** Expert: Return the total size of all index files currently cached in memory.
    * Useful for size management with flushRamDocs()
    */
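These protected hooks turn flushing into a template method: the base class keeps the flush mechanics, while a subclass redefines when a flush is due and what happens afterwards. NewIndexModifier (added below) overrides all four hooks. As a smaller, purely hypothetical sketch (not part of this commit), a subclass could also change only the trigger, relying on ramSegmentInfos now being protected:

    package org.apache.lucene.index;

    import java.io.IOException;
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.store.Directory;

    // Hypothetical illustration only: flush after a fixed number of
    // buffered single-document ram segments.
    class EagerFlushWriter extends IndexWriter {
      private final int maxBufferedRamSegments;

      EagerFlushWriter(Directory d, Analyzer a, boolean create,
          int maxBufferedRamSegments) throws IOException {
        super(d, a, create);
        this.maxBufferedRamSegments = maxBufferedRamSegments;
      }

      protected boolean timeToFlushRam() {
        // keep the base doc-count trigger, add a tighter one of our own
        return super.timeToFlushRam()
            || ramSegmentInfos.size() >= maxBufferedRamSegments;
      }
    }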
@@ -1315,10 +1350,10 @@ public class IndexWriter {
   private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end)
     throws IOException {
 
+    boolean mergeFlag = end > 0;
     final String mergedName = newSegmentName();
-    if (infoStream != null) infoStream.print("merging segments");
-    SegmentMerger merger = new SegmentMerger(this, mergedName);
+    SegmentMerger merger = null;
 
     final Vector segmentsToDelete = new Vector();
 
     String segmentsInfosFileName = segmentInfos.getCurrentSegmentFileName();
@@ -1326,21 +1361,26 @@ public class IndexWriter {
 
     SegmentInfo newSegment = null;
 
-    int mergedDocCount;
+    int mergedDocCount = 0;
 
     // This is try/finally to make sure merger's readers are closed:
     try {
 
+      if (mergeFlag) {
+        if (infoStream != null) infoStream.print("merging segments");
+        merger = new SegmentMerger(this, mergedName);
+
       for (int i = minSegment; i < end; i++) {
         SegmentInfo si = sourceSegments.info(i);
         if (infoStream != null)
           infoStream.print(" " + si.name + " (" + si.docCount + " docs)");
-        IndexReader reader = SegmentReader.get(si);
+        IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet)
         merger.add(reader);
         if ((reader.directory() == this.directory) || // if we own the directory
             (reader.directory() == this.ramDirectory))
           segmentsToDelete.addElement(reader); // queue segment for deletion
       }
+      }
 
     SegmentInfos rollback = null;
     boolean success = false;
@@ -1349,6 +1389,7 @@ public class IndexWriter {
       // if we hit exception when doing the merge:
       try {
 
+        if (mergeFlag) {
         mergedDocCount = merger.merge();
 
         if (infoStream != null) {
@@ -1357,23 +1398,32 @@ public class IndexWriter {
 
         newSegment = new SegmentInfo(mergedName, mergedDocCount,
                                      directory, false, true);
+        }
 
+        if (!inTransaction
+            && (sourceSegments != ramSegmentInfos || !onlyRamDocsToFlush())) {
+          // Now save the SegmentInfo instances that
+          // we are replacing:
+          rollback = (SegmentInfos) segmentInfos.clone();
+        }
+
+        if (mergeFlag) {
         if (sourceSegments == ramSegmentInfos) {
           segmentInfos.addElement(newSegment);
         } else {
-
-          if (!inTransaction) {
-            // Now save the SegmentInfo instances that
-            // we are replacing:
-            rollback = (SegmentInfos) segmentInfos.clone();
-          }
-
           for (int i = end-1; i > minSegment; i--) // remove old infos & add new
             sourceSegments.remove(i);
 
           segmentInfos.set(minSegment, newSegment);
         }
+        }
 
+        if (sourceSegments == ramSegmentInfos) {
+          // Should not be necessary: no prior commit should
+          // have left pending files, so just defensive:
+          deleter.clearPendingFiles();
+          doAfterFlushRamSegments(mergeFlag);
+        }
+
         if (!inTransaction) {
           segmentInfos.write(directory); // commit before deleting
@@ -1396,7 +1446,7 @@ public class IndexWriter {
 
         // Must rollback so our state matches index:
 
-        if (sourceSegments == ramSegmentInfos) {
+        if (sourceSegments == ramSegmentInfos && onlyRamDocsToFlush()) {
           // Simple case: newSegment may or may not have
           // been added to the end of our segment infos,
           // so just check & remove if so:
@@ -1414,6 +1464,10 @@ public class IndexWriter {
             segmentInfos.addAll(rollback);
           }
 
+          // Erase any pending files that we were going to delete:
+          // i.e. old del files added by SegmentReader.doCommit()
+          deleter.clearPendingFiles();
+
           // Delete any partially created files:
           deleter.deleteFile(nextSegmentsFileName);
           deleter.findDeletableFiles();
@@ -1422,18 +1476,21 @@ public class IndexWriter {
         }
       } finally {
         // close readers before we attempt to delete now-obsolete segments
-        merger.closeReaders();
+        if (mergeFlag) merger.closeReaders();
       }
 
       if (!inTransaction) {
         // Attempt to delete all files we just obsoleted:
         deleter.deleteFile(segmentsInfosFileName); // delete old segments_N file
         deleter.deleteSegments(segmentsToDelete); // delete now-unused segments
+        // including the old del files
+        deleter.commitPendingFiles();
       } else {
         deleter.addPendingFile(segmentsInfosFileName); // delete old segments_N file
         deleter.deleteSegments(segmentsToDelete, protectedSegments); // delete now-unused segments
       }
 
-      if (useCompoundFile) {
+      if (useCompoundFile && mergeFlag) {
 
         segmentsInfosFileName = nextSegmentsFileName;
         nextSegmentsFileName = segmentInfos.getNextSegmentFileName();
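A note on the recurring mergeFlag guard in the hunks above: flushRamSegments() always calls mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size()). When only delete terms are buffered (the NewIndexModifier case), ramSegmentInfos is empty, so end == 0 and mergeFlag stays false: no SegmentMerger is created and no new segment is built, but the method still runs doAfterFlushRamSegments() and the commit/cleanup bookkeeping, which is exactly what a deletes-only flush needs. In sketch form (derived from the mergeFlag logic in this diff, not quoted from it):

    // inserts buffered: end > 0, so a real merge happens
    mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size());

    // only deletes buffered: ramSegmentInfos.size() == 0, so the same call
    // becomes mergeSegments(ramSegmentInfos, 0, 0); mergeFlag is false and
    // only the delete-application and commit logic runs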
org/apache/lucene/index/NewIndexModifier.java (new file):
@@ -0,0 +1,294 @@
package org.apache.lucene.index;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.Directory;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;

/**
 * NewIndexModifier extends {@link IndexWriter} so that you can not only insert
 * documents but also delete documents through a single interface. Internally,
 * inserts and deletes are buffered before they are flushed to disk.
 * <p>
 * Design Overview
 * <p>
 * deleteDocuments() method works by buffering terms to be deleted. Deletes are
 * deferred until ram is flushed to disk, either because enough new documents or
 * delete terms are buffered, or because close() or flush() is called. Using
 * Java synchronization, care is taken to ensure that an interleaved sequence of
 * inserts and deletes for the same document are properly serialized.
 */

public class NewIndexModifier extends IndexWriter {
  // number of ram segments a delete term applies to
  private class Num {
    private int num;

    Num(int num) {
      this.num = num;
    }

    int getNum() {
      return num;
    }

    void setNum(int num) {
      this.num = num;
    }
  }

  /**
   * Default value is 10. Change using {@link #setMaxBufferedDeleteTerms(int)}.
   */
  public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 10;
  // the max number of delete terms that can be buffered before
  // they must be flushed to disk
  private int maxBufferedDeleteTerms = DEFAULT_MAX_BUFFERED_DELETE_TERMS;

  // to buffer delete terms in ram before they are applied
  // key is delete term, value is number of ram segments the term applies to
  private HashMap bufferedDeleteTerms = new HashMap();
  private int numBufferedDeleteTerms = 0;

  /**
   * @see IndexWriter#IndexWriter(String, Analyzer, boolean)
   */
  public NewIndexModifier(String path, Analyzer a, boolean create)
      throws IOException {
    super(path, a, create);
  }

  /**
   * @see IndexWriter#IndexWriter(File, Analyzer, boolean)
   */
  public NewIndexModifier(File path, Analyzer a, boolean create)
      throws IOException {
    super(path, a, create);
  }

  /**
   * @see IndexWriter#IndexWriter(Directory, Analyzer, boolean)
   */
  public NewIndexModifier(Directory d, Analyzer a, boolean create)
      throws IOException {
    super(d, a, create);
  }

  /**
   * @see IndexWriter#IndexWriter(String, Analyzer)
   */
  public NewIndexModifier(String path, Analyzer a) throws IOException {
    super(path, a);
  }

  /**
   * @see IndexWriter#IndexWriter(File, Analyzer)
   */
  public NewIndexModifier(File path, Analyzer a) throws IOException {
    super(path, a);
  }

  /**
   * @see IndexWriter#IndexWriter(Directory, Analyzer)
   */
  public NewIndexModifier(Directory d, Analyzer a) throws IOException {
    super(d, a);
  }

  /**
   * Determines the minimal number of delete terms required before the buffered
   * in-memory delete terms are applied and flushed. If there are documents
   * buffered in memory at the time, they are merged and a new Segment is
   * created. The delete terms are applied appropriately.
   * <p>
   * The default value is 10.
   * @throws IllegalArgumentException if maxBufferedDeleteTerms is smaller than 1
   */
  public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
    if (maxBufferedDeleteTerms < 1)
      throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1");
    this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
  }

  /**
   * @see #setMaxBufferedDeleteTerms
   */
  public int getMaxBufferedDeleteTerms() {
    return maxBufferedDeleteTerms;
  }

  // for test purpose
  final synchronized int getBufferedDeleteTermsSize() {
    return bufferedDeleteTerms.size();
  }

  // for test purpose
  final synchronized int getNumBufferedDeleteTerms() {
    return numBufferedDeleteTerms;
  }

  /**
   * Updates a document by first deleting all documents containing
   * <code>term</code> and then adding the new document.
   */
  public void updateDocument(Term term, Document doc) throws IOException {
    updateDocument(term, doc, getAnalyzer());
  }

  /**
   * Updates a document by first deleting all documents containing
   * <code>term</code> and then adding the new document.
   */
  public void updateDocument(Term term, Document doc, Analyzer analyzer)
      throws IOException {
    SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer);
    synchronized (this) {
      bufferDeleteTerm(term);
      ramSegmentInfos.addElement(newSegmentInfo);
      maybeFlushRamSegments();
    }
  }

  /**
   * Deletes all documents containing <code>term</code>.
   */
  public synchronized void deleteDocuments(Term term) throws IOException {
    bufferDeleteTerm(term);
    maybeFlushRamSegments();
  }

  /**
   * Deletes all documents containing any of the terms. All deletes are flushed
   * at the same time.
   */
  public synchronized void deleteDocuments(Term[] terms) throws IOException {
    for (int i = 0; i < terms.length; i++) {
      bufferDeleteTerm(terms[i]);
    }
    maybeFlushRamSegments();
  }

  // buffer a term in bufferedDeleteTerms. bufferedDeleteTerms also records
  // the current number of documents buffered in ram so that the delete term
  // will be applied to those ram segments as well as the disk segments
  private void bufferDeleteTerm(Term term) {
    Num num = (Num)bufferedDeleteTerms.get(term);
    if (num == null) {
      bufferedDeleteTerms.put(term, new Num(getRAMSegmentCount()));
    } else {
      num.setNum(getRAMSegmentCount());
    }
    numBufferedDeleteTerms++;
  }

  // a flush is triggered if enough new documents are buffered or
  // if enough delete terms are buffered
  protected boolean timeToFlushRam() {
    return super.timeToFlushRam()
        || numBufferedDeleteTerms >= maxBufferedDeleteTerms;
  }

  protected boolean anythingToFlushRam() {
    return super.anythingToFlushRam() || bufferedDeleteTerms.size() > 0;
  }

  protected boolean onlyRamDocsToFlush() {
    return super.onlyRamDocsToFlush() && bufferedDeleteTerms.size() == 0;
  }

  protected void doAfterFlushRamSegments(boolean flushedRamSegments)
      throws IOException {
    if (bufferedDeleteTerms.size() > 0) {
      if (getInfoStream() != null)
        getInfoStream().println(
            "flush " + numBufferedDeleteTerms + " buffered terms on "
                + segmentInfos.size() + " segments.");

      if (flushedRamSegments) {
        IndexReader reader = null;
        try {
          reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1));
          reader.setDeleter(getDeleter());

          // apply delete terms to the segment just flushed from ram
          // apply appropriately so that a delete term is only applied to
          // the documents buffered before it, not those buffered after it
          applyDeletesSelectively(bufferedDeleteTerms, reader);
        } finally {
          if (reader != null)
            reader.close();
        }
      }

      int infosEnd = segmentInfos.size();
      if (flushedRamSegments) {
        infosEnd--;
      }

      for (int i = 0; i < infosEnd; i++) {
        IndexReader reader = null;
        try {
          reader = SegmentReader.get(segmentInfos.info(i));
          reader.setDeleter(getDeleter());

          // apply delete terms to disk segments
          // except the one just flushed from ram
          applyDeletes(bufferedDeleteTerms, reader);
        } finally {
          if (reader != null)
            reader.close();
        }
      }

      // clean up bufferedDeleteTerms
      bufferedDeleteTerms.clear();
      numBufferedDeleteTerms = 0;
    }
  }

  // apply buffered delete terms to the segment just flushed from ram
  // apply appropriately so that a delete term is only applied to
  // the documents buffered before it, not those buffered after it
  private final void applyDeletesSelectively(HashMap deleteTerms,
      IndexReader reader) throws IOException {
    Iterator iter = deleteTerms.entrySet().iterator();
    while (iter.hasNext()) {
      Entry entry = (Entry)iter.next();
      Term term = (Term)entry.getKey();

      TermDocs docs = reader.termDocs(term);
      if (docs != null) {
        int num = ((Num)entry.getValue()).getNum();
        try {
          while (docs.next()) {
            int doc = docs.doc();
            if (doc >= num) {
              break;
            }
            reader.deleteDocument(doc);
          }
        } finally {
          docs.close();
        }
      }
    }
  }

  // apply buffered delete terms to disk segments
  // except the one just flushed from ram
  private final void applyDeletes(HashMap deleteTerms, IndexReader reader)
      throws IOException {
    Iterator iter = deleteTerms.entrySet().iterator();
    while (iter.hasNext()) {
      Entry entry = (Entry)iter.next();
      Term term = (Term)entry.getKey();
      reader.deleteDocuments(term);
    }
  }
}
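To make the selective-apply logic above concrete, a small hedged walk-through (document ids and the term name are invented for illustration): suppose a caller buffers doc A, then calls deleteDocuments(T) where T matches both A and a later doc B, then buffers doc B. bufferDeleteTerm records Num = 1 for T, since one ram segment (doc A) was buffered before the delete. When the ram segments are flushed into one merged segment, A gets doc id 0 and B gets doc id 1, in buffering order. applyDeletesSelectively then walks T's postings and deletes only ids below 1, so A is deleted while B, which was added after the delete, survives:

    // inside applyDeletesSelectively: num == 1 for term T in this scenario
    while (docs.next()) {
      int doc = docs.doc();
      if (doc >= num) {   // doc B (id 1) post-dates the delete: stop here
        break;
      }
      reader.deleteDocument(doc);  // doc A (id 0) is deleted
    }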
org/apache/lucene/index/TestNewIndexModifierDelete.java (new file):
@@ -0,0 +1,446 @@
package org.apache.lucene.index;

import java.io.IOException;
import java.util.Arrays;

import junit.framework.TestCase;

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.search.Hits;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.store.RAMDirectory;

public class TestNewIndexModifierDelete extends TestCase {

  // test the simple case
  public void testSimpleCase() throws IOException {
    String[] keywords = { "1", "2" };
    String[] unindexed = { "Netherlands", "Italy" };
    String[] unstored = { "Amsterdam has lots of bridges",
        "Venice has lots of canals" };
    String[] text = { "Amsterdam", "Venice" };

    Directory dir = new RAMDirectory();
    NewIndexModifier modifier = new NewIndexModifier(dir,
        new WhitespaceAnalyzer(), true);
    modifier.setUseCompoundFile(true);
    modifier.setMaxBufferedDeleteTerms(1);

    for (int i = 0; i < keywords.length; i++) {
      Document doc = new Document();
      doc.add(new Field("id", keywords[i], Field.Store.YES,
          Field.Index.UN_TOKENIZED));
      doc.add(new Field("country", unindexed[i], Field.Store.YES,
          Field.Index.NO));
      doc.add(new Field("contents", unstored[i], Field.Store.NO,
          Field.Index.TOKENIZED));
      doc.add(new Field("city", text[i], Field.Store.YES,
          Field.Index.TOKENIZED));
      modifier.addDocument(doc);
    }
    modifier.optimize();

    Term term = new Term("city", "Amsterdam");
    int hitCount = getHitCount(dir, term);
    assertEquals(1, hitCount);
    modifier.deleteDocuments(term);
    hitCount = getHitCount(dir, term);
    assertEquals(0, hitCount);

    modifier.close();
  }

  // test when delete terms only apply to disk segments
  public void testNonRAMDelete() throws IOException {
    Directory dir = new RAMDirectory();
    NewIndexModifier modifier = new NewIndexModifier(dir,
        new WhitespaceAnalyzer(), true);
    modifier.setMaxBufferedDocs(2);
    modifier.setMaxBufferedDeleteTerms(2);

    int id = 0;
    int value = 100;

    for (int i = 0; i < 7; i++) {
      addDoc(modifier, ++id, value);
    }
    modifier.flush();

    assertEquals(0, modifier.getRAMSegmentCount());
    assertTrue(0 < modifier.getSegmentCount());

    IndexReader reader = IndexReader.open(dir);
    assertEquals(7, reader.numDocs());
    reader.close();

    modifier.deleteDocuments(new Term("value", String.valueOf(value)));
    modifier.deleteDocuments(new Term("value", String.valueOf(value)));

    reader = IndexReader.open(dir);
    assertEquals(0, reader.numDocs());
    reader.close();

    modifier.close();
  }

  // test when delete terms only apply to ram segments
  public void testRAMDeletes() throws IOException {
    Directory dir = new RAMDirectory();
    NewIndexModifier modifier = new NewIndexModifier(dir,
        new WhitespaceAnalyzer(), true);
    modifier.setMaxBufferedDocs(4);
    modifier.setMaxBufferedDeleteTerms(4);

    int id = 0;
    int value = 100;

    addDoc(modifier, ++id, value);
    modifier.deleteDocuments(new Term("value", String.valueOf(value)));
    addDoc(modifier, ++id, value);
    modifier.deleteDocuments(new Term("value", String.valueOf(value)));

    assertEquals(2, modifier.getNumBufferedDeleteTerms());
    assertEquals(1, modifier.getBufferedDeleteTermsSize());

    addDoc(modifier, ++id, value);
    assertEquals(0, modifier.getSegmentCount());
    modifier.flush();

    IndexReader reader = IndexReader.open(dir);
    assertEquals(1, reader.numDocs());

    int hitCount = getHitCount(dir, new Term("id", String.valueOf(id)));
    assertEquals(1, hitCount);
    reader.close();

    modifier.close();
  }

  // test when delete terms apply to both disk and ram segments
  public void testBothDeletes() throws IOException {
    Directory dir = new RAMDirectory();
    NewIndexModifier modifier = new NewIndexModifier(dir,
        new WhitespaceAnalyzer(), true);
    modifier.setMaxBufferedDocs(100);
    modifier.setMaxBufferedDeleteTerms(100);

    int id = 0;
    int value = 100;

    for (int i = 0; i < 5; i++) {
      addDoc(modifier, ++id, value);
    }

    value = 200;
    for (int i = 0; i < 5; i++) {
      addDoc(modifier, ++id, value);
    }
    modifier.flush();

    for (int i = 0; i < 5; i++) {
      addDoc(modifier, ++id, value);
    }
    modifier.deleteDocuments(new Term("value", String.valueOf(value)));
    modifier.flush();

    IndexReader reader = IndexReader.open(dir);
    assertEquals(5, reader.numDocs());

    modifier.close();
  }

  // test that batched delete terms are flushed together
  public void testBatchDeletes() throws IOException {
    Directory dir = new RAMDirectory();
    NewIndexModifier modifier = new NewIndexModifier(dir,
        new WhitespaceAnalyzer(), true);
    modifier.setMaxBufferedDocs(2);
    modifier.setMaxBufferedDeleteTerms(2);

    int id = 0;
    int value = 100;

    for (int i = 0; i < 7; i++) {
      addDoc(modifier, ++id, value);
    }
    modifier.flush();

    IndexReader reader = IndexReader.open(dir);
    assertEquals(7, reader.numDocs());
    reader.close();

    id = 0;
    modifier.deleteDocuments(new Term("id", String.valueOf(++id)));
    modifier.deleteDocuments(new Term("id", String.valueOf(++id)));

    reader = IndexReader.open(dir);
    assertEquals(5, reader.numDocs());
    reader.close();

    Term[] terms = new Term[3];
    for (int i = 0; i < terms.length; i++) {
      terms[i] = new Term("id", String.valueOf(++id));
    }
    modifier.deleteDocuments(terms);

    reader = IndexReader.open(dir);
    assertEquals(2, reader.numDocs());
    reader.close();

    modifier.close();
  }

  private void addDoc(NewIndexModifier modifier, int id, int value)
      throws IOException {
    Document doc = new Document();
    doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED));
    doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
        Field.Index.UN_TOKENIZED));
    doc.add(new Field("value", String.valueOf(value), Field.Store.NO,
        Field.Index.UN_TOKENIZED));
    modifier.addDocument(doc);
  }

  private int getHitCount(Directory dir, Term term) throws IOException {
    IndexSearcher searcher = new IndexSearcher(dir);
    int hitCount = searcher.search(new TermQuery(term)).length();
    searcher.close();
    return hitCount;
  }

  public void testDeletesOnDiskFull() throws IOException {
    testOperationsOnDiskFull(false);
  }

  public void testUpdatesOnDiskFull() throws IOException {
    testOperationsOnDiskFull(true);
  }

  /**
   * Make sure if modifier tries to commit but hits disk full that modifier
   * remains consistent and usable. Similar to TestIndexReader.testDiskFull().
   */
  private void testOperationsOnDiskFull(boolean updates) throws IOException {

    boolean debug = false;
    Term searchTerm = new Term("content", "aaa");
    int START_COUNT = 157;
    int END_COUNT = 144;

    // First build up a starting index:
    RAMDirectory startDir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(startDir, new WhitespaceAnalyzer(),
        true);
    for (int i = 0; i < 157; i++) {
      Document d = new Document();
      d.add(new Field("id", Integer.toString(i), Field.Store.YES,
          Field.Index.UN_TOKENIZED));
      d.add(new Field("content", "aaa " + i, Field.Store.NO,
          Field.Index.TOKENIZED));
      writer.addDocument(d);
    }
    writer.close();

    long diskUsage = startDir.sizeInBytes();
    long diskFree = diskUsage + 10;

    IOException err = null;

    boolean done = false;

    // Iterate w/ ever increasing free disk space:
    while (!done) {
      MockRAMDirectory dir = new MockRAMDirectory(startDir);
      NewIndexModifier modifier = new NewIndexModifier(dir,
          new WhitespaceAnalyzer(), false);

      modifier.setMaxBufferedDocs(1000); // use flush or close
      modifier.setMaxBufferedDeleteTerms(1000); // use flush or close

      // For each disk size, first try to commit against
      // dir that will hit random IOExceptions & disk
      // full; after, give it infinite disk space & turn
      // off random IOExceptions & retry w/ same reader:
      boolean success = false;

      for (int x = 0; x < 2; x++) {

        double rate = 0.1;
        double diskRatio = ((double)diskFree) / diskUsage;
        long thisDiskFree;
        String testName;

        if (0 == x) {
          thisDiskFree = diskFree;
          if (diskRatio >= 2.0) {
            rate /= 2;
          }
          if (diskRatio >= 4.0) {
            rate /= 2;
          }
          if (diskRatio >= 6.0) {
            rate = 0.0;
          }
          if (debug) {
            System.out.println("\ncycle: " + diskFree + " bytes");
          }
          testName = "disk full during reader.close() @ " + thisDiskFree
              + " bytes";
        } else {
          thisDiskFree = 0;
          rate = 0.0;
          if (debug) {
            System.out.println("\ncycle: same writer: unlimited disk space");
          }
          testName = "reader re-use after disk full";
        }

        dir.setMaxSizeInBytes(thisDiskFree);
        dir.setRandomIOExceptionRate(rate, diskFree);

        try {
          if (0 == x) {
            int docId = 12;
            for (int i = 0; i < 13; i++) {
              if (updates) {
                Document d = new Document();
                d.add(new Field("id", Integer.toString(i), Field.Store.YES,
                    Field.Index.UN_TOKENIZED));
                d.add(new Field("content", "bbb " + i, Field.Store.NO,
                    Field.Index.TOKENIZED));
                modifier.updateDocument(new Term("id", Integer.toString(docId)), d);
              } else { // deletes
                modifier.deleteDocuments(new Term("id", Integer.toString(docId)));
                // modifier.setNorm(docId, "contents", (float)2.0);
              }
              docId += 12;
            }
          }
          modifier.close();
          success = true;
          if (0 == x) {
            done = true;
          }
        }
        catch (IOException e) {
          if (debug) {
            System.out.println("  hit IOException: " + e);
          }
          err = e;
          if (1 == x) {
            e.printStackTrace();
            fail(testName + " hit IOException after disk space was freed up");
          }
        }

        // Whether we succeeded or failed, check that all
        // un-referenced files were in fact deleted (ie,
        // we did not create garbage). Just create a
        // new IndexFileDeleter, have it delete
        // unreferenced files, then verify that in fact
        // no files were deleted:
        String[] startFiles = dir.list();
        SegmentInfos infos = new SegmentInfos();
        infos.read(dir);
        IndexFileDeleter d = new IndexFileDeleter(infos, dir);
        d.findDeletableFiles();
        d.deleteFiles();
        String[] endFiles = dir.list();

        Arrays.sort(startFiles);
        Arrays.sort(endFiles);

        // for(int i=0;i<startFiles.length;i++) {
        //   System.out.println("  startFiles: " + i + ": " + startFiles[i]);
        // }

        if (!Arrays.equals(startFiles, endFiles)) {
          String successStr;
          if (success) {
            successStr = "success";
          } else {
            successStr = "IOException";
            err.printStackTrace();
          }
          fail("reader.close() failed to delete unreferenced files after "
              + successStr + " (" + diskFree + " bytes): before delete:\n    "
              + arrayToString(startFiles) + "\n  after delete:\n    "
              + arrayToString(endFiles));
        }

        // Finally, verify index is not corrupt, and, if
        // we succeeded, we see all docs changed, and if
        // we failed, we see either all docs or no docs
        // changed (transactional semantics):
        IndexReader newReader = null;
        try {
          newReader = IndexReader.open(dir);
        }
        catch (IOException e) {
          e.printStackTrace();
          fail(testName
              + ":exception when creating IndexReader after disk full during close: "
              + e);
        }

        IndexSearcher searcher = new IndexSearcher(newReader);
        Hits hits = null;
        try {
          hits = searcher.search(new TermQuery(searchTerm));
        }
        catch (IOException e) {
          e.printStackTrace();
          fail(testName + ": exception when searching: " + e);
        }
        int result2 = hits.length();
        if (success) {
          if (result2 != END_COUNT) {
            fail(testName
                + ": method did not throw exception but hits.length for search on term 'aaa' is "
                + result2 + " instead of expected " + END_COUNT);
          }
        } else {
          // On hitting exception we still may have added
          // all docs:
          if (result2 != START_COUNT && result2 != END_COUNT) {
            err.printStackTrace();
            fail(testName
                + ": method did throw exception but hits.length for search on term 'aaa' is "
                + result2 + " instead of expected " + START_COUNT);
          }
        }

        searcher.close();
        newReader.close();

        if (result2 == END_COUNT) {
          break;
        }
      }

      dir.close();

      // Try again with 10 more bytes of free space:
      diskFree += 10;
    }
  }

  private String arrayToString(String[] l) {
    String s = "";
    for (int i = 0; i < l.length; i++) {
      if (i > 0) {
        s += "\n    ";
      }
      s += l[i];
    }
    return s;
  }
}