LUCENE-1130: fix thread safety issues when hitting IOExceptions in DocumentsWriter

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@611855 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2008-01-14 17:06:21 +00:00
parent a43f312375
commit f7740afe84
7 changed files with 956 additions and 351 deletions

ConcurrentMergeScheduler.java

@@ -251,7 +251,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
          message("  merge thread: done");
-       } catch (IOException exc) {
+       } catch (Throwable exc) {
          if (merge != null) {
            merge.setException(exc);

DocumentsWriter.java

@@ -145,11 +145,12 @@ final class DocumentsWriter {
  private ThreadState[] threadStates = new ThreadState[0];
  private final HashMap threadBindings = new HashMap();
  private int numWaiting;
- private ThreadState[] waitingThreadStates = new ThreadState[1];
+ private final ThreadState[] waitingThreadStates = new ThreadState[MAX_THREAD_STATE];
  private int pauseThreads;        // Non-zero when we need all threads to
                                   // pause (eg to flush)
  private boolean flushPending;    // True when a thread has decided to flush
  private boolean bufferIsFull;    // True when it's time to write segment
+ private boolean aborting;        // True while abort is running
  private PrintStream infoStream;
@@ -321,10 +322,25 @@ final class DocumentsWriter {
    return files;
  }

+ synchronized void setAborting() {
+   aborting = true;
+ }
+
  /** Called if we hit an exception when adding docs,
   *  flushing, etc.  This resets our state, discarding any
-  *  docs added since last flush. */
- synchronized void abort() throws IOException {
+  *  docs added since last flush.  If ae is non-null, it
+  *  contains the root cause exception (which we re-throw
+  *  after we are done aborting). */
+ synchronized void abort(AbortException ae) throws IOException {
+
+   // Anywhere that throws an AbortException must first
+   // mark aborting to make sure while the exception is
+   // unwinding the un-synchronized stack, no thread grabs
+   // the corrupt ThreadState that hit the aborting
+   // exception:
+   assert ae == null || aborting;
+
+   try {
      if (infoStream != null)
        infoStream.println("docWriter: now abort");
@@ -334,14 +350,17 @@ final class DocumentsWriter {
        waitingThreadStates[i].isIdle = true;
      numWaiting = 0;

+     // Wait for all other threads to finish with DocumentsWriter:
      pauseAllThreads();

+     assert 0 == numWaiting;
+
+     try {
        bufferedDeleteTerms.clear();
        bufferedDeleteDocIDs.clear();
        numBufferedDeleteTerms = 0;

-     try {
        abortedFiles = files();

        // Discard pending norms:
@@ -359,21 +378,36 @@ final class DocumentsWriter {
        // Reset vectors writer
        if (tvx != null) {
+         try {
            tvx.close();
-         tvf.close();
-         tvd.close();
+         } catch (IOException ioe) {
+         }
          tvx = null;
        }
+       if (tvd != null) {
+         try {
+           tvd.close();
+         } catch (IOException ioe) {
+         }
+         tvd = null;
+       }
+       if (tvf != null) {
+         try {
+           tvf.close();
+         } catch (IOException ioe) {
+         }
+         tvf = null;
+       }

        // Reset fields writer
        if (fieldsWriter != null) {
+         try {
            fieldsWriter.close();
+         } catch (IOException ioe) {
+         }
          fieldsWriter = null;
        }

-       // Reset all postings data
-       resetPostingsData();
-
        // Clear vectors & fields from ThreadStates
        for(int i=0;i<threadStates.length;i++) {
          ThreadState state = threadStates[i];
@@ -384,12 +418,34 @@ final class DocumentsWriter {
          state.tvfLocal.reset();
          state.fdtLocal.reset();
        }

+       // Reset all postings data
+       resetPostingsData();
+
        docStoreSegment = null;
        files = null;

      } finally {
        resumeAllThreads();
      }

+     // If we have a root cause exception, re-throw it now:
+     if (ae != null) {
+       Throwable t = ae.getCause();
+       if (t instanceof IOException)
+         throw (IOException) t;
+       else if (t instanceof RuntimeException)
+         throw (RuntimeException) t;
+       else if (t instanceof Error)
+         throw (Error) t;
+       else
+         // Should not get here
+         assert false: "unknown exception: " + t;
+     }
+   } finally {
+     aborting = false;
+     notifyAll();
+   }
  }

  /** Reset after a flush */
@@ -412,9 +468,9 @@ final class DocumentsWriter {
    files = null;
  }

- synchronized void pauseAllThreads() {
+ // Returns true if an abort is in progress
+ synchronized boolean pauseAllThreads() {
    pauseThreads++;
-   if (1 == pauseThreads) {
    while(!allThreadsIdle()) {
      try {
        wait();
@@ -422,7 +478,7 @@ final class DocumentsWriter {
        Thread.currentThread().interrupt();
      }
    }
-   }
+   return aborting;
  }

  synchronized void resumeAllThreads() {
@@ -444,7 +500,7 @@ final class DocumentsWriter {
  List newFiles;

  /** Flush all pending docs to a new segment */
- int flush(boolean closeDocStore) throws IOException {
+ synchronized int flush(boolean closeDocStore) throws IOException {

    assert allThreadsIdle();
@@ -456,13 +512,6 @@ final class DocumentsWriter {
    docStoreOffset = numDocsInStore;

-   if (closeDocStore) {
-     assert docStoreSegment != null;
-     assert docStoreSegment.equals(segment);
-     newFiles.addAll(files());
-     closeDocStore();
-   }
-
    int docCount;

    assert numDocsInRAM > 0;
@@ -474,6 +523,13 @@ final class DocumentsWriter {
    try {

+     if (closeDocStore) {
+       assert docStoreSegment != null;
+       assert docStoreSegment.equals(segment);
+       newFiles.addAll(files());
+       closeDocStore();
+     }
+
      fieldInfos.write(directory, segment + ".fnm");

      docCount = numDocsInRAM;
@@ -484,7 +540,7 @@ final class DocumentsWriter {
    } finally {
      if (!success)
-       abort();
+       abort(null);
    }

    return docCount;
@@ -553,7 +609,6 @@ final class DocumentsWriter {
                             // doc has one

    boolean doFlushAfter;
-   boolean abortOnExc;

    public ThreadState() {
      fieldDataArray = new FieldData[8];
@@ -574,6 +629,7 @@ final class DocumentsWriter {
        localFieldsWriter.close();
        localFieldsWriter = null;
      }
+     fieldGen = 0;
      maxPostingsVectors = 0;
      doFlushAfter = false;
      postingsPool.reset();
@@ -589,18 +645,17 @@ final class DocumentsWriter {
    /** Move all per-document state that was accumulated in
     *  the ThreadState into the "real" stores. */
-   public void writeDocument() throws IOException {
+   public void writeDocument() throws IOException, AbortException {

      // If we hit an exception while appending to the
      // stored fields or term vectors files, we have to
      // abort all documents since we last flushed because
      // it means those files are possibly inconsistent.
-     abortOnExc = true;
+     try {

        // Append stored fields to the real FieldsWriter:
        fieldsWriter.flushDocument(numStoredFields, fdtLocal);
        fdtLocal.reset();
-       numStoredFields = 0;

        // Append term vectors to the real outputs:
        if (tvx != null) {
@@ -634,7 +689,12 @@ final class DocumentsWriter {
            bn.add(norm);
          }
        }
-     abortOnExc = false;
+     } catch (Throwable t) {
+       // Forcefully idle this threadstate -- its state will
+       // be reset by abort()
+       isIdle = true;
+       throw new AbortException(t, DocumentsWriter.this);
+     }

      if (bufferIsFull && !flushPending) {
        flushPending = true;
@@ -642,10 +702,13 @@ final class DocumentsWriter {
      }
    }

+   int fieldGen;
+
    /** Initializes shared state for this new document */
-   void init(Document doc, int docID) throws IOException {
-     abortOnExc = false;
+   void init(Document doc, int docID) throws IOException, AbortException {
+     assert !isIdle;
      this.docID = docID;
      docBoost = doc.getBoost();
      numStoredFields = 0;
@@ -654,7 +717,10 @@ final class DocumentsWriter {
      maxTermPrefix = null;

      assert 0 == fdtLocal.length();
+     assert 0 == fdtLocal.getFilePointer();
      assert 0 == tvfLocal.length();
+     assert 0 == tvfLocal.getFilePointer();
+
+     final int thisFieldGen = fieldGen++;

      List docFields = doc.getFields();
      final int numDocFields = docFields.size();
@@ -700,36 +766,37 @@ final class DocumentsWriter {
        if (numAllFieldData == allFieldDataArray.length) {
          int newSize = (int) (allFieldDataArray.length*1.5);
+         int newHashSize = fieldDataHash.length*2;
          FieldData newArray[] = new FieldData[newSize];
+         FieldData newHashArray[] = new FieldData[newHashSize];
          System.arraycopy(allFieldDataArray, 0, newArray, 0, numAllFieldData);
-         allFieldDataArray = newArray;

          // Rehash
-         newSize = fieldDataHash.length*2;
-         newArray = new FieldData[newSize];
          fieldDataHashMask = newSize-1;
          for(int j=0;j<fieldDataHash.length;j++) {
            FieldData fp0 = fieldDataHash[j];
            while(fp0 != null) {
              hashPos = fp0.fieldInfo.name.hashCode() & fieldDataHashMask;
              FieldData nextFP0 = fp0.next;
-             fp0.next = newArray[hashPos];
-             newArray[hashPos] = fp0;
+             fp0.next = newHashArray[hashPos];
+             newHashArray[hashPos] = fp0;
              fp0 = nextFP0;
            }
          }
-         fieldDataHash = newArray;
+
+         allFieldDataArray = newArray;
+         fieldDataHash = newHashArray;
        }
        allFieldDataArray[numAllFieldData++] = fp;
      } else {
        assert fp.fieldInfo == fi;
      }

-     if (docID != fp.lastDocID) {
+     if (thisFieldGen != fp.lastGen) {
        // First time we're seeing this field for this doc
-       fp.lastDocID = docID;
+       fp.lastGen = thisFieldGen;
        fp.fieldCount = 0;
        fp.doVectors = fp.doVectorPositions = fp.doVectorOffsets = false;
        fp.doNorms = fi.isIndexed && !fi.omitNorms;
@@ -776,7 +843,15 @@ final class DocumentsWriter {
          assert docStoreSegment == null;
          assert segment != null;
          docStoreSegment = segment;
+         // If we hit an exception while init'ing the
+         // fieldsWriter, we must abort this segment
+         // because those files will be in an unknown
+         // state:
+         try {
            fieldsWriter = new FieldsWriter(directory, docStoreSegment, fieldInfos);
+         } catch (Throwable t) {
+           throw new AbortException(t, DocumentsWriter.this);
+         }
          files = null;
        }
        localFieldsWriter = new FieldsWriter(null, fdtLocal, fieldInfos);
@@ -787,18 +862,26 @@ final class DocumentsWriter {
      if (docHasVectors) {
        if (tvx == null) {
          assert docStoreSegment != null;
+         // If we hit an exception while init'ing the term
+         // vector output files, we must abort this segment
+         // because those files will be in an unknown
+         // state:
+         try {
            tvx = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
            tvx.writeInt(TermVectorsReader.FORMAT_VERSION);
            tvd = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
            tvd.writeInt(TermVectorsReader.FORMAT_VERSION);
            tvf = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
            tvf.writeInt(TermVectorsReader.FORMAT_VERSION);
-         files = null;

            // We must "catch up" for all docIDs that had no
            // vectors before this one
            for(int i=0;i<docID;i++)
              tvx.writeLong(0);
+         } catch (Throwable t) {
+           throw new AbortException(t, DocumentsWriter.this);
+         }
+         files = null;
        }

        numVectorFields = 0;
@@ -929,7 +1012,7 @@ final class DocumentsWriter {
      int upto = 0;
      for(int i=0;i<numAllFieldData;i++) {
        FieldData fp = allFieldDataArray[i];
-       if (fp.lastDocID == -1) {
+       if (fp.lastGen == -1) {
          // This field was not seen since the previous
          // flush, so, free up its resources now
@@ -953,7 +1036,7 @@ final class DocumentsWriter {
        } else {
          // Reset
-         fp.lastDocID = -1;
+         fp.lastGen = -1;
          allFieldDataArray[upto++] = fp;

          if (fp.numPostings > 0 && ((float) fp.numPostings) / fp.postingsHashSize < 0.2) {
@@ -996,7 +1079,7 @@ final class DocumentsWriter {
    /** Tokenizes the fields of a document into Postings */
    void processDocument(Analyzer analyzer)
-     throws IOException {
+     throws IOException, AbortException {

      final int numFields = numFieldData;
@@ -1215,7 +1298,7 @@ final class DocumentsWriter {
    int fieldCount;
    Fieldable[] docFields = new Fieldable[1];

-   int lastDocID = -1;
+   int lastGen = -1;
    FieldData next;

    boolean doNorms;
@@ -1284,7 +1367,7 @@ final class DocumentsWriter {
    }

    /** Process all occurrences of one field in the document. */
-   public void processField(Analyzer analyzer) throws IOException {
+   public void processField(Analyzer analyzer) throws IOException, AbortException {
      length = 0;
      position = 0;
      offset = 0;
@@ -1316,12 +1399,10 @@ final class DocumentsWriter {
              // contents of fdtLocal can be corrupt, so
              // we must discard all stored fields for
              // this document:
-             if (!success) {
-               numStoredFields = 0;
+             if (!success)
                fdtLocal.reset();
-             }
            }
          }
          docFieldsFinal[j] = null;
        }
@@ -1354,7 +1435,7 @@ final class DocumentsWriter {
    Token localToken = new Token();

    /* Invert one occurrence of one field in the document */
-   public void invertField(Fieldable field, Analyzer analyzer, final int maxFieldLength) throws IOException {
+   public void invertField(Fieldable field, Analyzer analyzer, final int maxFieldLength) throws IOException, AbortException {

      if (length>0)
        position += analyzer.getPositionIncrementGap(fieldInfo.name);
@@ -1475,7 +1556,7 @@ final class DocumentsWriter {
     *  for every term of every document.  Its job is to *
     *  update the postings byte stream (Postings hash) *
     *  based on the occurence of a single term. */
-   private void addPosition(Token token) {
+   private void addPosition(Token token) throws AbortException {

      final Payload payload = token.getPayload();
@@ -1519,7 +1600,8 @@ final class DocumentsWriter {
      // partially written and thus inconsistent if
      // flushed, so we have to abort all documents
      // since the last flush:
-     abortOnExc = true;
+
+     try {

        if (p != null) {       // term seen since last flush
@@ -1592,7 +1674,6 @@ final class DocumentsWriter {
          // can be inserted into the analyzer chain if
          // other behavior is wanted (pruning the term
          // to a prefix, throwing an exception, etc).
-         abortOnExc = false;
          if (maxTermPrefix == null)
            maxTermPrefix = new String(tokenText, 0, 30);
@@ -1678,8 +1759,9 @@ final class DocumentsWriter {
          vector.lastOffset = offsetEnd;
          vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
        }
-     abortOnExc = false;
+     } catch (Throwable t) {
+       throw new AbortException(t, DocumentsWriter.this);
+     }
    }

    /** Called when postings hash is too small (> 50%
@@ -2209,6 +2291,7 @@ final class DocumentsWriter {

  synchronized void close() {
    closed = true;
+   notifyAll();
  }
/** Returns a free (idle) ThreadState that may be used for /** Returns a free (idle) ThreadState that may be used for
@@ -2247,7 +2330,7 @@ final class DocumentsWriter {
    // Next, wait until my thread state is idle (in case
    // it's shared with other threads) and for threads to
    // not be paused nor a flush pending:
-   while(!state.isIdle || pauseThreads != 0 || flushPending)
+   while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || aborting))
      try {
        wait();
      } catch (InterruptedException e) {
@@ -2275,28 +2358,31 @@ final class DocumentsWriter {
    state.isIdle = false;

+   try {
      boolean success = false;
      try {
-       state.init(doc, nextDocID++);
+       state.init(doc, nextDocID);
        if (delTerm != null) {
          addDeleteTerm(delTerm, state.docID);
          if (!state.doFlushAfter)
            state.doFlushAfter = timeToFlushDeletes();
        }
+       // Only increment nextDocID on successful init
+       nextDocID++;
        success = true;
      } finally {
        if (!success) {
-         synchronized(this) {
-           state.isIdle = true;
-           notifyAll();
+         // Forcefully idle this ThreadState:
+         state.isIdle = true;
          if (state.doFlushAfter) {
            state.doFlushAfter = false;
            flushPending = false;
          }
+         notifyAll();
        }
      }
+   } catch (AbortException ae) {
+     abort(ae);
    }

    return state;
@@ -2319,6 +2405,7 @@ final class DocumentsWriter {
    // This call is synchronized but fast
    final ThreadState state = getThreadState(doc, delTerm);

+   try {
      boolean success = false;
      try {
        try {
@@ -2332,20 +2419,17 @@ final class DocumentsWriter {
      } finally {
        if (!success) {
          synchronized(this) {
-           state.isIdle = true;
-           if (state.abortOnExc)
-             // Abort all buffered docs since last flush
-             abort();
-           else
            // Immediately mark this document as deleted
            // since likely it was partially added.  This
            // keeps indexing as "all or none" (atomic) when
            // adding a document:
            addDeleteDocID(state.docID);
-           notifyAll();
          }
        }
      }
+   } catch (AbortException ae) {
+     abort(ae);
+   }

    return state.doFlushAfter || timeToFlushDeletes();
  }
@@ -2467,51 +2551,57 @@ final class DocumentsWriter {

  /** Does the synchronized work to finish/flush the
   *  inverted document. */
- private synchronized void finishDocument(ThreadState state) throws IOException {
+ private synchronized void finishDocument(ThreadState state) throws IOException, AbortException {
+   if (aborting) {
+     // Forcefully idle this threadstate -- its state will
+     // be reset by abort()
+     state.isIdle = true;
+     notifyAll();
+     return;
+   }
+
    // Now write the indexed document to the real files.
    if (nextWriteDocID == state.docID) {
      // It's my turn, so write everything now:
-     state.isIdle = true;
      nextWriteDocID++;
      state.writeDocument();
+     state.isIdle = true;
+     notifyAll();

      // If any states were waiting on me, sweep through and
      // flush those that are enabled by my write.
      if (numWaiting > 0) {
-       while(true) {
-         int upto = 0;
-         for(int i=0;i<numWaiting;i++) {
-           ThreadState s = waitingThreadStates[i];
+       boolean any = true;
+       while(any) {
+         any = false;
+         for(int i=0;i<numWaiting;) {
+           final ThreadState s = waitingThreadStates[i];
            if (s.docID == nextWriteDocID) {
+             s.writeDocument();
              s.isIdle = true;
              nextWriteDocID++;
-             s.writeDocument();
-           } else
-             // Compact as we go
-             waitingThreadStates[upto++] = waitingThreadStates[i];
-         }
-         if (upto == numWaiting)
-           break;
-         numWaiting = upto;
+             any = true;
+             if (numWaiting > i+1)
+               // Swap in the last waiting state to fill in
+               // the hole we just created.  It's important
+               // to do this as-we-go and not at the end of
+               // the loop, because if we hit an aborting
+               // exception in one of the s.writeDocument
+               // calls (above), it leaves this array in an
+               // inconsistent state:
+               waitingThreadStates[i] = waitingThreadStates[numWaiting-1];
+             numWaiting--;
+           } else {
+             assert !s.isIdle;
+             i++;
+           }
+         }
        }
      }
-
-     // Now notify any incoming calls to addDocument
-     // (above) that are waiting on our line to
-     // shrink
-     notifyAll();
    } else {
      // Another thread got a docID before me, but, it
      // hasn't finished its processing.  So add myself to
      // the line but don't hold up this thread.
-     if (numWaiting == waitingThreadStates.length) {
-       ThreadState[] newWaiting = new ThreadState[2*waitingThreadStates.length];
-       System.arraycopy(waitingThreadStates, 0, newWaiting, 0, numWaiting);
-       waitingThreadStates = newWaiting;
-     }
      waitingThreadStates[numWaiting++] = state;
    }
  }
@@ -3137,3 +3227,12 @@ final class DocumentsWriter {
    int posUpto;     // Next write address for positions
  }
}
+
+// Used only internally to DW to call abort "up the stack"
+class AbortException extends IOException {
+  public AbortException(Throwable cause, DocumentsWriter docWriter) {
+    super();
+    initCause(cause);
+    docWriter.setAborting();
+  }
+}
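
The exception-tunneling pattern that this file introduces is easier to see in one self-contained place than spread across the hunks above. The sketch below is not part of the commit; it is a minimal, hypothetical reconstruction (class names SketchAbortException/SketchWriter are invented) of the idea: a low-level failure is wrapped in an exception whose constructor flips the writer's aborting flag before the stack unwinds, the single top-level catch calls abort(), and abort() discards buffered state and re-throws the root cause.

    import java.io.IOException;

    // Plays the role of AbortException: marks the writer as aborting at
    // construction time, so other threads see the flag while the stack unwinds.
    class SketchAbortException extends IOException {
      SketchAbortException(Throwable cause, SketchWriter w) {
        initCause(cause);
        w.setAborting();
      }
    }

    class SketchWriter {
      private boolean aborting;

      synchronized void setAborting() { aborting = true; }

      // Low-level step: a failure here leaves shared files suspect, so wrap it.
      private void writeDocument() throws SketchAbortException {
        try {
          throw new IOException("disk full");   // stand-in for a real I/O failure
        } catch (Throwable t) {
          throw new SketchAbortException(t, this);
        }
      }

      // Top-level entry point: the only place the wrapper is caught.
      public void addDocument() throws IOException {
        try {
          writeDocument();
        } catch (SketchAbortException ae) {
          abort(ae);
        }
      }

      private synchronized void abort(SketchAbortException ae) throws IOException {
        assert ae == null || aborting;
        try {
          // ... discard any state buffered since the last flush here ...
          if (ae != null) {
            Throwable t = ae.getCause();
            if (t instanceof IOException) throw (IOException) t;
            if (t instanceof RuntimeException) throw (RuntimeException) t;
            if (t instanceof Error) throw (Error) t;
          }
        } finally {
          aborting = false;      // always clear the flag and wake waiters
          notifyAll();
        }
      }

      public static void main(String[] args) {
        try {
          new SketchWriter().addDocument();
        } catch (IOException e) {
          System.out.println("root cause re-thrown: " + e.getMessage());
        }
      }
    }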

IndexWriter.java

@@ -1278,7 +1278,7 @@ public class IndexWriter {
      if (!success) {
        if (infoStream != null)
          message("hit exception closing doc store segment");
-       docWriter.abort();
+       docWriter.abort(null);
      }
    }
@@ -1999,7 +1999,7 @@ public class IndexWriter {
      segmentInfos.clear();
      segmentInfos.addAll(rollbackSegmentInfos);

-     docWriter.abort();
+     docWriter.abort(null);

      // Ask deleter to locate unreferenced files & remove
      // them:
@@ -2401,7 +2401,13 @@ public class IndexWriter {
  private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException {

    // Make sure no threads are actively adding a document
-   docWriter.pauseAllThreads();
+
+   // Returns true if docWriter is currently aborting, in
+   // which case we skip flushing this segment
+   if (docWriter.pauseAllThreads()) {
+     docWriter.resumeAllThreads();
+     return false;
+   }

    try {
@@ -2536,7 +2542,7 @@ public class IndexWriter {
          segmentInfos.remove(segmentInfos.size()-1);
        }
        if (flushDocs)
-         docWriter.abort();
+         docWriter.abort(null);
        deletePartialSegmentsFile();
        deleter.checkpoint(segmentInfos, false);

TestIndexWriter.java

@@ -1887,6 +1887,11 @@ public class TestIndexWriter extends LuceneTestCase
        throw new IOException("I'm experiencing problems");
      return input.next(result);
    }
+
+   public void reset() throws IOException {
+     super.reset();
+     count = 0;
+   }
  }

  public void testDocumentsWriterExceptions() throws IOException {
@@ -1969,6 +1974,122 @@ public class TestIndexWriter extends LuceneTestCase
    }
  }

public void testDocumentsWriterExceptionThreads() throws IOException {
Analyzer analyzer = new Analyzer() {
public TokenStream tokenStream(String fieldName, Reader reader) {
return new CrashingFilter(fieldName, new WhitespaceTokenizer(reader));
}
};
final int NUM_THREAD = 3;
final int NUM_ITER = 100;
for(int i=0;i<2;i++) {
MockRAMDirectory dir = new MockRAMDirectory();
{
final IndexWriter writer = new IndexWriter(dir, analyzer);
final int finalI = i;
Thread[] threads = new Thread[NUM_THREAD];
for(int t=0;t<NUM_THREAD;t++) {
threads[t] = new Thread() {
public void run() {
try {
for(int iter=0;iter<NUM_ITER;iter++) {
Document doc = new Document();
doc.add(new Field("contents", "here are some contents", Field.Store.YES,
Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
writer.addDocument(doc);
writer.addDocument(doc);
doc.add(new Field("crash", "this should crash after 4 terms", Field.Store.YES,
Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
doc.add(new Field("other", "this will not get indexed", Field.Store.YES,
Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
try {
writer.addDocument(doc);
fail("did not hit expected exception");
} catch (IOException ioe) {
}
if (0 == finalI) {
doc = new Document();
doc.add(new Field("contents", "here are some contents", Field.Store.YES,
Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
writer.addDocument(doc);
writer.addDocument(doc);
}
}
} catch (Throwable t) {
synchronized(this) {
System.out.println(Thread.currentThread().getName() + ": ERROR: hit unexpected exception");
t.printStackTrace(System.out);
}
fail();
}
}
};
threads[t].start();
}
for(int t=0;t<NUM_THREAD;t++)
while (true)
try {
threads[t].join();
break;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
writer.close();
}
IndexReader reader = IndexReader.open(dir);
int expected = (3+(1-i)*2)*NUM_THREAD*NUM_ITER;
assertEquals(expected, reader.docFreq(new Term("contents", "here")));
assertEquals(expected, reader.maxDoc());
int numDel = 0;
for(int j=0;j<reader.maxDoc();j++) {
if (reader.isDeleted(j))
numDel++;
else
reader.document(j);
reader.getTermFreqVectors(j);
}
reader.close();
assertEquals(NUM_THREAD*NUM_ITER, numDel);
IndexWriter writer = new IndexWriter(dir, analyzer);
writer.setMaxBufferedDocs(10);
Document doc = new Document();
doc.add(new Field("contents", "here are some contents", Field.Store.YES,
Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
for(int j=0;j<17;j++)
writer.addDocument(doc);
writer.optimize();
writer.close();
reader = IndexReader.open(dir);
expected += 17-NUM_THREAD*NUM_ITER;
assertEquals(expected, reader.docFreq(new Term("contents", "here")));
assertEquals(expected, reader.maxDoc());
numDel = 0;
for(int j=0;j<reader.maxDoc();j++) {
if (reader.isDeleted(j))
numDel++;
else
reader.document(j);
reader.getTermFreqVectors(j);
}
reader.close();
assertEquals(0, numDel);
dir.close();
}
}
  public void testVariableSchema() throws IOException {
    MockRAMDirectory dir = new MockRAMDirectory();
    int delID = 0;
@@ -2112,4 +2233,358 @@ public class TestIndexWriter extends LuceneTestCase
    directory.close();
  }

// Used by test cases below
private class IndexerThread extends Thread {
boolean diskFull;
Throwable error;
AlreadyClosedException ace;
IndexWriter writer;
boolean noErrors;
public IndexerThread(IndexWriter writer, boolean noErrors) {
this.writer = writer;
this.noErrors = noErrors;
}
public void run() {
final Document doc = new Document();
doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
int idUpto = 0;
int fullCount = 0;
while(true) {
try {
writer.updateDocument(new Term("id", ""+(idUpto++)), doc);
} catch (IOException ioe) {
if (ioe.getMessage().startsWith("fake disk full at") ||
ioe.getMessage().equals("now failing on purpose")) {
diskFull = true;
try {
Thread.sleep(1);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
if (fullCount++ >= 5)
break;
} else {
if (noErrors) {
System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:");
ioe.printStackTrace(System.out);
error = ioe;
}
break;
}
} catch (Throwable t) {
if (noErrors) {
System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:");
t.printStackTrace(System.out);
error = t;
}
break;
}
}
}
}
// LUCENE-1130: make sure we can close() even while
// threads are trying to add documents. Strictly
// speaking, this isn't valid use of Lucene's APIs, but we
// still want to be robust to this case:
public void testCloseWithThreads() throws IOException {
int NUM_THREADS = 3;
for(int iter=0;iter<50;iter++) {
MockRAMDirectory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
writer.setMergeScheduler(cms);
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(4);
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
boolean diskFull = false;
for(int i=0;i<NUM_THREADS;i++)
threads[i] = new IndexerThread(writer, false);
for(int i=0;i<NUM_THREADS;i++)
threads[i].start();
try {
Thread.sleep(50);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
writer.close(false);
// Make sure threads that are adding docs are not hung:
for(int i=0;i<NUM_THREADS;i++) {
while(true) {
try {
// Without fix for LUCENE-1130: one of the
// threads will hang
threads[i].join();
break;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (threads[i].isAlive())
fail("thread seems to be hung");
}
// Quick test to make sure index is not corrupt:
IndexReader reader = IndexReader.open(dir);
TermDocs tdocs = reader.termDocs(new Term("field", "aaa"));
int count = 0;
while(tdocs.next()) {
count++;
}
assertTrue(count > 0);
reader.close();
dir.close();
}
}
// LUCENE-1130: make sure immediate disk full on creating
// an IndexWriter (hit during DW.ThreadState.init()) is
// OK:
public void testImmediateDiskFull() throws IOException {
MockRAMDirectory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
dir.setMaxSizeInBytes(dir.getRecomputedActualSizeInBytes());
writer.setMaxBufferedDocs(2);
final Document doc = new Document();
doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
try {
writer.addDocument(doc);
fail("did not hit disk full");
} catch (IOException ioe) {
}
// Without fix for LUCENE-1130: this call will hang:
try {
writer.addDocument(doc);
fail("did not hit disk full");
} catch (IOException ioe) {
}
try {
writer.close(false);
fail("did not hit disk full");
} catch (IOException ioe) {
}
}
// LUCENE-1130: make sure immediate disk full on creating
// an IndexWriter (hit during DW.ThreadState.init()), with
// multiple threads, is OK:
public void testImmediateDiskFullWithThreads() throws IOException {
int NUM_THREADS = 3;
for(int iter=0;iter<10;iter++) {
MockRAMDirectory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
// We expect disk full exceptions in the merge threads
cms.setSuppressExceptions();
writer.setMergeScheduler(cms);
writer.setMaxBufferedDocs(2);
writer.setMergeFactor(4);
dir.setMaxSizeInBytes(4*1024+20*iter);
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
boolean diskFull = false;
for(int i=0;i<NUM_THREADS;i++)
threads[i] = new IndexerThread(writer, true);
for(int i=0;i<NUM_THREADS;i++)
threads[i].start();
for(int i=0;i<NUM_THREADS;i++) {
while(true) {
try {
// Without fix for LUCENE-1130: one of the
// threads will hang
threads[i].join();
break;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (threads[i].isAlive())
fail("thread seems to be hung");
else
assertTrue("hit unexpected Throwable", threads[i].error == null);
}
try {
writer.close(false);
} catch (IOException ioe) {
}
dir.close();
}
}
// Throws IOException during FieldsWriter.flushDocument and during DocumentsWriter.abort
private static class FailOnlyOnAbortOrFlush extends MockRAMDirectory.Failure {
public void eval(MockRAMDirectory dir) throws IOException {
if (doFail) {
StackTraceElement[] trace = new Exception().getStackTrace();
for (int i = 0; i < trace.length; i++) {
if ("abort".equals(trace[i].getMethodName()) ||
"flushDocument".equals(trace[i].getMethodName()))
throw new IOException("now failing on purpose");
}
}
}
}
// Runs test, with one thread, using the specific failure
// to trigger an IOException
public void _testSingleThreadFailure(MockRAMDirectory.Failure failure) throws IOException {
MockRAMDirectory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
writer.setMaxBufferedDocs(2);
final Document doc = new Document();
doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
for(int i=0;i<6;i++)
writer.addDocument(doc);
dir.failOn(failure);
failure.setDoFail();
try {
writer.addDocument(doc);
writer.addDocument(doc);
fail("did not hit exception");
} catch (IOException ioe) {
}
failure.clearDoFail();
writer.addDocument(doc);
writer.close(false);
}
// Runs test, with multiple threads, using the specific
// failure to trigger an IOException
public void _testMultipleThreadsFailure(MockRAMDirectory.Failure failure) throws IOException {
int NUM_THREADS = 3;
for(int iter=0;iter<5;iter++) {
MockRAMDirectory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
// We expect disk full exceptions in the merge threads
cms.setSuppressExceptions();
writer.setMergeScheduler(cms);
writer.setMaxBufferedDocs(2);
writer.setMergeFactor(4);
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
boolean diskFull = false;
for(int i=0;i<NUM_THREADS;i++)
threads[i] = new IndexerThread(writer, true);
for(int i=0;i<NUM_THREADS;i++)
threads[i].start();
try {
Thread.sleep(10);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
dir.failOn(failure);
failure.setDoFail();
for(int i=0;i<NUM_THREADS;i++) {
while(true) {
try {
threads[i].join();
break;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (threads[i].isAlive())
fail("thread seems to be hung");
else
assertTrue("hit unexpected Throwable", threads[i].error == null);
}
try {
writer.close(false);
} catch (IOException ioe) {
}
dir.close();
}
}
// LUCENE-1130: make sure initial IOException, and then 2nd
// IOException during abort(), is OK:
public void testIOExceptionDuringAbort() throws IOException {
_testSingleThreadFailure(new FailOnlyOnAbortOrFlush());
}
// LUCENE-1130: make sure initial IOException, and then 2nd
// IOException during abort(), with multiple threads, is OK:
public void testIOExceptionDuringAbortWithThreads() throws IOException {
_testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush());
}
// Throws IOException during DocumentsWriter.closeDocStore
private static class FailOnlyInCloseDocStore extends MockRAMDirectory.Failure {
public void eval(MockRAMDirectory dir) throws IOException {
if (doFail) {
StackTraceElement[] trace = new Exception().getStackTrace();
for (int i = 0; i < trace.length; i++) {
if ("closeDocStore".equals(trace[i].getMethodName()))
throw new IOException("now failing on purpose");
}
}
}
}
// LUCENE-1130: test IOException in closeDocStore
public void testIOExceptionDuringCloseDocStore() throws IOException {
_testSingleThreadFailure(new FailOnlyInCloseDocStore());
}
// LUCENE-1130: test IOException in closeDocStore, with threads
public void testIOExceptionDuringCloseDocStoreWithThreads() throws IOException {
_testMultipleThreadsFailure(new FailOnlyInCloseDocStore());
}
// Throws IOException during DocumentsWriter.writeSegment
private static class FailOnlyInWriteSegment extends MockRAMDirectory.Failure {
public void eval(MockRAMDirectory dir) throws IOException {
if (doFail) {
StackTraceElement[] trace = new Exception().getStackTrace();
for (int i = 0; i < trace.length; i++) {
if ("writeSegment".equals(trace[i].getMethodName()))
throw new IOException("now failing on purpose");
}
}
}
}
// LUCENE-1130: test IOException in writeSegment
public void testIOExceptionDuringWriteSegment() throws IOException {
_testSingleThreadFailure(new FailOnlyInWriteSegment());
}
// LUCENE-1130: test IOException in writeSegment, with threads
public void testIOExceptionDuringWriteSegmentWithThreads() throws IOException {
_testMultipleThreadsFailure(new FailOnlyInWriteSegment());
}
}

TestIndexWriterDelete.java

@@ -19,7 +19,6 @@ package org.apache.lucene.index;

 import java.io.IOException;
 import java.util.Arrays;
-import java.lang.StackTraceElement;

 import org.apache.lucene.util.LuceneTestCase;
@@ -454,7 +453,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
      String[] startFiles = dir.list();
      SegmentInfos infos = new SegmentInfos();
      infos.read(dir);
-     IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
+     new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
      String[] endFiles = dir.list();

      Arrays.sort(startFiles);
@@ -560,9 +559,20 @@ public class TestIndexWriterDelete extends LuceneTestCase {
    }

    public void eval(MockRAMDirectory dir) throws IOException {
      if (sawMaybe && !failed) {
+       boolean seen = false;
+       StackTraceElement[] trace = new Exception().getStackTrace();
+       for (int i = 0; i < trace.length; i++) {
+         if ("applyDeletes".equals(trace[i].getMethodName())) {
+           seen = true;
+           break;
+         }
+       }
+       if (!seen) {
+         // Only fail once we are no longer in applyDeletes
          failed = true;
          throw new IOException("fail after applyDeletes");
        }
+     }
      if (!failed) {
        StackTraceElement[] trace = new Exception().getStackTrace();
        for (int i = 0; i < trace.length; i++) {
@@ -740,7 +750,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
      String[] startFiles = dir.list();
      SegmentInfos infos = new SegmentInfos();
      infos.read(dir);
-     IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
+     new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
      String[] endFiles = dir.list();

      if (!Arrays.equals(startFiles, endFiles)) {

MockRAMDirectory.java

@@ -117,7 +117,6 @@ public class MockRAMDirectory extends RAMDirectory {
  }

  void maybeThrowIOException() throws IOException {
-   maybeThrowDeterministicException();
    if (randomIOExceptionRate > 0.0) {
      int number = Math.abs(randomState.nextInt() % 1000);
      if (number < randomIOExceptionRate*1000) {
@@ -198,7 +197,7 @@ public class MockRAMDirectory extends RAMDirectory {
   *  RAMOutputStream.BUFFER_SIZE (now 1024) bytes.
   */
- final synchronized long getRecomputedActualSizeInBytes() {
+ public final synchronized long getRecomputedActualSizeInBytes() {
    long size = 0;
    Iterator it = fileMap.values().iterator();
    while (it.hasNext())
@@ -245,6 +244,16 @@ public class MockRAMDirectory extends RAMDirectory {
     *  mock.failOn(failure.reset())
     */
    public Failure reset() { return this; }
+
+   protected boolean doFail;
+
+   public void setDoFail() {
+     doFail = true;
+   }
+
+   public void clearDoFail() {
+     doFail = false;
+   }
  }

  ArrayList failures;
@@ -253,7 +262,7 @@ public class MockRAMDirectory extends RAMDirectory {
   *  add a Failure object to the list of objects to be evaluated
   *  at every potential failure point
   */
- public void failOn(Failure fail) {
+ synchronized public void failOn(Failure fail) {
    if (failures == null) {
      failures = new ArrayList();
    }
@@ -261,10 +270,10 @@ public class MockRAMDirectory extends RAMDirectory {
  }

  /**
-  * Itterate through the failures list, giving each object a
+  * Iterate through the failures list, giving each object a
   * chance to throw an IOE
   */
- void maybeThrowDeterministicException() throws IOException {
+ synchronized void maybeThrowDeterministicException() throws IOException {
    if (failures != null) {
      for(int i = 0; i < failures.size(); i++) {
        ((Failure)failures.get(i)).eval(this);
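
For readers unfamiliar with the test-side mechanics: the changes in this file plus the new tests rely on failure hooks that are evaluated at every write, and each hook decides whether to throw by inspecting the current stack trace. The following compact sketch (invented names SketchFailableStore/Hook, not the MockRAMDirectory API) shows that technique in isolation, under the assumption that a store calls every registered hook from its write paths:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class SketchFailableStore {
      interface Hook { void eval() throws IOException; }
      private final List<Hook> hooks = new ArrayList<Hook>();

      synchronized void failOn(Hook h) { hooks.add(h); }

      // Called from every write path, analogous to maybeThrowDeterministicException():
      synchronized void maybeFail() throws IOException {
        for (Hook h : hooks) h.eval();
      }

      void flushDocument() throws IOException { maybeFail(); }

      public static void main(String[] args) {
        SketchFailableStore store = new SketchFailableStore();
        // Throw only when the failing call originates from flushDocument():
        store.failOn(new Hook() {
          public void eval() throws IOException {
            for (StackTraceElement e : new Exception().getStackTrace())
              if ("flushDocument".equals(e.getMethodName()))
                throw new IOException("now failing on purpose");
          }
        });
        try {
          store.flushDocument();
        } catch (IOException ioe) {
          System.out.println("injected: " + ioe.getMessage());
        }
      }
    }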

MockRAMOutputStream.java

@@ -18,7 +18,6 @@ package org.apache.lucene.store;
  */

 import java.io.IOException;
-import java.util.Iterator;

 /**
  * Used by MockRAMDirectory to create an output stream that
@@ -50,6 +49,11 @@ public class MockRAMOutputStream extends RAMOutputStream {
      }
    }

+   public void flush() throws IOException {
+     dir.maybeThrowDeterministicException();
+     super.flush();
+   }
+
    public void writeByte(byte b) throws IOException {
      singleByte[0] = b;
      writeBytes(singleByte, 0, 1);
@@ -80,6 +84,8 @@ public class MockRAMOutputStream extends RAMOutputStream {
        super.writeBytes(b, offset, len);
      }

+     dir.maybeThrowDeterministicException();
+
      if (first) {
        // Maybe throw random exception; only do this on first
        // write to a new file: