LUCENE-1274: add preparCommit() to IW to do phase 1 of 2-phase commit

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@653878 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2008-05-06 18:41:10 +00:00
parent 38d1caf56f
commit 8540820fe1
12 changed files with 662 additions and 134 deletions

View File

@ -98,6 +98,12 @@ Bug fixes
8. LUCENE-1267: Added numDocs() and maxDoc() to IndexWriter; 8. LUCENE-1267: Added numDocs() and maxDoc() to IndexWriter;
deprecated docCount(). (Mike McCandless) deprecated docCount(). (Mike McCandless)
9. LUCENE-1274: Added new prepareCommit() method to IndexWriter,
which does phase 1 of a 2-phase commit (commit() does phase 2).
This is needed when you want to update an index as part of a
transaction involving external resources (eg a database). Also
deprecated abort(), renaming it to rollback(). (Mike McCandless)
New features New features
1. LUCENE-1137: Added Token.set/getFlags() accessors for passing more information about a Token through the analysis 1. LUCENE-1137: Added Token.set/getFlags() accessors for passing more information about a Token through the analysis

View File

@ -138,6 +138,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
public void merge(IndexWriter writer) public void merge(IndexWriter writer)
throws CorruptIndexException, IOException { throws CorruptIndexException, IOException {
// TODO: enable this once we are on JRE 1.5
// assert !Thread.holdsLock(writer);
this.writer = writer; this.writer = writer;
initMergeThreadPriority(); initMergeThreadPriority();

View File

@ -348,11 +348,12 @@ final class DocumentsWriter {
abortCount++; abortCount++;
} }
/** Called if we hit an exception when adding docs, /** Called if we hit an exception at a bad time (when
* flushing, etc. This resets our state, discarding any * updating the index files) and must discard all
* docs added since last flush. If ae is non-null, it * currently buffered docs. This resets our state,
* contains the root cause exception (which we re-throw * discarding any docs added since last flush. If ae is
* after we are done aborting). */ * non-null, it contains the root cause exception (which
* we re-throw after we are done aborting). */
synchronized void abort(AbortException ae) throws IOException { synchronized void abort(AbortException ae) throws IOException {
// Anywhere that throws an AbortException must first // Anywhere that throws an AbortException must first

View File

@ -64,8 +64,16 @@ final class FieldsWriter
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
try {
close(); close();
} catch (Throwable t) {
// Suppress so we keep throwing the original exception
}
try {
d.deleteFile(fieldsName); d.deleteFile(fieldsName);
} catch (Throwable t) {
// Suppress so we keep throwing the original exception
}
} }
} }
@ -77,9 +85,20 @@ final class FieldsWriter
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
try {
close(); close();
} catch (IOException ioe) {
}
try {
d.deleteFile(fieldsName); d.deleteFile(fieldsName);
} catch (Throwable t) {
// Suppress so we keep throwing the original exception
}
try {
d.deleteFile(indexName); d.deleteFile(indexName);
} catch (Throwable t) {
// Suppress so we keep throwing the original exception
}
} }
} }

View File

@ -306,6 +306,9 @@ public class IndexWriter {
private SegmentInfos rollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails private SegmentInfos rollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails
private HashMap rollbackSegments; private HashMap rollbackSegments;
volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
volatile long pendingCommitChangeCount;
private SegmentInfos localRollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails private SegmentInfos localRollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails
private boolean localAutoCommit; // saved autoCommit during local transaction private boolean localAutoCommit; // saved autoCommit during local transaction
private int localFlushedDocCount; // saved docWriter.getFlushedDocCount during local transaction private int localFlushedDocCount; // saved docWriter.getFlushedDocCount during local transaction
@ -364,12 +367,13 @@ public class IndexWriter {
infoStream.println("IW " + messageID + " [" + Thread.currentThread().getName() + "]: " + message); infoStream.println("IW " + messageID + " [" + Thread.currentThread().getName() + "]: " + message);
} }
private synchronized void setMessageID() { private synchronized void setMessageID(PrintStream infoStream) {
if (infoStream != null && messageID == -1) { if (infoStream != null && messageID == -1) {
synchronized(MESSAGE_ID_LOCK) { synchronized(MESSAGE_ID_LOCK) {
messageID = MESSAGE_ID++; messageID = MESSAGE_ID++;
} }
} }
this.infoStream = infoStream;
} }
/** /**
@ -1082,9 +1086,8 @@ public class IndexWriter {
this.closeDir = closeDir; this.closeDir = closeDir;
directory = d; directory = d;
analyzer = a; analyzer = a;
this.infoStream = defaultInfoStream; setMessageID(defaultInfoStream);
this.maxFieldLength = maxFieldLength; this.maxFieldLength = maxFieldLength;
setMessageID();
if (create) { if (create) {
// Clear the write lock in case it's leftover: // Clear the write lock in case it's leftover:
@ -1496,8 +1499,7 @@ public class IndexWriter {
*/ */
public void setInfoStream(PrintStream infoStream) { public void setInfoStream(PrintStream infoStream) {
ensureOpen(); ensureOpen();
this.infoStream = infoStream; setMessageID(infoStream);
setMessageID();
docWriter.setInfoStream(infoStream); docWriter.setInfoStream(infoStream);
deleter.setInfoStream(infoStream); deleter.setInfoStream(infoStream);
if (infoStream != null) if (infoStream != null)
@ -1672,7 +1674,7 @@ public class IndexWriter {
if (infoStream != null) if (infoStream != null)
message("now call final commit()"); message("now call final commit()");
commit(true, 0); commit(0);
if (infoStream != null) if (infoStream != null)
message("at close: " + segString()); message("at close: " + segString());
@ -2571,7 +2573,7 @@ public class IndexWriter {
if (autoCommit) { if (autoCommit) {
boolean success = false; boolean success = false;
try { try {
commit(true, 0); commit(0);
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
@ -2587,6 +2589,13 @@ public class IndexWriter {
localRollbackSegmentInfos = null; localRollbackSegmentInfos = null;
} }
/**
* @deprecated Please use {@link #rollback} instead.
*/
public void abort() throws IOException {
rollback();
}
/** /**
* Close the <code>IndexWriter</code> without committing * Close the <code>IndexWriter</code> without committing
* any of the changes that have occurred since it was * any of the changes that have occurred since it was
@ -2594,18 +2603,27 @@ public class IndexWriter {
* created, after which the state of the index will be the * created, after which the state of the index will be the
* same as it was when this writer was first opened. This * same as it was when this writer was first opened. This
* can only be called when this IndexWriter was opened * can only be called when this IndexWriter was opened
* with <code>autoCommit=false</code>. * with <code>autoCommit=false</code>. This also clears a
* previous call to {@link #prepareCommit}.
* @throws IllegalStateException if this is called when * @throws IllegalStateException if this is called when
* the writer was opened with <code>autoCommit=true</code>. * the writer was opened with <code>autoCommit=true</code>.
* @throws IOException if there is a low-level IO error * @throws IOException if there is a low-level IO error
*/ */
public void abort() throws IOException { public void rollback() throws IOException {
ensureOpen(); ensureOpen();
if (autoCommit) if (autoCommit)
throw new IllegalStateException("abort() can only be called when IndexWriter was opened with autoCommit=false"); throw new IllegalStateException("abort() can only be called when IndexWriter was opened with autoCommit=false");
boolean doClose; boolean doClose;
synchronized(this) { synchronized(this) {
if (pendingCommit != null) {
pendingCommit.rollbackCommit(directory);
deleter.decRef(pendingCommit);
pendingCommit = null;
notifyAll();
}
// Ensure that only one thread actually gets to do the closing: // Ensure that only one thread actually gets to do the closing:
if (!closing) { if (!closing) {
doClose = true; doClose = true;
@ -3113,10 +3131,54 @@ public class IndexWriter {
flush(true, false, true); flush(true, false, true);
} }
/** <p>Expert: prepare for commit. This does the first
* phase of 2-phase commit. You can only call this when
* autoCommit is false. This method does all steps
* necessary to commit changes since this writer was
* opened: flushes pending added and deleted docs, syncs
* the index files, writes most of next segments_N file.
* After calling this you must call either {@link
* #commit()} to finish the commit, or {@link
* #rollback()} to revert the commit and undo all changes
* done since the writer was opened.</p>
*
* You can also just call {@link #commit()} directly
* without prepareCommit first in which case that method
* will internally call prepareCommit.
*/
public final void prepareCommit() throws CorruptIndexException, IOException {
prepareCommit(false);
}
private final void prepareCommit(boolean internal) throws CorruptIndexException, IOException {
if (hitOOM)
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
if (autoCommit && !internal)
throw new IllegalStateException("this method can only be used when autoCommit is false");
if (!autoCommit && pendingCommit != null)
throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
message("prepareCommit: flush");
flush(true, true, true);
startCommit(0);
}
private void commit(long sizeInBytes) throws IOException {
startCommit(sizeInBytes);
finishCommit();
}
/** /**
* <p>Commits all pending updates (added & deleted documents) * <p>Commits all pending updates (added & deleted
* to the index, and syncs all referenced index files, * documents) to the index, and syncs all referenced index
* such that a reader will see the changes. Note that * files, such that a reader will see the changes and the
* index updates will survive an OS or machine crash or
* power loss (though, see the note below). Note that
* this does not wait for any running background merges to * this does not wait for any running background merges to
* finish. This may be a costly operation, so you should * finish. This may be a costly operation, so you should
* test the cost in your application and do it only when * test the cost in your application and do it only when
@ -3135,12 +3197,38 @@ public class IndexWriter {
* consistency on such devices. </p> * consistency on such devices. </p>
*/ */
public final void commit() throws CorruptIndexException, IOException { public final void commit() throws CorruptIndexException, IOException {
commit(true);
message("commit: start");
if (autoCommit || pendingCommit == null) {
message("commit: now prepare");
prepareCommit(true);
} else
message("commit: already prepared");
finishCommit();
} }
private final void commit(boolean triggerMerges) throws CorruptIndexException, IOException { private synchronized final void finishCommit() throws CorruptIndexException, IOException {
flush(triggerMerges, true, true);
commit(true, 0); if (pendingCommit != null) {
try {
message("commit: pendingCommit != null");
pendingCommit.finishCommit(directory);
lastCommitChangeCount = pendingCommitChangeCount;
segmentInfos.updateGeneration(pendingCommit);
setRollbackSegmentInfos();
deleter.checkpoint(pendingCommit, true);
} finally {
deleter.decRef(pendingCommit);
pendingCommit = null;
notifyAll();
}
} else
message("commit: pendingCommit == null; skip");
message("commit: done");
} }
/** /**
@ -3176,8 +3264,7 @@ public class IndexWriter {
// when flushing a segment; otherwise deletes may become // when flushing a segment; otherwise deletes may become
// visible before their corresponding added document // visible before their corresponding added document
// from an updateDocument call // from an updateDocument call
if (autoCommit) flushDeletes |= autoCommit;
flushDeletes = true;
// Returns true if docWriter is currently aborting, in // Returns true if docWriter is currently aborting, in
// which case we skip flushing this segment // which case we skip flushing this segment
@ -3935,7 +4022,7 @@ public class IndexWriter {
synchronized(this) { synchronized(this) {
size = merge.info.sizeInBytes(); size = merge.info.sizeInBytes();
} }
commit(false, size); commit(size);
} }
success = false; success = false;
@ -3988,7 +4075,7 @@ public class IndexWriter {
synchronized(this) { synchronized(this) {
size = merge.info.sizeInBytes(); size = merge.info.sizeInBytes();
} }
commit(false, size); commit(size);
} }
return mergedDocCount; return mergedDocCount;
@ -4151,13 +4238,13 @@ public class IndexWriter {
} }
/** Walk through all files referenced by the current /** Walk through all files referenced by the current
* segmentInfos, minus flushes, and ask the Directory to * segmentInfos and ask the Directory to sync each file,
* sync each file, if it wasn't already. If that * if it wasn't already. If that succeeds, then we
* succeeds, then we write a new segments_N file & sync * prepare a new segments_N file but do not fully commit
* that. */ * it. */
private void commit(boolean skipWait, long sizeInBytes) throws IOException { private void startCommit(long sizeInBytes) throws IOException {
assert testPoint("startCommit"); assert testPoint("startStartCommit");
if (hitOOM) if (hitOOM)
return; return;
@ -4165,9 +4252,9 @@ public class IndexWriter {
try { try {
if (infoStream != null) if (infoStream != null)
message("start commit() skipWait=" + skipWait + " sizeInBytes=" + sizeInBytes); message("startCommit(): start sizeInBytes=" + sizeInBytes);
if (!skipWait) if (sizeInBytes > 0)
syncPause(sizeInBytes); syncPause(sizeInBytes);
SegmentInfos toSync = null; SegmentInfos toSync = null;
@ -4179,7 +4266,7 @@ public class IndexWriter {
if (changeCount == lastCommitChangeCount) { if (changeCount == lastCommitChangeCount) {
if (infoStream != null) if (infoStream != null)
message(" skip commit(): no changes pending"); message(" skip startCommit(): no changes pending");
return; return;
} }
@ -4189,15 +4276,17 @@ public class IndexWriter {
// threads can be doing this at once, if say a large // threads can be doing this at once, if say a large
// merge and a small merge finish at the same time: // merge and a small merge finish at the same time:
if (infoStream != null)
message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
toSync = (SegmentInfos) segmentInfos.clone(); toSync = (SegmentInfos) segmentInfos.clone();
deleter.incRef(toSync, false); deleter.incRef(toSync, false);
myChangeCount = changeCount; myChangeCount = changeCount;
} }
if (infoStream != null) assert testPoint("midStartCommit");
message("commit index=" + segString(toSync));
assert testPoint("midCommit"); boolean setPending = false;
try { try {
@ -4237,46 +4326,64 @@ public class IndexWriter {
break; break;
} }
assert testPoint("midCommit2"); assert testPoint("midStartCommit2");
synchronized(this) { synchronized(this) {
// If someone saved a newer version of segments file // If someone saved a newer version of segments file
// since I first started syncing my version, I can // since I first started syncing my version, I can
// safely skip saving myself since I've been // safely skip saving myself since I've been
// superseded: // superseded:
if (myChangeCount > lastCommitChangeCount) { if (myChangeCount > lastCommitChangeCount && (pendingCommit == null || myChangeCount > pendingCommitChangeCount)) {
// Wait now for any current pending commit to complete:
while(pendingCommit != null) {
message("wait for existing pendingCommit to finish...");
try {
wait();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (segmentInfos.getGeneration() > toSync.getGeneration()) if (segmentInfos.getGeneration() > toSync.getGeneration())
toSync.updateGeneration(segmentInfos); toSync.updateGeneration(segmentInfos);
boolean success = false; boolean success = false;
try { try {
toSync.commit(directory);
success = true; // Exception here means nothing is prepared
// (this method unwinds everything it did on
// an exception)
try {
toSync.prepareCommit(directory);
} finally { } finally {
// Have our master segmentInfos record the // Have our master segmentInfos record the
// generations we just sync'd // generations we just prepared. We do this
// on error or success so we don't
// double-write a segments_N file.
segmentInfos.updateGeneration(toSync); segmentInfos.updateGeneration(toSync);
}
assert pendingCommit == null;
setPending = true;
pendingCommit = toSync;
pendingCommitChangeCount = myChangeCount;
success = true;
} finally {
if (!success) if (!success)
message("hit exception committing segments file"); message("hit exception committing segments file");
} }
message("commit complete");
lastCommitChangeCount = myChangeCount;
deleter.checkpoint(toSync, true);
setRollbackSegmentInfos();
} else } else
message("sync superseded by newer infos"); message("sync superseded by newer infos");
} }
message("done all syncs"); message("done all syncs");
assert testPoint("midCommitSuccess"); assert testPoint("midStartCommitSuccess");
} finally { } finally {
synchronized(this) { synchronized(this) {
if (!setPending)
deleter.decRef(toSync); deleter.decRef(toSync);
} }
} }
@ -4284,7 +4391,7 @@ public class IndexWriter {
hitOOM = true; hitOOM = true;
throw oom; throw oom;
} }
assert testPoint("finishCommit"); assert testPoint("finishStartCommit");
} }
/** /**
@ -4377,11 +4484,11 @@ public class IndexWriter {
// Used only by assert for testing. Current points: // Used only by assert for testing. Current points:
// startDoFlush // startDoFlush
// startCommitMerge // startCommitMerge
// startCommit // startStartCommit
// midCommit // midStartCommit
// midCommit2 // midStartCommit2
// midCommitSuccess // midStartCommitSuccess
// finishCommit // finishStartCommit
// startCommitMergeDeletes // startCommitMergeDeletes
// startMergeInit // startMergeInit
// startApplyDeletes // startApplyDeletes

View File

@ -274,6 +274,10 @@ final class SegmentInfos extends Vector {
}.run(); }.run();
} }
// Only non-null after prepareCommit has been called and
// before finishCommit is called
ChecksumIndexOutput pendingOutput;
private final void write(Directory directory) throws IOException { private final void write(Directory directory) throws IOException {
String segmentFileName = getNextSegmentFileName(); String segmentFileName = getNextSegmentFileName();
@ -298,27 +302,18 @@ final class SegmentInfos extends Vector {
for (int i = 0; i < size(); i++) { for (int i = 0; i < size(); i++) {
info(i).write(output); info(i).write(output);
} }
final long checksum = output.getChecksum(); output.prepareCommit();
output.writeLong(checksum);
success = true; success = true;
pendingOutput = output;
} finally { } finally {
boolean success2 = false;
try {
if (!success) { if (!success) {
// We hit an exception above; try to close the file // We hit an exception above; try to close the file
// but suppress any exception: // but suppress any exception:
try { try {
output.close(); output.close();
success2 = true;
} catch (Throwable t) { } catch (Throwable t) {
// Suppress so we keep throwing the original exception // Suppress so we keep throwing the original exception
} }
} else {
output.close();
success2 = true;
}
} finally {
if (!success || !success2) {
try { try {
// Try not to leave a truncated segments_N file in // Try not to leave a truncated segments_N file in
// the index: // the index:
@ -330,23 +325,6 @@ final class SegmentInfos extends Vector {
} }
} }
try {
IndexOutput genOutput = directory.createOutput(IndexFileNames.SEGMENTS_GEN);
try {
genOutput.writeInt(FORMAT_LOCKLESS);
genOutput.writeLong(generation);
genOutput.writeLong(generation);
} finally {
genOutput.close();
}
} catch (IOException e) {
// It's OK if we fail to write this file since it's
// used only as one of the retry fallbacks.
}
lastGeneration = generation;
}
/** /**
* Returns a copy of this instance, also copying each * Returns a copy of this instance, also copying each
* SegmentInfo. * SegmentInfo.
@ -355,7 +333,7 @@ final class SegmentInfos extends Vector {
public Object clone() { public Object clone() {
SegmentInfos sis = (SegmentInfos) super.clone(); SegmentInfos sis = (SegmentInfos) super.clone();
for(int i=0;i<sis.size();i++) { for(int i=0;i<sis.size();i++) {
sis.setElementAt(((SegmentInfo) sis.elementAt(i)).clone(), i); sis.setElementAt(sis.info(i).clone(), i);
} }
return sis; return sis;
} }
@ -739,45 +717,73 @@ final class SegmentInfos extends Vector {
// Carry over generation numbers from another SegmentInfos // Carry over generation numbers from another SegmentInfos
void updateGeneration(SegmentInfos other) { void updateGeneration(SegmentInfos other) {
assert other.generation > generation;
lastGeneration = other.lastGeneration; lastGeneration = other.lastGeneration;
generation = other.generation; generation = other.generation;
version = other.version; version = other.version;
} }
/** Writes & syncs to the Directory dir, taking care to public final void rollbackCommit(Directory dir) throws IOException {
* remove the segments file on exception */ if (pendingOutput != null) {
public final void commit(Directory dir) throws IOException {
boolean success = false;
try { try {
write(dir); pendingOutput.close();
success = true; } catch (Throwable t) {
} finally { // Suppress so we keep throwing the original exception
if (!success) { // in our caller
}
// Must carefully compute fileName from "generation" // Must carefully compute fileName from "generation"
// since lastGeneration isn't incremented: // since lastGeneration isn't incremented:
try {
final String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, final String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
"", "",
generation); generation);
try {
dir.deleteFile(segmentFileName); dir.deleteFile(segmentFileName);
} catch (Throwable t) { } catch (Throwable t) {
// Suppress so we keep throwing the original exception // Suppress so we keep throwing the original exception
// in our caller
}
pendingOutput = null;
} }
} }
/** Call this to start a commit. This writes the new
* segments file, but writes an invalid checksum at the
* end, so that it is not visible to readers. Once this
* is called you must call {@link #finishCommit} to complete
* the commit or {@link #rollbackCommit} to abort it. */
public final void prepareCommit(Directory dir) throws IOException {
if (pendingOutput != null)
throw new IllegalStateException("prepareCommit was already called");
write(dir);
}
public final void finishCommit(Directory dir) throws IOException {
if (pendingOutput == null)
throw new IllegalStateException("prepareCommit was not called");
boolean success = false;
try {
pendingOutput.finishCommit();
pendingOutput.close();
pendingOutput = null;
success = true;
} finally {
if (!success)
rollbackCommit(dir);
} }
// NOTE: if we crash here, we have left a segments_N // NOTE: if we crash here, we have left a segments_N
// file in the directory in a possibly corrupt state (if // file in the directory in a possibly corrupt state (if
// some bytes made it to stable storage and others // some bytes made it to stable storage and others
// didn't). But, the segments_N file now includes // didn't). But, the segments_N file includes checksum
// checksum at the end, which should catch this case. // at the end, which should catch this case. So when a
// So when a reader tries to read it, it will throw a // reader tries to read it, it will throw a
// CorruptIndexException, which should cause the retry // CorruptIndexException, which should cause the retry
// logic in SegmentInfos to kick in and load the last // logic in SegmentInfos to kick in and load the last
// good (previous) segments_N-1 file. // good (previous) segments_N-1 file.
final String fileName = getCurrentSegmentFileName(); final String fileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
"",
generation);
success = false; success = false;
try { try {
dir.sync(fileName); dir.sync(fileName);
@ -791,5 +797,28 @@ final class SegmentInfos extends Vector {
} }
} }
} }
lastGeneration = generation;
try {
IndexOutput genOutput = dir.createOutput(IndexFileNames.SEGMENTS_GEN);
try {
genOutput.writeInt(FORMAT_LOCKLESS);
genOutput.writeLong(generation);
genOutput.writeLong(generation);
} finally {
genOutput.close();
}
} catch (Throwable t) {
// It's OK if we fail to write this file since it's
// used only as one of the retry fallbacks.
}
}
/** Writes & syncs to the Directory dir, taking care to
* remove the segments file on exception */
public final void commit(Directory dir) throws IOException {
prepareCommit(dir);
finishCommit(dir);
} }
} }

View File

@ -62,6 +62,30 @@ public class ChecksumIndexOutput extends IndexOutput {
throw new RuntimeException("not allowed"); throw new RuntimeException("not allowed");
} }
/**
* Starts but does not complete the commit of this file (=
* writing of the final checksum at the end). After this
* is called must call {@link #finishCommit} and the
* {@link #close} to complete the commit.
*/
public void prepareCommit() throws IOException {
final long checksum = getChecksum();
// Intentionally write a mismatched checksum. This is
// because we want to 1) test, as best we can, that we
// are able to write a long to the file, but 2) not
// actually "commit" the file yet. This (prepare
// commit) is phase 1 of a two-phase commit.
final long pos = main.getFilePointer();
main.writeLong(checksum-1);
main.flush();
main.seek(pos);
}
/** See {@link #prepareCommit} */
public void finishCommit() throws IOException {
main.writeLong(getChecksum());
}
public long length() throws IOException { public long length() throws IOException {
return main.length(); return main.length();
} }

View File

@ -68,6 +68,7 @@ public class TestAtomicUpdate extends LuceneTestCase {
count++; count++;
} }
} catch (Throwable e) { } catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ": exc");
e.printStackTrace(System.out); e.printStackTrace(System.out);
failed = true; failed = true;
} }
@ -111,11 +112,7 @@ public class TestAtomicUpdate extends LuceneTestCase {
public void doWork() throws Throwable { public void doWork() throws Throwable {
IndexReader r = IndexReader.open(directory); IndexReader r = IndexReader.open(directory);
try {
assertEquals(100, r.numDocs()); assertEquals(100, r.numDocs());
} catch (Throwable t) {
throw t;
}
r.close(); r.close();
} }
} }
@ -141,6 +138,10 @@ public class TestAtomicUpdate extends LuceneTestCase {
} }
writer.commit(); writer.commit();
IndexReader r = IndexReader.open(directory);
assertEquals(100, r.numDocs());
r.close();
IndexerThread indexerThread = new IndexerThread(writer, threads); IndexerThread indexerThread = new IndexerThread(writer, threads);
threads[0] = indexerThread; threads[0] = indexerThread;
indexerThread.start(); indexerThread.start();

View File

@ -3302,7 +3302,7 @@ public class TestIndexWriter extends LuceneTestCase
boolean isCommit = false; boolean isCommit = false;
boolean isDelete = false; boolean isDelete = false;
for (int i = 0; i < trace.length; i++) { for (int i = 0; i < trace.length; i++) {
if ("org.apache.lucene.index.SegmentInfos".equals(trace[i].getClassName()) && "commit".equals(trace[i].getMethodName())) if ("org.apache.lucene.index.SegmentInfos".equals(trace[i].getClassName()) && "prepareCommit".equals(trace[i].getMethodName()))
isCommit = true; isCommit = true;
if ("org.apache.lucene.store.MockRAMDirectory".equals(trace[i].getClassName()) && "deleteFile".equals(trace[i].getMethodName())) if ("org.apache.lucene.store.MockRAMDirectory".equals(trace[i].getClassName()) && "deleteFile".equals(trace[i].getMethodName()))
isDelete = true; isDelete = true;
@ -3603,4 +3603,124 @@ public class TestIndexWriter extends LuceneTestCase
s.close(); s.close();
dir.close(); dir.close();
} }
// LUCENE-1274: test writer.prepareCommit()
public void testPrepareCommit() throws IOException {
Directory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
writer.setMaxBufferedDocs(2);
writer.setMergeFactor(5);
for (int i = 0; i < 23; i++)
addDoc(writer);
IndexReader reader = IndexReader.open(dir);
assertEquals(0, reader.numDocs());
writer.prepareCommit();
IndexReader reader2 = IndexReader.open(dir);
assertEquals(0, reader2.numDocs());
writer.commit();
IndexReader reader3 = reader.reopen();
assertEquals(0, reader.numDocs());
assertEquals(0, reader2.numDocs());
assertEquals(23, reader3.numDocs());
reader.close();
reader2.close();
for (int i = 0; i < 17; i++)
addDoc(writer);
assertEquals(23, reader3.numDocs());
reader3.close();
reader = IndexReader.open(dir);
assertEquals(23, reader.numDocs());
reader.close();
writer.prepareCommit();
reader = IndexReader.open(dir);
assertEquals(23, reader.numDocs());
reader.close();
writer.commit();
reader = IndexReader.open(dir);
assertEquals(40, reader.numDocs());
reader.close();
writer.close();
dir.close();
}
// LUCENE-1274: test writer.prepareCommit()
public void testPrepareCommitRollback() throws IOException {
MockRAMDirectory dir = new MockRAMDirectory();
dir.setPreventDoubleWrite(false);
IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
writer.setMaxBufferedDocs(2);
writer.setMergeFactor(5);
for (int i = 0; i < 23; i++)
addDoc(writer);
IndexReader reader = IndexReader.open(dir);
assertEquals(0, reader.numDocs());
writer.prepareCommit();
IndexReader reader2 = IndexReader.open(dir);
assertEquals(0, reader2.numDocs());
writer.rollback();
IndexReader reader3 = reader.reopen();
assertEquals(0, reader.numDocs());
assertEquals(0, reader2.numDocs());
assertEquals(0, reader3.numDocs());
reader.close();
reader2.close();
writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
for (int i = 0; i < 17; i++)
addDoc(writer);
assertEquals(0, reader3.numDocs());
reader3.close();
reader = IndexReader.open(dir);
assertEquals(0, reader.numDocs());
reader.close();
writer.prepareCommit();
reader = IndexReader.open(dir);
assertEquals(0, reader.numDocs());
reader.close();
writer.commit();
reader = IndexReader.open(dir);
assertEquals(17, reader.numDocs());
reader.close();
writer.close();
dir.close();
}
// LUCENE-1274
public void testPrepareCommitNoChanges() throws IOException {
MockRAMDirectory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
writer.prepareCommit();
writer.commit();
writer.close();
IndexReader reader = IndexReader.open(dir);
assertEquals(0, reader.numDocs());
reader.close();
dir.close();
}
} }

View File

@ -53,6 +53,7 @@ public class TestStressIndexing extends LuceneTestCase {
count++; count++;
} }
} catch (Throwable e) { } catch (Throwable e) {
System.out.println(Thread.currentThread() + ": exc");
e.printStackTrace(System.out); e.printStackTrace(System.out);
failed = true; failed = true;
} }

View File

@ -0,0 +1,217 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.Random;
import org.apache.lucene.store.*;
import org.apache.lucene.util.*;
import org.apache.lucene.analysis.*;
import org.apache.lucene.document.*;
public class TestTransactions extends LuceneTestCase
{
private static final Random RANDOM = new Random();
private static volatile boolean doFail;
private class RandomFailure extends MockRAMDirectory.Failure {
public void eval(MockRAMDirectory dir) throws IOException {
if (TestTransactions.doFail && RANDOM.nextInt() % 10 <= 3)
throw new IOException("now failing randomly but on purpose");
}
}
private static abstract class TimedThread extends Thread {
boolean failed;
private static int RUN_TIME_SEC = 6;
private TimedThread[] allThreads;
abstract public void doWork() throws Throwable;
TimedThread(TimedThread[] threads) {
this.allThreads = threads;
}
public void run() {
final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
try {
while(System.currentTimeMillis() < stopTime && !anyErrors())
doWork();
} catch (Throwable e) {
System.out.println(Thread.currentThread() + ": exc");
e.printStackTrace(System.out);
failed = true;
}
}
private boolean anyErrors() {
for(int i=0;i<allThreads.length;i++)
if (allThreads[i] != null && allThreads[i].failed)
return true;
return false;
}
}
private static class IndexerThread extends TimedThread {
Directory dir1;
Directory dir2;
Object lock;
int nextID;
public IndexerThread(Object lock, Directory dir1, Directory dir2, TimedThread[] threads) {
super(threads);
this.lock = lock;
this.dir1 = dir1;
this.dir2 = dir2;
}
public void doWork() throws Throwable {
IndexWriter writer1 = new IndexWriter(dir1, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
writer1.setMaxBufferedDocs(3);
writer1.setMergeFactor(2);
((ConcurrentMergeScheduler) writer1.getMergeScheduler()).setSuppressExceptions();
IndexWriter writer2 = new IndexWriter(dir2, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
// Intentionally use different params so flush/merge
// happen @ different times
writer2.setMaxBufferedDocs(2);
writer2.setMergeFactor(3);
((ConcurrentMergeScheduler) writer2.getMergeScheduler()).setSuppressExceptions();
update(writer1);
update(writer2);
TestTransactions.doFail = true;
try {
synchronized(lock) {
try {
writer1.prepareCommit();
} catch (Throwable t) {
writer1.rollback();
writer2.rollback();
return;
}
try {
writer2.prepareCommit();
} catch (Throwable t) {
writer1.rollback();
writer2.rollback();
return;
}
writer1.commit();
writer2.commit();
}
} finally {
TestTransactions.doFail = false;
}
writer1.close();
writer2.close();
}
public void update(IndexWriter writer) throws IOException {
// Add 10 docs:
for(int j=0; j<10; j++) {
Document d = new Document();
int n = RANDOM.nextInt();
d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED));
d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
writer.addDocument(d);
}
// Delete 5 docs:
int deleteID = nextID-1;
for(int j=0; j<5; j++) {
writer.deleteDocuments(new Term("id", ""+deleteID));
deleteID -= 2;
}
}
}
private static class SearcherThread extends TimedThread {
Directory dir1;
Directory dir2;
Object lock;
public SearcherThread(Object lock, Directory dir1, Directory dir2, TimedThread[] threads) {
super(threads);
this.lock = lock;
this.dir1 = dir1;
this.dir2 = dir2;
}
public void doWork() throws Throwable {
IndexReader r1, r2;
synchronized(lock) {
r1 = IndexReader.open(dir1);
r2 = IndexReader.open(dir2);
}
if (r1.numDocs() != r2.numDocs())
throw new RuntimeException("doc counts differ: r1=" + r1.numDocs() + " r2=" + r2.numDocs());
r1.close();
r2.close();
}
}
public void initIndex(Directory dir) throws Throwable {
IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
for(int j=0; j<7; j++) {
Document d = new Document();
int n = RANDOM.nextInt();
d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
writer.addDocument(d);
}
writer.close();
}
public void testTransactions() throws Throwable {
MockRAMDirectory dir1 = new MockRAMDirectory();
MockRAMDirectory dir2 = new MockRAMDirectory();
dir1.setPreventDoubleWrite(false);
dir2.setPreventDoubleWrite(false);
dir1.failOn(new RandomFailure());
dir2.failOn(new RandomFailure());
initIndex(dir1);
initIndex(dir2);
TimedThread[] threads = new TimedThread[3];
int numThread = 0;
IndexerThread indexerThread = new IndexerThread(this, dir1, dir2, threads);
threads[numThread++] = indexerThread;
indexerThread.start();
SearcherThread searcherThread1 = new SearcherThread(this, dir1, dir2, threads);
threads[numThread++] = searcherThread1;
searcherThread1.start();
SearcherThread searcherThread2 = new SearcherThread(this, dir1, dir2, threads);
threads[numThread++] = searcherThread2;
searcherThread2.start();
for(int i=0;i<numThread;i++)
threads[i].join();
for(int i=0;i<numThread;i++)
assertTrue(!((TimedThread) threads[i]).failed);
}
}

View File

@ -209,7 +209,7 @@ public class MockRAMDirectory extends RAMDirectory {
throw new IOException("cannot createOutput after crash"); throw new IOException("cannot createOutput after crash");
init(); init();
synchronized(openFiles) { synchronized(openFiles) {
if (preventDoubleWrite && createdFiles.contains(name)) if (preventDoubleWrite && createdFiles.contains(name) && !name.equals("segments.gen"))
throw new IOException("file \"" + name + "\" was already written to"); throw new IOException("file \"" + name + "\" was already written to");
if (noDeleteOpenFile && openFiles.containsKey(name)) if (noDeleteOpenFile && openFiles.containsKey(name))
throw new IOException("MockRAMDirectory: file \"" + name + "\" is still open: cannot overwrite"); throw new IOException("MockRAMDirectory: file \"" + name + "\" is still open: cannot overwrite");