mirror of https://github.com/apache/lucene.git
LUCENE-1072: make sure an exception raised in Tokenizer.next() leaves DocumentsWriter in OK state
git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@601121 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c23cb457ef
commit
0d2d6475ac
|
@ -389,6 +389,7 @@ final class DocumentsWriter {
|
|||
try {
|
||||
wait();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -521,6 +522,7 @@ final class DocumentsWriter {
|
|||
int maxTermHit; // Set to > 0 if this doc has a too-large term
|
||||
|
||||
boolean doFlushAfter;
|
||||
boolean abortOnExc;
|
||||
|
||||
public ThreadState() {
|
||||
fieldDataArray = new FieldData[8];
|
||||
|
@ -558,6 +560,12 @@ final class DocumentsWriter {
|
|||
* the ThreadState into the "real" stores. */
|
||||
public void writeDocument() throws IOException {
|
||||
|
||||
// If we hit an exception while appending to the
|
||||
// stored fields or term vectors files, we have to
|
||||
// abort because it means those files are possibly
|
||||
// inconsistent.
|
||||
abortOnExc = true;
|
||||
|
||||
// Append stored fields to the real FieldsWriter:
|
||||
fieldsWriter.flushDocument(fdtLocal);
|
||||
fdtLocal.reset();
|
||||
|
@ -581,6 +589,7 @@ final class DocumentsWriter {
|
|||
tvfLocal.reset();
|
||||
}
|
||||
}
|
||||
abortOnExc = false;
|
||||
|
||||
// Append norms for the fields we saw:
|
||||
for(int i=0;i<numFieldData;i++) {
|
||||
|
@ -604,6 +613,7 @@ final class DocumentsWriter {
|
|||
/** Initializes shared state for this new document */
|
||||
void init(Document doc, int docID) throws IOException {
|
||||
|
||||
abortOnExc = false;
|
||||
this.docID = docID;
|
||||
docBoost = doc.getBoost();
|
||||
numStoredFields = 0;
|
||||
|
@ -1177,6 +1187,7 @@ final class DocumentsWriter {
|
|||
boolean doVectors;
|
||||
boolean doVectorPositions;
|
||||
boolean doVectorOffsets;
|
||||
boolean postingsCompacted;
|
||||
|
||||
int numPostings;
|
||||
|
||||
|
@ -1197,8 +1208,11 @@ final class DocumentsWriter {
|
|||
}
|
||||
|
||||
void resetPostingArrays() {
|
||||
if (!postingsCompacted)
|
||||
compactPostings();
|
||||
recyclePostings(this.postingsHash, numPostings);
|
||||
Arrays.fill(postingsHash, 0, postingsHash.length, null);
|
||||
postingsCompacted = false;
|
||||
numPostings = 0;
|
||||
}
|
||||
|
||||
|
@ -1217,15 +1231,20 @@ final class DocumentsWriter {
|
|||
return fieldInfo.name.compareTo(((FieldData) o).fieldInfo.name);
|
||||
}
|
||||
|
||||
/** Collapse the hash table & sort in-place. */
|
||||
public Posting[] sortPostings() {
|
||||
private void compactPostings() {
|
||||
int upto = 0;
|
||||
for(int i=0;i<postingsHashSize;i++)
|
||||
if (postingsHash[i] != null)
|
||||
postingsHash[upto++] = postingsHash[i];
|
||||
|
||||
assert upto == numPostings;
|
||||
doPostingSort(postingsHash, upto);
|
||||
postingsCompacted = true;
|
||||
}
|
||||
|
||||
/** Collapse the hash table & sort in-place. */
|
||||
public Posting[] sortPostings() {
|
||||
compactPostings();
|
||||
doPostingSort(postingsHash, numPostings);
|
||||
return postingsHash;
|
||||
}
|
||||
|
||||
|
@ -1241,26 +1260,29 @@ final class DocumentsWriter {
|
|||
final int limit = fieldCount;
|
||||
final Fieldable[] docFieldsFinal = docFields;
|
||||
|
||||
// Walk through all occurrences in this doc for this field:
|
||||
for(int j=0;j<limit;j++) {
|
||||
Fieldable field = docFieldsFinal[j];
|
||||
// Walk through all occurrences in this doc for this
|
||||
// field:
|
||||
try {
|
||||
for(int j=0;j<limit;j++) {
|
||||
Fieldable field = docFieldsFinal[j];
|
||||
|
||||
if (field.isIndexed())
|
||||
invertField(field, analyzer, maxFieldLength);
|
||||
if (field.isIndexed())
|
||||
invertField(field, analyzer, maxFieldLength);
|
||||
|
||||
if (field.isStored())
|
||||
localFieldsWriter.writeField(fieldInfo, field);
|
||||
if (field.isStored())
|
||||
localFieldsWriter.writeField(fieldInfo, field);
|
||||
|
||||
docFieldsFinal[j] = null;
|
||||
}
|
||||
|
||||
if (postingsVectorsUpto > 0) {
|
||||
// Add term vectors for this field
|
||||
writeVectors(fieldInfo);
|
||||
if (postingsVectorsUpto > maxPostingsVectors)
|
||||
maxPostingsVectors = postingsVectorsUpto;
|
||||
postingsVectorsUpto = 0;
|
||||
vectorsPool.reset();
|
||||
docFieldsFinal[j] = null;
|
||||
}
|
||||
} finally {
|
||||
if (postingsVectorsUpto > 0) {
|
||||
// Add term vectors for this field
|
||||
writeVectors(fieldInfo);
|
||||
if (postingsVectorsUpto > maxPostingsVectors)
|
||||
maxPostingsVectors = postingsVectorsUpto;
|
||||
postingsVectorsUpto = 0;
|
||||
vectorsPool.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1406,6 +1428,8 @@ final class DocumentsWriter {
|
|||
|
||||
int hashPos = code & postingsHashMask;
|
||||
|
||||
assert !postingsCompacted;
|
||||
|
||||
// Locate Posting in hash
|
||||
p = postingsHash[hashPos];
|
||||
|
||||
|
@ -1422,6 +1446,12 @@ final class DocumentsWriter {
|
|||
|
||||
final int proxCode;
|
||||
|
||||
// If we hit an exception below, it's possible the
|
||||
// posting list or term vectors data will be
|
||||
// partially written and thus inconsistent if
|
||||
// flushed, so we have to abort:
|
||||
abortOnExc = true;
|
||||
|
||||
if (p != null) { // term seen since last flush
|
||||
|
||||
if (docID != p.lastDocID) { // term not yet seen in this doc
|
||||
|
@ -1492,6 +1522,7 @@ final class DocumentsWriter {
|
|||
// Just skip this term; we will throw an
|
||||
// exception after processing all accepted
|
||||
// terms in the doc
|
||||
abortOnExc = false;
|
||||
return;
|
||||
}
|
||||
charPool.nextBuffer();
|
||||
|
@ -1572,6 +1603,8 @@ final class DocumentsWriter {
|
|||
vector.lastOffset = offsetEnd;
|
||||
vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
|
||||
}
|
||||
|
||||
abortOnExc = false;
|
||||
}
|
||||
|
||||
/** Called when postings hash is too small (> 50%
|
||||
|
@ -2142,7 +2175,9 @@ final class DocumentsWriter {
|
|||
while(!state.isIdle || pauseThreads != 0 || flushPending)
|
||||
try {
|
||||
wait();
|
||||
} catch (InterruptedException e) {}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
if (segment == null)
|
||||
segment = writer.newSegmentName();
|
||||
|
@ -2165,22 +2200,24 @@ final class DocumentsWriter {
|
|||
boolean success = false;
|
||||
try {
|
||||
state.init(doc, nextDocID++);
|
||||
|
||||
if (delTerm != null) {
|
||||
addDeleteTerm(delTerm, state.docID);
|
||||
if (!state.doFlushAfter)
|
||||
state.doFlushAfter = timeToFlushDeletes();
|
||||
}
|
||||
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
state.isIdle = true;
|
||||
if (state.doFlushAfter) {
|
||||
state.doFlushAfter = false;
|
||||
flushPending = false;
|
||||
synchronized(this) {
|
||||
state.isIdle = true;
|
||||
if (state.doFlushAfter) {
|
||||
state.doFlushAfter = false;
|
||||
flushPending = false;
|
||||
}
|
||||
notifyAll();
|
||||
}
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
if (delTerm != null) {
|
||||
addDeleteTerm(delTerm, state.docID);
|
||||
if (!state.doFlushAfter) {
|
||||
state.doFlushAfter = timeToFlushDeletes();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2208,15 +2245,22 @@ final class DocumentsWriter {
|
|||
int maxTermHit;
|
||||
try {
|
||||
// This call is not synchronized and does all the work
|
||||
state.processDocument(analyzer);
|
||||
// This call synchronized but fast
|
||||
maxTermHit = state.maxTermHit;
|
||||
finishDocument(state);
|
||||
try {
|
||||
state.processDocument(analyzer);
|
||||
} finally {
|
||||
maxTermHit = state.maxTermHit;
|
||||
// This call synchronized but fast
|
||||
finishDocument(state);
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
state.isIdle = true;
|
||||
abort();
|
||||
synchronized(this) {
|
||||
state.isIdle = true;
|
||||
if (state.abortOnExc)
|
||||
abort();
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2246,7 +2290,9 @@ final class DocumentsWriter {
|
|||
while(pauseThreads != 0 || flushPending)
|
||||
try {
|
||||
wait();
|
||||
} catch (InterruptedException e) {}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
for (int i = 0; i < terms.length; i++)
|
||||
addDeleteTerm(terms[i], numDocsInRAM);
|
||||
return timeToFlushDeletes();
|
||||
|
@ -2256,7 +2302,9 @@ final class DocumentsWriter {
|
|||
while(pauseThreads != 0 || flushPending)
|
||||
try {
|
||||
wait();
|
||||
} catch (InterruptedException e) {}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
addDeleteTerm(term, numDocsInRAM);
|
||||
return timeToFlushDeletes();
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.lucene.index;
|
|||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Reader;
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
import java.util.Random;
|
||||
|
@ -25,7 +26,12 @@ import java.util.Random;
|
|||
import org.apache.lucene.util.LuceneTestCase;
|
||||
|
||||
import org.apache.lucene.analysis.WhitespaceAnalyzer;
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.TokenFilter;
|
||||
import org.apache.lucene.analysis.TokenStream;
|
||||
import org.apache.lucene.analysis.standard.StandardAnalyzer;
|
||||
import org.apache.lucene.analysis.standard.StandardTokenizer;
|
||||
import org.apache.lucene.analysis.Token;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
|
@ -1736,4 +1742,108 @@ public class TestIndexWriter extends LuceneTestCase
|
|||
iw.addDocument(document);
|
||||
iw.close();
|
||||
}
|
||||
|
||||
// LUCENE-1072
|
||||
public void testExceptionFromTokenStream() throws IOException {
|
||||
RAMDirectory dir = new MockRAMDirectory();
|
||||
IndexWriter writer = new IndexWriter(dir, new Analyzer() {
|
||||
|
||||
public TokenStream tokenStream(String fieldName, Reader reader) {
|
||||
return new TokenFilter(new StandardTokenizer(reader)) {
|
||||
private int count = 0;
|
||||
|
||||
public Token next() throws IOException {
|
||||
if (count++ == 5) {
|
||||
throw new IOException();
|
||||
}
|
||||
return input.next();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}, true);
|
||||
|
||||
Document doc = new Document();
|
||||
String contents = "aa bb cc dd ee ff gg hh ii jj kk";
|
||||
doc.add(new Field("content", contents, Field.Store.NO,
|
||||
Field.Index.TOKENIZED));
|
||||
try {
|
||||
writer.addDocument(doc);
|
||||
fail("did not hit expected exception");
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
// Make sure we can add another normal document
|
||||
doc = new Document();
|
||||
doc.add(new Field("content", "aa bb cc dd", Field.Store.NO,
|
||||
Field.Index.TOKENIZED));
|
||||
writer.addDocument(doc);
|
||||
|
||||
// Make sure we can add another normal document
|
||||
doc = new Document();
|
||||
doc.add(new Field("content", "aa bb cc dd", Field.Store.NO,
|
||||
Field.Index.TOKENIZED));
|
||||
writer.addDocument(doc);
|
||||
|
||||
writer.close();
|
||||
IndexReader reader = IndexReader.open(dir);
|
||||
assertEquals(reader.docFreq(new Term("content", "aa")), 3);
|
||||
assertEquals(reader.docFreq(new Term("content", "gg")), 0);
|
||||
reader.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
private static class FailOnlyOnFlush extends MockRAMDirectory.Failure {
|
||||
boolean doFail = false;
|
||||
int count;
|
||||
|
||||
public void setDoFail() {
|
||||
this.doFail = true;
|
||||
}
|
||||
public void clearDoFail() {
|
||||
this.doFail = false;
|
||||
}
|
||||
|
||||
public void eval(MockRAMDirectory dir) throws IOException {
|
||||
if (doFail) {
|
||||
StackTraceElement[] trace = new Exception().getStackTrace();
|
||||
for (int i = 0; i < trace.length; i++) {
|
||||
if ("appendPostings".equals(trace[i].getMethodName()) && count++ == 30) {
|
||||
doFail = false;
|
||||
throw new IOException("now failing during flush");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// LUCENE-1072: make sure an errant exception on flushing
|
||||
// one segment only takes out those docs in that one flush
|
||||
public void testDocumentsWriterAbort() throws IOException {
|
||||
MockRAMDirectory dir = new MockRAMDirectory();
|
||||
FailOnlyOnFlush failure = new FailOnlyOnFlush();
|
||||
failure.setDoFail();
|
||||
dir.failOn(failure);
|
||||
|
||||
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
|
||||
writer.setMaxBufferedDocs(2);
|
||||
Document doc = new Document();
|
||||
String contents = "aa bb cc dd ee ff gg hh ii jj kk";
|
||||
doc.add(new Field("content", contents, Field.Store.NO,
|
||||
Field.Index.TOKENIZED));
|
||||
boolean hitError = false;
|
||||
for(int i=0;i<200;i++) {
|
||||
try {
|
||||
writer.addDocument(doc);
|
||||
} catch (IOException ioe) {
|
||||
// only one flush should fail:
|
||||
assertFalse(hitError);
|
||||
hitError = true;
|
||||
}
|
||||
}
|
||||
assertTrue(hitError);
|
||||
writer.close();
|
||||
IndexReader reader = IndexReader.open(dir);
|
||||
assertEquals(198, reader.docFreq(new Term("content", "aa")));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue