LUCENE-1112: skip immense terms and mark a doc for deletion if it hits a non-aborting exception

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@608534 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2008-01-03 15:49:50 +00:00
parent f12862426a
commit 2d633f98a2
3 changed files with 193 additions and 115 deletions

src/java/org/apache/lucene/index/DocumentsWriter.java

@@ -97,6 +97,28 @@ import java.util.Collections;
* threads and flush only once they are all idle. This
* means you can call flush with a given thread even while
* other threads are actively adding/deleting documents.
*
* Exceptions:
*
* Because this class directly updates in-memory posting
* lists, and flushes stored fields and term vectors
* directly to files in the directory, there are certain
* limited times when an exception can corrupt this state.
* For example, a disk full while flushing stored fields
* leaves this file in a corrupt state. Or, an OOM
* exception while appending to the in-memory posting lists
* can corrupt that posting list. We call such exceptions
* "aborting exceptions". In these cases we must call
* abort() to discard all docs added since the last flush.
*
* All other exceptions ("non-aborting exceptions") can
* still partially update the index structures. These
* updates are consistent, but they represent only the
* part of the document indexed before the exception was hit.
* When this happens, we immediately mark the document as
* deleted so that the document is always atomically ("all
* or none") added to the index.
*/
final class DocumentsWriter {
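
The contract above, reduced to a minimal standalone sketch (illustrative names only, not the actual DocumentsWriter code):

// Sketch of the two-tier exception policy described in the javadoc above.
final class ExceptionPolicySketch {
  boolean abortOnExc = true;   // true while an exception would leave shared state corrupt

  void addDocument(int docID) throws Exception {
    try {
      processDocument(docID);  // may throw at any point
    } catch (Exception e) {
      if (abortOnExc)
        abort();               // aborting: discard all docs buffered since the last flush
      else
        markDeleted(docID);    // non-aborting: hide the partially indexed doc, keep the rest
      throw e;
    }
  }

  void processDocument(int docID) throws Exception {}  // index one document
  void abort() {}                                      // discard all buffered state
  void markDeleted(int docID) {}                       // buffer docID for deletion at flush
}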
@@ -137,6 +159,9 @@ final class DocumentsWriter {
private HashMap bufferedDeleteTerms = new HashMap();
private int numBufferedDeleteTerms = 0;
// Currently used only for deleting a doc on hitting a non-aborting exception
private List bufferedDeleteDocIDs = new ArrayList();
// The max number of delete terms that can be buffered before
// they must be flushed to disk.
private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
@@ -155,6 +180,7 @@ final class DocumentsWriter {
private static int OBJECT_HEADER_BYTES = 12;
private static int OBJECT_POINTER_BYTES = 4; // TODO: should be 8 on 64-bit platform
private static int BYTES_PER_CHAR = 2;
private static int BYTES_PER_INT = 4;
private BufferedNorms[] norms = new BufferedNorms[0]; // Holds norms until we flush
@@ -311,6 +337,7 @@ final class DocumentsWriter {
pauseAllThreads();
bufferedDeleteTerms.clear();
bufferedDeleteDocIDs.clear();
numBufferedDeleteTerms = 0;
try {
@@ -522,7 +549,8 @@
int numAllFieldData;
FieldData[] fieldDataHash; // Hash FieldData instances by field name
int fieldDataHashMask;
int maxTermHit; // Set to > 0 if this doc has a too-large term
String maxTermPrefix; // Non-null prefix of a too-large term if this
// doc has one
boolean doFlushAfter;
boolean abortOnExc;
@@ -623,7 +651,7 @@
numStoredFields = 0;
numFieldData = 0;
numVectorFields = 0;
maxTermHit = 0;
maxTermPrefix = null;
assert 0 == fdtLocal.length();
assert 0 == tvfLocal.length();
@@ -717,12 +745,10 @@
}
if (field.isTermVectorStored()) {
if (!fp.doVectors) {
if (numVectorFields++ == vectorFieldPointers.length) {
final int newSize = (int) (numVectorFields*1.5);
vectorFieldPointers = new long[newSize];
vectorFieldNumbers = new int[newSize];
}
if (!fp.doVectors && numVectorFields++ == vectorFieldPointers.length) {
final int newSize = (int) (numVectorFields*1.5);
vectorFieldPointers = new long[newSize];
vectorFieldNumbers = new int[newSize];
}
fp.doVectors = true;
docHasVectors = true;
@@ -989,6 +1015,9 @@
for(int i=0;i<numFields;i++)
fieldDataArray[i].processField(analyzer);
if (maxTermPrefix != null && infoStream != null)
infoStream.println("WARNING: document contains at least one immense term (longer than the max length " + MAX_TERM_LENGTH + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + maxTermPrefix + "...'");
if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
&& numBytesUsed > 0.95 * ramBufferSize)
balanceRAM();
@@ -1553,11 +1582,17 @@
final int textLen1 = 1+tokenTextLen;
if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE) {
if (textLen1 > CHAR_BLOCK_SIZE) {
maxTermHit = tokenTextLen;
// Just skip this term; we will throw an
// exception after processing all accepted
// terms in the doc
// Just skip this term, to remain as robust as
// possible during indexing. A TokenFilter
// can be inserted into the analyzer chain if
// other behavior is wanted (pruning the term
// to a prefix, throwing an exception, etc).
abortOnExc = false;
if (maxTermPrefix == null)
maxTermPrefix = new String(tokenText, 0, 30);
// Still increment position:
position++;
return;
}
charPool.nextBuffer();
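
The "other behavior" mentioned in the comment can be had with a small filter ahead of the indexing chain. A hypothetical sketch against the Token-returning TokenStream API of this era (the class name and pruning policy are illustrative):

import java.io.IOException;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;

// Prunes over-long terms to a prefix so DocumentsWriter never sees an
// immense term at all (instead of silently skipping it).
final class TruncatingTokenFilter extends TokenFilter {
  private final int maxLength;

  TruncatingTokenFilter(TokenStream in, int maxLength) {
    super(in);
    this.maxLength = maxLength;
  }

  public Token next() throws IOException {
    Token t = input.next();
    if (t != null && t.termText().length() > maxLength)
      t.setTermText(t.termText().substring(0, maxLength));
    return t;
  }
}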
@@ -2267,29 +2302,27 @@
/** Returns true if the caller (IndexWriter) should now
* flush. */
int addDocument(Document doc, Analyzer analyzer)
boolean addDocument(Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
return updateDocument(doc, analyzer, null);
}
int updateDocument(Term t, Document doc, Analyzer analyzer)
boolean updateDocument(Term t, Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
return updateDocument(doc, analyzer, t);
}
int updateDocument(Document doc, Analyzer analyzer, Term delTerm)
boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
throws CorruptIndexException, IOException {
// This call is synchronized but fast
final ThreadState state = getThreadState(doc, delTerm);
boolean success = false;
int maxTermHit;
try {
try {
// This call is not synchronized and does all the work
state.processDocument(analyzer);
} finally {
maxTermHit = state.maxTermHit;
// This call is synchronized but fast
finishDocument(state);
}
@@ -2299,16 +2332,20 @@
synchronized(this) {
state.isIdle = true;
if (state.abortOnExc)
// Abort all buffered docs since last flush
abort();
else
// Immediately mark this document as deleted
// since likely it was partially added. This
// keeps indexing as "all or none" (atomic) when
// adding a document:
addDeleteDocID(state.docID);
notifyAll();
}
}
}
int status = maxTermHit<<1;
if (state.doFlushAfter || timeToFlushDeletes())
status += 1;
return status;
return state.doFlushAfter || timeToFlushDeletes();
}
synchronized int getNumBufferedDeleteTerms() {
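
(The packed int status is gone: it carried the flush signal in bit 0 and the offending term length in the upper bits, but immense terms are now reported via the infoStream warning inside processField, so add/updateDocument only need to answer whether the caller should flush.)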
@@ -2319,9 +2356,14 @@
return bufferedDeleteTerms;
}
synchronized List getBufferedDeleteDocIDs() {
return bufferedDeleteDocIDs;
}
// Reset buffered deletes.
synchronized void clearBufferedDeleteTerms() throws IOException {
synchronized void clearBufferedDeletes() throws IOException {
bufferedDeleteTerms.clear();
bufferedDeleteDocIDs.clear();
numBufferedDeleteTerms = 0;
if (numBytesUsed > 0)
resetPostingsData();
@@ -2366,7 +2408,7 @@
}
synchronized boolean hasDeletes() {
return bufferedDeleteTerms.size() > 0;
return bufferedDeleteTerms.size() > 0 || bufferedDeleteDocIDs.size() > 0;
}
// Number of documents a delete term applies to.
@@ -2414,6 +2456,13 @@
numBufferedDeleteTerms++;
}
// Buffer a specific docID for deletion. Currently only
// used when we hit an exception while adding a document
synchronized private void addDeleteDocID(int docId) {
bufferedDeleteDocIDs.add(new Integer(docId));
numBytesUsed += OBJECT_HEADER_BYTES + BYTES_PER_INT + OBJECT_POINTER_BYTES;
}
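
(Per the constants declared earlier, each buffered docID is charged OBJECT_HEADER_BYTES + BYTES_PER_INT + OBJECT_POINTER_BYTES = 12 + 4 + 4 = 20 bytes against the RAM budget: the boxed Integer's header and int payload, plus the list slot that references it.)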
/** Does the synchronized work to finish/flush the
* inverted document. */
private synchronized void finishDocument(ThreadState state) throws IOException {
@@ -2910,6 +2959,8 @@
final static int CHAR_BLOCK_SIZE = (int) Math.pow(2.0, CHAR_BLOCK_SHIFT);
final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
private ArrayList freeCharBlocks = new ArrayList();
/* Allocate another char[] from the shared pool */
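
(MAX_TERM_LENGTH is one less than CHAR_BLOCK_SIZE = 2^CHAR_BLOCK_SHIFT; the test below pins the limit at 16383, i.e. a block size of 2^14 = 16384 chars.)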

src/java/org/apache/lucene/index/IndexWriter.java

@@ -241,6 +241,14 @@ public class IndexWriter {
* Default value is 128. Change using {@link #setTermIndexInterval(int)}.
*/
public final static int DEFAULT_TERM_INDEX_INTERVAL = 128;
/**
* Absolute hard maximum length for a term. If a term
* arrives from the analyzer longer than this length, it
* is skipped and a message is printed to infoStream, if
* set (see {@link #setInfoStream}).
*/
public final static int MAX_TERM_LENGTH = DocumentsWriter.MAX_TERM_LENGTH;
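
Since the writer now skips immense terms instead of throwing, the only signal is the infoStream warning. A minimal usage sketch (the analyzer and directory choices are arbitrary, for illustration only):

import java.io.IOException;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.RAMDirectory;

public class InfoStreamExample {
  public static void main(String[] args) throws IOException {
    IndexWriter writer = new IndexWriter(new RAMDirectory(), new WhitespaceAnalyzer());
    // Immense-term WARNINGs are printed here rather than thrown:
    writer.setInfoStream(System.err);
    // ... addDocument(...) calls: any term longer than
    // IndexWriter.MAX_TERM_LENGTH is skipped; the rest of the doc is kept ...
    writer.close();
  }
}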
// The normal read buffer size defaults to 1024, but
// increasing this during merging seems to yield
@@ -1431,10 +1439,10 @@
*/
public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
ensureOpen();
int status = 0;
boolean doFlush = false;
boolean success = false;
try {
status = docWriter.addDocument(doc, analyzer);
doFlush = docWriter.addDocument(doc, analyzer);
success = true;
} finally {
if (!success) {
@@ -1453,9 +1461,8 @@
}
}
}
if ((status & 1) != 0)
if (doFlush)
flush(true, false);
checkMaxTermLength(status);
}
/**
@@ -1519,10 +1526,10 @@
public void updateDocument(Term term, Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
ensureOpen();
int status = 0;
boolean doFlush = false;
boolean success = false;
try {
status = docWriter.updateDocument(term, doc, analyzer);
doFlush = docWriter.updateDocument(term, doc, analyzer);
success = true;
} finally {
if (!success) {
@@ -1539,17 +1546,8 @@
}
}
}
if ((status & 1) != 0)
if (doFlush)
flush(true, false);
checkMaxTermLength(status);
}
/** Throws IllegalArgumentException if the return status
* from DocumentsWriter.{add,update}Document indicates
* that a too-long term was encountered */
final private void checkMaxTermLength(int status) {
if (status > 1)
throw new IllegalArgumentException("at least one term (length " + (status>>1) + ") exceeds max term length " + (DocumentsWriter.CHAR_BLOCK_SIZE-1) + "; these terms were skipped");
}
// for test purpose
@@ -2500,9 +2498,7 @@
// buffer deletes longer and then flush them to
// multiple flushed segments, when
// autoCommit=false
int delCount = applyDeletes(flushDocs);
if (infoStream != null)
infoStream.println("flushed " + delCount + " deleted documents");
applyDeletes(flushDocs);
doAfterFlush();
}
@@ -3220,68 +3216,65 @@
// flushedNewSegment is true then a new segment was just
// created and flushed from the ram segments, so we will
// selectively apply the deletes to that new segment.
private final int applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
private final void applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
final HashMap bufferedDeleteTerms = docWriter.getBufferedDeleteTerms();
final List bufferedDeleteDocIDs = docWriter.getBufferedDeleteDocIDs();
int delCount = 0;
if (bufferedDeleteTerms.size() > 0) {
if (infoStream != null)
message("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms on "
+ segmentInfos.size() + " segments.");
if (infoStream != null)
message("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms and " +
bufferedDeleteDocIDs.size() + " deleted docIDs on "
+ segmentInfos.size() + " segments.");
if (flushedNewSegment) {
IndexReader reader = null;
try {
// Open readers w/o opening the stored fields /
// vectors because these files may still be held
// open for writing by docWriter
reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1), false);
if (flushedNewSegment) {
IndexReader reader = null;
try {
// Open readers w/o opening the stored fields /
// vectors because these files may still be held
// open for writing by docWriter
reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1), false);
// Apply delete terms to the segment just flushed from ram
// apply appropriately so that a delete term is only applied to
// the documents buffered before it, not those buffered after it.
delCount += applyDeletesSelectively(bufferedDeleteTerms, reader);
} finally {
if (reader != null) {
try {
reader.doCommit();
} finally {
reader.doClose();
}
// Apply delete terms to the segment just flushed from ram
// apply appropriately so that a delete term is only applied to
// the documents buffered before it, not those buffered after it.
applyDeletesSelectively(bufferedDeleteTerms, bufferedDeleteDocIDs, reader);
} finally {
if (reader != null) {
try {
reader.doCommit();
} finally {
reader.doClose();
}
}
}
int infosEnd = segmentInfos.size();
if (flushedNewSegment) {
infosEnd--;
}
for (int i = 0; i < infosEnd; i++) {
IndexReader reader = null;
try {
reader = SegmentReader.get(segmentInfos.info(i), false);
// Apply delete terms to disk segments
// except the one just flushed from ram.
delCount += applyDeletes(bufferedDeleteTerms, reader);
} finally {
if (reader != null) {
try {
reader.doCommit();
} finally {
reader.doClose();
}
}
}
}
// Clean up bufferedDeleteTerms.
docWriter.clearBufferedDeleteTerms();
}
return delCount;
int infosEnd = segmentInfos.size();
if (flushedNewSegment) {
infosEnd--;
}
for (int i = 0; i < infosEnd; i++) {
IndexReader reader = null;
try {
reader = SegmentReader.get(segmentInfos.info(i), false);
// Apply delete terms to disk segments
// except the one just flushed from ram.
applyDeletes(bufferedDeleteTerms, reader);
} finally {
if (reader != null) {
try {
reader.doCommit();
} finally {
reader.doClose();
}
}
}
}
// Clean up bufferedDeleteTerms.
docWriter.clearBufferedDeletes();
}
// For test purposes.
@@ -3297,10 +3290,10 @@
// Apply buffered delete terms to the segment just flushed from ram
// apply appropriately so that a delete term is only applied to
// the documents buffered before it, not those buffered after it.
private final int applyDeletesSelectively(HashMap deleteTerms,
IndexReader reader) throws CorruptIndexException, IOException {
private final void applyDeletesSelectively(HashMap deleteTerms, List deleteIds,
IndexReader reader)
throws CorruptIndexException, IOException {
Iterator iter = deleteTerms.entrySet().iterator();
int delCount = 0;
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
Term term = (Term) entry.getKey();
@@ -3315,26 +3308,28 @@
break;
}
reader.deleteDocument(doc);
delCount++;
}
} finally {
docs.close();
}
}
}
return delCount;
if (deleteIds.size() > 0) {
iter = deleteIds.iterator();
while(iter.hasNext())
reader.deleteDocument(((Integer) iter.next()).intValue());
}
}
// Apply buffered delete terms to this reader.
private final int applyDeletes(HashMap deleteTerms, IndexReader reader)
private final void applyDeletes(HashMap deleteTerms, IndexReader reader)
throws CorruptIndexException, IOException {
Iterator iter = deleteTerms.entrySet().iterator();
int delCount = 0;
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
delCount += reader.deleteDocuments((Term) entry.getKey());
reader.deleteDocuments((Term) entry.getKey());
}
return delCount;
}
// utility routines for tests
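
(Note the asymmetry: buffered delete terms are applied to every segment, while the docIDs buffered on non-aborting exceptions are only meaningful within the segment just flushed from RAM, so they are handed to applyDeletesSelectively and never to the per-segment applyDeletes.)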

src/test/org/apache/lucene/index/TestIndexWriter.java

@@ -538,8 +538,7 @@ public class TestIndexWriter extends LuceneTestCase
}
/**
* Make sure we get a friendly exception for a wicked
* long term.
* Make sure we skip wicked long terms.
*/
public void testWickedLongTerm() throws IOException {
RAMDirectory dir = new RAMDirectory();
@@ -552,13 +551,9 @@ public class TestIndexWriter extends LuceneTestCase
// Max term length is 16383, so this content produces
// a too-long term:
String contents = "abc xyz x" + bigTerm;
String contents = "abc xyz x" + bigTerm + " another term";
doc.add(new Field("content", contents, Field.Store.NO, Field.Index.TOKENIZED));
try {
writer.addDocument(doc);
fail("did not hit expected exception");
} catch (IllegalArgumentException e) {
}
writer.addDocument(doc);
// Make sure we can add another normal document
doc = new Document();
@@ -567,9 +562,24 @@ public class TestIndexWriter extends LuceneTestCase
writer.close();
IndexReader reader = IndexReader.open(dir);
// Make sure all terms < max size were indexed
assertEquals(2, reader.docFreq(new Term("content", "abc")));
assertEquals(1, reader.docFreq(new Term("content", "bbb")));
assertEquals(1, reader.docFreq(new Term("content", "term")));
assertEquals(1, reader.docFreq(new Term("content", "another")));
// Make sure position is still incremented when
// massive term is skipped:
TermPositions tps = reader.termPositions(new Term("content", "another"));
assertTrue(tps.next());
assertEquals(1, tps.freq());
assertEquals(3, tps.nextPosition());
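
(Position 3 follows from the token stream: "abc" at 0, "xyz" at 1, the skipped immense token "x" + bigTerm at 2, whose position is still consumed, then "another" at 3.)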
// Make sure the doc that has the massive term is in
// the index:
assertEquals("document with wicked long term should is not in the index!", 2, reader.numDocs());
reader.close();
// Make sure we can add a document with exactly the
@@ -1789,7 +1799,18 @@ public class TestIndexWriter extends LuceneTestCase
writer.close();
IndexReader reader = IndexReader.open(dir);
assertEquals(reader.docFreq(new Term("content", "aa")), 3);
final Term t = new Term("content", "aa");
assertEquals(reader.docFreq(t), 3);
// Make sure the doc that hit the exception was marked
// as deleted:
TermDocs tdocs = reader.termDocs(t);
int count = 0;
while(tdocs.next()) {
count++;
}
assertEquals(2, count);
assertEquals(reader.docFreq(new Term("content", "gg")), 0);
reader.close();
dir.close();
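
(docFreq still reports 3 for "aa" because deleting a document does not decrement document frequency; termDocs, by contrast, skips deleted documents, so the enumeration counts only 2 of the 3.)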
@@ -1905,12 +1926,18 @@ public class TestIndexWriter extends LuceneTestCase
int expected = 3+(1-i)*2;
assertEquals(expected, reader.docFreq(new Term("contents", "here")));
assertEquals(expected, reader.maxDoc());
int numDel = 0;
for(int j=0;j<reader.maxDoc();j++) {
reader.document(j);
if (reader.isDeleted(j))
numDel++;
else
reader.document(j);
reader.getTermFreqVectors(j);
}
reader.close();
assertEquals(1, numDel);
writer = new IndexWriter(dir, analyzer);
writer.setMaxBufferedDocs(10);
doc = new Document();
@@ -1922,14 +1949,19 @@ public class TestIndexWriter extends LuceneTestCase
writer.close();
reader = IndexReader.open(dir);
expected = 20+(1-i)*2;
expected = 19+(1-i)*2;
assertEquals(expected, reader.docFreq(new Term("contents", "here")));
assertEquals(expected, reader.maxDoc());
numDel = 0;
for(int j=0;j<reader.maxDoc();j++) {
reader.document(j);
if (reader.isDeleted(j))
numDel++;
else
reader.document(j);
reader.getTermFreqVectors(j);
}
reader.close();
assertEquals(0, numDel);
dir.close();
}