LUCENE-2576: simplify IndexWriter's private startCommit method now that it's single thread'd (may fix this intermittent test failure); also add one missing checkpoint() in addIndexes

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@984510 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2010-08-11 18:19:13 +00:00
parent 1ab1788e98
commit 75e34cdba9
1 changed files with 50 additions and 67 deletions

View File

@ -3006,6 +3006,7 @@ public class IndexWriter implements Closeable {
info.setUseCompoundFile(true); info.setUseCompoundFile(true);
} }
} finally { } finally {
checkpoint();
deleter.decRef(files); deleter.decRef(files);
} }
} }
@ -3181,6 +3182,7 @@ public class IndexWriter implements Closeable {
setRollbackSegmentInfos(pendingCommit); setRollbackSegmentInfos(pendingCommit);
deleter.checkpoint(pendingCommit, true); deleter.checkpoint(pendingCommit, true);
} finally { } finally {
// Matches the incRef done in startCommit:
deleter.decRef(pendingCommit); deleter.decRef(pendingCommit);
pendingCommit = null; pendingCommit = null;
notifyAll(); notifyAll();
@ -4267,6 +4269,21 @@ public class IndexWriter implements Closeable {
} }
} }
// called only from assert
private boolean filesExist(SegmentInfos toSync) throws IOException {
Collection<String> files = toSync.files(directory, false);
for(final String fileName: files) {
assert directory.fileExists(fileName): "file " + fileName + " does not exist";
// If this trips it means we are missing a call to
// .checkpoint somewhere, because by the time we
// are called, deleter should know about every
// file referenced by the current head
// segmentInfos:
assert deleter.exists(fileName) : "IndexFileDeleter doesn't know about file " + fileName;
}
return true;
}
/** Walk through all files referenced by the current /** Walk through all files referenced by the current
* segmentInfos and ask the Directory to sync each file, * segmentInfos and ask the Directory to sync each file,
* if it wasn't already. If that succeeds, then we * if it wasn't already. If that succeeds, then we
@ -4275,9 +4292,7 @@ public class IndexWriter implements Closeable {
private void startCommit(long sizeInBytes, Map<String,String> commitUserData) throws IOException { private void startCommit(long sizeInBytes, Map<String,String> commitUserData) throws IOException {
assert testPoint("startStartCommit"); assert testPoint("startStartCommit");
assert pendingCommit == null;
// TODO: as of LUCENE-2095, we can simplify this method,
// since only 1 thread can be in here at once
if (hitOOM) { if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit"); throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
@ -4288,7 +4303,7 @@ public class IndexWriter implements Closeable {
if (infoStream != null) if (infoStream != null)
message("startCommit(): start sizeInBytes=" + sizeInBytes); message("startCommit(): start sizeInBytes=" + sizeInBytes);
SegmentInfos toSync = null; final SegmentInfos toSync;
final long myChangeCount; final long myChangeCount;
synchronized(this) { synchronized(this) {
@ -4303,9 +4318,7 @@ public class IndexWriter implements Closeable {
// First, we clone & incref the segmentInfos we intend // First, we clone & incref the segmentInfos we intend
// to sync, then, without locking, we sync() each file // to sync, then, without locking, we sync() each file
// referenced by toSync, in the background. Multiple // referenced by toSync, in the background.
// threads can be doing this at once, if say a large
// merge and a small merge finish at the same time:
if (infoStream != null) if (infoStream != null)
message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount); message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
@ -4313,84 +4326,42 @@ public class IndexWriter implements Closeable {
readerPool.commit(); readerPool.commit();
toSync = (SegmentInfos) segmentInfos.clone(); toSync = (SegmentInfos) segmentInfos.clone();
assert filesExist(toSync);
if (commitUserData != null) if (commitUserData != null)
toSync.setUserData(commitUserData); toSync.setUserData(commitUserData);
// This protects the segmentInfos we are now going
// to commit. This is important in case, eg, while
// we are trying to sync all referenced files, a
// merge completes which would otherwise have
// removed the files we are now syncing.
deleter.incRef(toSync, false); deleter.incRef(toSync, false);
myChangeCount = changeCount; myChangeCount = changeCount;
Collection<String> files = toSync.files(directory, false);
for(final String fileName: files) {
assert directory.fileExists(fileName): "file " + fileName + " does not exist";
// If this trips it means we are missing a call to
// .checkpoint somewhere, because by the time we
// are called, deleter should know about every
// file referenced by the current head
// segmentInfos:
assert deleter.exists(fileName);
}
} }
assert testPoint("midStartCommit"); assert testPoint("midStartCommit");
boolean setPending = false;
try { try {
// This call can take a long time -- 10s of seconds
// or more. We do it without sync:
directory.sync(toSync.files(directory, false)); directory.sync(toSync.files(directory, false));
assert testPoint("midStartCommit2"); assert testPoint("midStartCommit2");
synchronized(this) { synchronized(this) {
// If someone saved a newer version of segments file
// since I first started syncing my version, I can
// safely skip saving myself since I've been
// superseded:
while(true) { assert pendingCommit == null;
if (myChangeCount <= lastCommitChangeCount) {
if (infoStream != null) {
message("sync superseded by newer infos");
}
break;
} else if (pendingCommit == null) {
// My turn to commit
if (segmentInfos.getGeneration() > toSync.getGeneration()) assert segmentInfos.getGeneration() == toSync.getGeneration();
toSync.updateGeneration(segmentInfos);
boolean success = false;
try {
// Exception here means nothing is prepared // Exception here means nothing is prepared
// (this method unwinds everything it did on // (this method unwinds everything it did on
// an exception) // an exception)
try {
toSync.prepareCommit(directory); toSync.prepareCommit(directory);
} finally {
// Have our master segmentInfos record the
// generations we just prepared. We do this
// on error or success so we don't
// double-write a segments_N file.
segmentInfos.updateGeneration(toSync);
}
assert pendingCommit == null;
setPending = true;
pendingCommit = toSync; pendingCommit = toSync;
pendingCommitChangeCount = myChangeCount; pendingCommitChangeCount = myChangeCount;
success = true;
} finally {
if (!success && infoStream != null)
message("hit exception committing segments file");
}
break;
} else {
// Must wait for other commit to complete
doWait();
}
}
} }
if (infoStream != null) if (infoStream != null)
@ -4400,10 +4371,22 @@ public class IndexWriter implements Closeable {
} finally { } finally {
synchronized(this) { synchronized(this) {
if (!setPending)
// Have our master segmentInfos record the
// generations we just prepared. We do this
// on error or success so we don't
// double-write a segments_N file.
segmentInfos.updateGeneration(toSync);
if (pendingCommit == null) {
if (infoStream != null) {
message("hit exception committing segments file");
}
deleter.decRef(toSync); deleter.decRef(toSync);
} }
} }
}
} catch (OutOfMemoryError oom) { } catch (OutOfMemoryError oom) {
handleOOM(oom, "startCommit"); handleOOM(oom, "startCommit");
} }