diff --git a/src/java/org/apache/lucene/index/DocumentsWriter.java b/src/java/org/apache/lucene/index/DocumentsWriter.java index e1fab496345..04a4d0a49f6 100644 --- a/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -389,6 +389,7 @@ final class DocumentsWriter { try { wait(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } } @@ -521,6 +522,7 @@ final class DocumentsWriter { int maxTermHit; // Set to > 0 if this doc has a too-large term boolean doFlushAfter; + boolean abortOnExc; public ThreadState() { fieldDataArray = new FieldData[8]; @@ -558,6 +560,12 @@ final class DocumentsWriter { * the ThreadState into the "real" stores. */ public void writeDocument() throws IOException { + // If we hit an exception while appending to the + // stored fields or term vectors files, we have to + // abort because it means those files are possibly + // inconsistent. + abortOnExc = true; + // Append stored fields to the real FieldsWriter: fieldsWriter.flushDocument(fdtLocal); fdtLocal.reset(); @@ -581,6 +589,7 @@ final class DocumentsWriter { tvfLocal.reset(); } } + abortOnExc = false; // Append norms for the fields we saw: for(int i=0;i 0) { - // Add term vectors for this field - writeVectors(fieldInfo); - if (postingsVectorsUpto > maxPostingsVectors) - maxPostingsVectors = postingsVectorsUpto; - postingsVectorsUpto = 0; - vectorsPool.reset(); + docFieldsFinal[j] = null; + } + } finally { + if (postingsVectorsUpto > 0) { + // Add term vectors for this field + writeVectors(fieldInfo); + if (postingsVectorsUpto > maxPostingsVectors) + maxPostingsVectors = postingsVectorsUpto; + postingsVectorsUpto = 0; + vectorsPool.reset(); + } } } @@ -1406,6 +1428,8 @@ final class DocumentsWriter { int hashPos = code & postingsHashMask; + assert !postingsCompacted; + // Locate Posting in hash p = postingsHash[hashPos]; @@ -1422,6 +1446,12 @@ final class DocumentsWriter { final int proxCode; + // If we hit an exception below, it's possible the + // posting list or term vectors data will be + // partially written and thus inconsistent if + // flushed, so we have to abort: + abortOnExc = true; + if (p != null) { // term seen since last flush if (docID != p.lastDocID) { // term not yet seen in this doc @@ -1492,6 +1522,7 @@ final class DocumentsWriter { // Just skip this term; we will throw an // exception after processing all accepted // terms in the doc + abortOnExc = false; return; } charPool.nextBuffer(); @@ -1572,6 +1603,8 @@ final class DocumentsWriter { vector.lastOffset = offsetEnd; vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK); } + + abortOnExc = false; } /** Called when postings hash is too small (> 50% @@ -2142,7 +2175,9 @@ final class DocumentsWriter { while(!state.isIdle || pauseThreads != 0 || flushPending) try { wait(); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } if (segment == null) segment = writer.newSegmentName(); @@ -2165,22 +2200,24 @@ final class DocumentsWriter { boolean success = false; try { state.init(doc, nextDocID++); + + if (delTerm != null) { + addDeleteTerm(delTerm, state.docID); + if (!state.doFlushAfter) + state.doFlushAfter = timeToFlushDeletes(); + } + success = true; } finally { if (!success) { - state.isIdle = true; - if (state.doFlushAfter) { - state.doFlushAfter = false; - flushPending = false; + synchronized(this) { + state.isIdle = true; + if (state.doFlushAfter) { + state.doFlushAfter = false; + flushPending = false; + } + notifyAll(); } - abort(); - } - } - - if (delTerm != null) { - addDeleteTerm(delTerm, state.docID); - if (!state.doFlushAfter) { - state.doFlushAfter = timeToFlushDeletes(); } } @@ -2208,15 +2245,22 @@ final class DocumentsWriter { int maxTermHit; try { // This call is not synchronized and does all the work - state.processDocument(analyzer); - // This call synchronized but fast - maxTermHit = state.maxTermHit; - finishDocument(state); + try { + state.processDocument(analyzer); + } finally { + maxTermHit = state.maxTermHit; + // This call synchronized but fast + finishDocument(state); + } success = true; } finally { if (!success) { - state.isIdle = true; - abort(); + synchronized(this) { + state.isIdle = true; + if (state.abortOnExc) + abort(); + notifyAll(); + } } } @@ -2246,7 +2290,9 @@ final class DocumentsWriter { while(pauseThreads != 0 || flushPending) try { wait(); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } for (int i = 0; i < terms.length; i++) addDeleteTerm(terms[i], numDocsInRAM); return timeToFlushDeletes(); @@ -2256,7 +2302,9 @@ final class DocumentsWriter { while(pauseThreads != 0 || flushPending) try { wait(); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } addDeleteTerm(term, numDocsInRAM); return timeToFlushDeletes(); } diff --git a/src/test/org/apache/lucene/index/TestIndexWriter.java b/src/test/org/apache/lucene/index/TestIndexWriter.java index 0e56cd6c36f..077f417d28f 100644 --- a/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -18,6 +18,7 @@ package org.apache.lucene.index; */ import java.io.IOException; +import java.io.Reader; import java.io.File; import java.util.Arrays; import java.util.Random; @@ -25,7 +26,12 @@ import java.util.Random; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.TokenFilter; +import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.analysis.standard.StandardTokenizer; +import org.apache.lucene.analysis.Token; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.search.IndexSearcher; @@ -1736,4 +1742,108 @@ public class TestIndexWriter extends LuceneTestCase iw.addDocument(document); iw.close(); } + + // LUCENE-1072 + public void testExceptionFromTokenStream() throws IOException { + RAMDirectory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new Analyzer() { + + public TokenStream tokenStream(String fieldName, Reader reader) { + return new TokenFilter(new StandardTokenizer(reader)) { + private int count = 0; + + public Token next() throws IOException { + if (count++ == 5) { + throw new IOException(); + } + return input.next(); + } + }; + } + + }, true); + + Document doc = new Document(); + String contents = "aa bb cc dd ee ff gg hh ii jj kk"; + doc.add(new Field("content", contents, Field.Store.NO, + Field.Index.TOKENIZED)); + try { + writer.addDocument(doc); + fail("did not hit expected exception"); + } catch (Exception e) { + } + + // Make sure we can add another normal document + doc = new Document(); + doc.add(new Field("content", "aa bb cc dd", Field.Store.NO, + Field.Index.TOKENIZED)); + writer.addDocument(doc); + + // Make sure we can add another normal document + doc = new Document(); + doc.add(new Field("content", "aa bb cc dd", Field.Store.NO, + Field.Index.TOKENIZED)); + writer.addDocument(doc); + + writer.close(); + IndexReader reader = IndexReader.open(dir); + assertEquals(reader.docFreq(new Term("content", "aa")), 3); + assertEquals(reader.docFreq(new Term("content", "gg")), 0); + reader.close(); + dir.close(); + } + + private static class FailOnlyOnFlush extends MockRAMDirectory.Failure { + boolean doFail = false; + int count; + + public void setDoFail() { + this.doFail = true; + } + public void clearDoFail() { + this.doFail = false; + } + + public void eval(MockRAMDirectory dir) throws IOException { + if (doFail) { + StackTraceElement[] trace = new Exception().getStackTrace(); + for (int i = 0; i < trace.length; i++) { + if ("appendPostings".equals(trace[i].getMethodName()) && count++ == 30) { + doFail = false; + throw new IOException("now failing during flush"); + } + } + } + } + } + + // LUCENE-1072: make sure an errant exception on flushing + // one segment only takes out those docs in that one flush + public void testDocumentsWriterAbort() throws IOException { + MockRAMDirectory dir = new MockRAMDirectory(); + FailOnlyOnFlush failure = new FailOnlyOnFlush(); + failure.setDoFail(); + dir.failOn(failure); + + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer()); + writer.setMaxBufferedDocs(2); + Document doc = new Document(); + String contents = "aa bb cc dd ee ff gg hh ii jj kk"; + doc.add(new Field("content", contents, Field.Store.NO, + Field.Index.TOKENIZED)); + boolean hitError = false; + for(int i=0;i<200;i++) { + try { + writer.addDocument(doc); + } catch (IOException ioe) { + // only one flush should fail: + assertFalse(hitError); + hitError = true; + } + } + assertTrue(hitError); + writer.close(); + IndexReader reader = IndexReader.open(dir); + assertEquals(198, reader.docFreq(new Term("content", "aa"))); + } }