LUCENE-1191: if we hit OOM then don't commit any changes to the index

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@632124 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2008-02-28 21:35:42 +00:00
parent 8f9781bbdb
commit 7af43e46d6
1 changed file with 376 additions and 311 deletions
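
Every entry point touched here follows the same pattern: catch OutOfMemoryError, record it in the new volatile hitOOM flag, and rethrow; close() then aborts instead of committing, and commitMerge()/sync() refuse to write a new segments file once the flag is set. A minimal sketch of that guard pattern, for orientation only (the class and method names below are invented for the example; only hitOOM and the abort-on-close idea come from the actual change):

    // Illustrative sketch of the OOM-guard pattern this commit applies
    // to IndexWriter's public methods; not the actual Lucene source.
    public class OomGuardSketch {

      private volatile boolean hitOOM;   // set once any operation hits OOM

      public void addSomething(Runnable bufferWork) {
        try {
          bufferWork.run();              // e.g. buffer a document, maybe flush
        } catch (OutOfMemoryError oom) {
          hitOOM = true;                 // internal buffers may now be corrupt
          throw oom;                     // always rethrow, never swallow
        }
      }

      public void close() {
        if (hitOOM) {
          abortBufferedChanges();        // drop uncommitted state, keep last commit
          return;
        }
        commitChanges();                 // normal path: write a new segments file
      }

      private void abortBufferedChanges() { /* discard in-memory state */ }
      private void commitChanges()        { /* flush and commit */ }
    }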


@@ -289,6 +289,7 @@ public class IndexWriter {
   private static Object MESSAGE_ID_LOCK = new Object();
   private static int MESSAGE_ID = 0;
   private int messageID = -1;
+  volatile private boolean hitOOM;
 
   private Directory directory;           // where this index resides
   private Analyzer analyzer;             // how to analyze text
@@ -1610,6 +1611,13 @@ public class IndexWriter {
    */
   public void close(boolean waitForMerges) throws CorruptIndexException, IOException {
     boolean doClose;
+
+    // If any methods have hit OutOfMemoryError, then abort
+    // on close, in case the internal state of IndexWriter
+    // or DocumentsWriter is corrupt
+    if (hitOOM)
+      abort();
+
     synchronized(this) {
       // Ensure that only one thread actually gets to do the closing:
       if (!closing) {
@@ -1676,7 +1684,9 @@ public class IndexWriter {
       synchronized(this) {
         closed = true;
       }
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
     } finally {
       synchronized(this) {
         if (!closed)
@@ -1862,27 +1872,32 @@ public class IndexWriter {
     boolean doFlush = false;
     boolean success = false;
     try {
+      try {
         doFlush = docWriter.addDocument(doc, analyzer);
         success = true;
       } finally {
         if (!success) {
           if (infoStream != null)
             message("hit exception adding document");
           synchronized (this) {
             // If docWriter has some aborted files that were
             // never incref'd, then we clean them up here
             if (docWriter != null) {
               final List files = docWriter.abortedFiles();
               if (files != null)
                 deleter.deleteNewFiles(files);
             }
           }
         }
       }
       if (doFlush)
         flush(true, false);
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
+    }
   }
 
   /**
@@ -1893,9 +1908,14 @@ public class IndexWriter {
    */
   public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
     ensureOpen();
+    try {
       boolean doFlush = docWriter.bufferDeleteTerm(term);
       if (doFlush)
         flush(true, false);
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
+    }
   }
 
   /**
@@ -1908,9 +1928,14 @@ public class IndexWriter {
    */
   public void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException {
     ensureOpen();
+    try {
       boolean doFlush = docWriter.bufferDeleteTerms(terms);
       if (doFlush)
         flush(true, false);
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
+    }
   }
 
   /**
@@ -1946,28 +1971,33 @@ public class IndexWriter {
   public void updateDocument(Term term, Document doc, Analyzer analyzer)
       throws CorruptIndexException, IOException {
     ensureOpen();
+    try {
       boolean doFlush = false;
       boolean success = false;
       try {
         doFlush = docWriter.updateDocument(term, doc, analyzer);
         success = true;
       } finally {
         if (!success) {
           if (infoStream != null)
             message("hit exception updating document");
           synchronized (this) {
             // If docWriter has some aborted files that were
             // never incref'd, then we clean them up here
             final List files = docWriter.abortedFiles();
             if (files != null)
               deleter.deleteNewFiles(files);
           }
         }
       }
       if (doFlush)
         flush(true, false);
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
+    }
   }
 
   // for test purpose
@@ -2644,32 +2674,37 @@ public class IndexWriter {
       throws CorruptIndexException, IOException {
     ensureOpen();
+    try {
       if (infoStream != null)
         message("flush at addIndexes");
       flush(true, false);
 
       boolean success = false;
 
       startTransaction();
 
       try {
         for (int i = 0; i < dirs.length; i++) {
           SegmentInfos sis = new SegmentInfos();   // read infos from dir
           sis.read(dirs[i]);
           for (int j = 0; j < sis.size(); j++) {
             segmentInfos.addElement(sis.info(j));  // add each info
           }
         }
 
         optimize();
 
         success = true;
       } finally {
         if (success) {
           commitTransaction();
         } else {
           rollbackTransaction();
         }
       }
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
+    }
   }
@@ -2706,47 +2741,53 @@ public class IndexWriter {
       throws CorruptIndexException, IOException {
     ensureOpen();
+    try {
       if (infoStream != null)
         message("flush at addIndexesNoOptimize");
       flush(true, false);
 
       boolean success = false;
 
       startTransaction();
 
       try {
         for (int i = 0; i < dirs.length; i++) {
           if (directory == dirs[i]) {
             // cannot add this index: segments may be deleted in merge before added
             throw new IllegalArgumentException("Cannot add this index to itself");
           }
 
           SegmentInfos sis = new SegmentInfos();   // read infos from dir
           sis.read(dirs[i]);
           for (int j = 0; j < sis.size(); j++) {
             SegmentInfo info = sis.info(j);
             segmentInfos.addElement(info);         // add each info
           }
         }
 
         maybeMerge();
 
         // If after merging there remain segments in the index
         // that are in a different directory, just copy these
         // over into our index.  This is necessary (before
         // finishing the transaction) to avoid leaving the
         // index in an unusable (inconsistent) state.
         copyExternalSegments();
 
         success = true;
       } finally {
         if (success) {
           commitTransaction();
         } else {
           rollbackTransaction();
         }
       }
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
+    }
   }
@@ -2793,77 +2834,82 @@ public class IndexWriter {
       throws CorruptIndexException, IOException {
     ensureOpen();
+    try {
       optimize();                               // start with zero or 1 seg
 
       final String mergedName = newSegmentName();
       SegmentMerger merger = new SegmentMerger(this, mergedName, null);
 
       SegmentInfo info;
 
       IndexReader sReader = null;
       try {
         if (segmentInfos.size() == 1){ // add existing index, if any
           sReader = SegmentReader.get(segmentInfos.info(0));
           merger.add(sReader);
         }
 
         for (int i = 0; i < readers.length; i++)      // add new indexes
           merger.add(readers[i]);
 
         boolean success = false;
 
         startTransaction();
 
         try {
           int docCount = merger.merge();                // merge 'em
 
           if(sReader != null) {
             sReader.close();
             sReader = null;
           }
 
           segmentInfos.setSize(0);                      // pop old infos & add new
           info = new SegmentInfo(mergedName, docCount, directory, false, true,
                                  -1, null, false);
           segmentInfos.addElement(info);
 
           success = true;
 
         } finally {
           if (!success) {
             if (infoStream != null)
               message("hit exception in addIndexes during merge");
             rollbackTransaction();
           } else {
             commitTransaction();
           }
         }
       } finally {
         if (sReader != null) {
           sReader.close();
         }
       }
 
       if (mergePolicy instanceof LogMergePolicy && getUseCompoundFile()) {
 
         boolean success = false;
 
         startTransaction();
 
         try {
           merger.createCompoundFile(mergedName + ".cfs");
           info.setUseCompoundFile(true);
         } finally {
           if (!success) {
             if (infoStream != null)
               message("hit exception building compound file in addIndexes during merge");
             rollbackTransaction();
           } else {
             commitTransaction();
           }
         }
       }
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
+    }
   }
@@ -3121,6 +3167,9 @@ public class IndexWriter {
       return flushDocs || flushDeletes;
 
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
     } finally {
       docWriter.clearFlushPending();
       docWriter.resumeAllThreads();
@@ -3259,6 +3308,9 @@ public class IndexWriter {
   /* FIXME if we want to support non-contiguous segment merges */
   synchronized private boolean commitMerge(MergePolicy.OneMerge merge) throws IOException {
 
+    if (hitOOM)
+      return false;
+
     if (infoStream != null)
       message("commitMerge: " + merge.segString(directory));
@@ -3344,53 +3396,57 @@ public class IndexWriter {
     boolean success = false;
 
     try {
+      try {
         try {
           mergeInit(merge);
 
           if (infoStream != null)
             message("now merge\n  merge=" + merge.segString(directory) + "\n  index=" + segString());
 
           mergeMiddle(merge);
           success = true;
         } catch (MergePolicy.MergeAbortedException e) {
           merge.setException(e);
           addMergeException(merge);
           // We can ignore this exception, unless the merge
           // involves segments from external directories, in
           // which case we must throw it so, for example, the
           // rollbackTransaction code in addIndexes* is
           // executed.
           if (merge.isExternal)
             throw e;
         }
       } finally {
         synchronized(this) {
           try {
             mergeFinish(merge);
 
             if (!success) {
               if (infoStream != null)
                 message("hit exception during merge");
               addMergeException(merge);
               if (merge.info != null && !segmentInfos.contains(merge.info))
                 deleter.refresh(merge.info.name);
             }
 
             // This merge (and, generally, any change to the
             // segments) may now enable new merges, so we call
             // merge policy & update pending merges.
             if (success && !merge.isAborted() && !closed && !closing)
               updatePendingMerges(merge.maxNumSegmentsOptimize, merge.optimize);
           } finally {
             runningMerges.remove(merge);
             // Optimize may be waiting on the final optimize
             // merge to finish; and finishMerges() may be
             // waiting for all merges to finish:
             notifyAll();
           }
         }
       }
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
+    }
   }
@@ -3988,143 +4044,152 @@ public class IndexWriter {
    * that. */
   private void sync(boolean includeFlushes, long sizeInBytes) throws IOException {
 
+    if (hitOOM)
+      return;
+
+    try {
       message("start sync() includeFlushes=" + includeFlushes);
 
       if (!includeFlushes)
         syncPause(sizeInBytes);
 
       // First, we clone & incref the segmentInfos we intend
       // to sync, then, without locking, we sync() each file
       // referenced by toSync, in the background.  Multiple
       // threads can be doing this at once, if say a large
       // merge and a small merge finish at the same time:
 
       SegmentInfos toSync = null;
       final int mySyncCount;
       synchronized(this) {
 
         if (!commitPending) {
           message("  skip sync(): no commit pending");
           return;
         }
 
         // Create the segmentInfos we want to sync, by copying
         // the current one and possibly removing flushed
         // segments:
         toSync = (SegmentInfos) segmentInfos.clone();
         final int numSegmentsToSync = toSync.size();
 
         boolean newCommitPending = false;
 
         if (!includeFlushes) {
           // Do not sync flushes:
           assert lastMergeInfo != null;
           assert toSync.contains(lastMergeInfo);
           int downTo = numSegmentsToSync-1;
           while(!toSync.info(downTo).equals(lastMergeInfo)) {
             message("  skip segment " + toSync.info(downTo).name);
             toSync.remove(downTo);
             downTo--;
             newCommitPending = true;
           }
 
         } else if (numSegmentsToSync > 0)
           // Force all subsequent syncs to include up through
           // the final info in the current segments.  This
           // ensure that a call to commit() will force another
           // sync (due to merge finishing) to sync all flushed
           // segments as well:
           lastMergeInfo = toSync.info(numSegmentsToSync-1);
 
         mySyncCount = syncCount++;
         deleter.incRef(toSync, false);
 
         commitPending = newCommitPending;
       }
 
       boolean success0 = false;
 
       try {
 
         // Loop until all files toSync references are sync'd:
         while(true) {
 
           final Collection pending = new ArrayList();
 
           for(int i=0;i<toSync.size();i++) {
             final SegmentInfo info = toSync.info(i);
             final List files = info.files();
             for(int j=0;j<files.size();j++) {
               final String fileName = (String) files.get(j);
               if (startSync(fileName, pending)) {
                 boolean success = false;
                 try {
                   // Because we incRef'd this commit point, above,
                   // the file had better exist:
                   assert directory.fileExists(fileName);
                   message("now sync " + fileName);
                   directory.sync(fileName);
                   success = true;
                 } finally {
                   finishSync(fileName, success);
                 }
               }
             }
           }
 
           // All files that I require are either synced or being
           // synced by other threads.  If they are being synced,
           // we must at this point block until they are done.
           // If this returns false, that means an error in
           // another thread resulted in failing to actually
           // sync one of our files, so we repeat:
           if (waitForAllSynced(pending))
             break;
         }
 
         synchronized(this) {
           // If someone saved a newer version of segments file
           // since I first started syncing my version, I can
           // safely skip saving myself since I've been
           // superseded:
           if (mySyncCount > syncCountSaved) {
 
             if (segmentInfos.getGeneration() > toSync.getGeneration())
               toSync.updateGeneration(segmentInfos);
 
             boolean success = false;
             try {
               toSync.commit(directory);
               success = true;
             } finally {
               // Have our master segmentInfos record the
               // generations we just sync'd
               segmentInfos.updateGeneration(toSync);
               if (!success) {
                 commitPending = true;
                 message("hit exception committing segments file");
               }
             }
             message("commit complete");
 
             syncCountSaved = mySyncCount;
 
             deleter.checkpoint(toSync, true);
             setRollbackSegmentInfos();
           } else
             message("sync superseded by newer infos");
         }
 
         message("done all syncs");
 
         success0 = true;
 
       } finally {
         synchronized(this) {
           deleter.decRef(toSync);
           if (!success0)
             commitPending = true;
         }
       }
+    } catch (OutOfMemoryError oom) {
+      hitOOM = true;
+      throw oom;
+    }
   }