diff --git a/CHANGES.txt b/CHANGES.txt
index 28064156499..0d59408e569 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -97,6 +97,12 @@ Bug fixes
8. LUCENE-1267: Added numDocs() and maxDoc() to IndexWriter;
deprecated docCount(). (Mike McCandless)
+
+ 9. LUCENE-1274: Added new prepareCommit() method to IndexWriter,
+ which does phase 1 of a 2-phase commit (commit() does phase 2).
+ This is needed when you want to update an index as part of a
+ transaction involving external resources (eg a database). Also
+ deprecated abort(), renaming it to rollback(). (Mike McCandless)
New features
diff --git a/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index 9ea7903b120..5ef05a96cfd 100644
--- a/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -138,6 +138,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
public void merge(IndexWriter writer)
throws CorruptIndexException, IOException {
+ // TODO: enable this once we are on JRE 1.5
+ // assert !Thread.holdsLock(writer);
+
this.writer = writer;
initMergeThreadPriority();
diff --git a/src/java/org/apache/lucene/index/DocumentsWriter.java b/src/java/org/apache/lucene/index/DocumentsWriter.java
index 2232f48f9e8..681b3897edb 100644
--- a/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -348,11 +348,12 @@ final class DocumentsWriter {
abortCount++;
}
- /** Called if we hit an exception when adding docs,
- * flushing, etc. This resets our state, discarding any
- * docs added since last flush. If ae is non-null, it
- * contains the root cause exception (which we re-throw
- * after we are done aborting). */
+ /** Called if we hit an exception at a bad time (when
+ * updating the index files) and must discard all
+ * currently buffered docs. This resets our state,
+ * discarding any docs added since last flush. If ae is
+ * non-null, it contains the root cause exception (which
+ * we re-throw after we are done aborting). */
synchronized void abort(AbortException ae) throws IOException {
// Anywhere that throws an AbortException must first
diff --git a/src/java/org/apache/lucene/index/FieldsWriter.java b/src/java/org/apache/lucene/index/FieldsWriter.java
index 5dfb5b6f665..e6dff0fc1b1 100644
--- a/src/java/org/apache/lucene/index/FieldsWriter.java
+++ b/src/java/org/apache/lucene/index/FieldsWriter.java
@@ -64,8 +64,16 @@ final class FieldsWriter
success = true;
} finally {
if (!success) {
- close();
- d.deleteFile(fieldsName);
+ try {
+ close();
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ }
+ try {
+ d.deleteFile(fieldsName);
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ }
}
}
@@ -77,9 +85,20 @@ final class FieldsWriter
success = true;
} finally {
if (!success) {
- close();
- d.deleteFile(fieldsName);
- d.deleteFile(indexName);
+ try {
+ close();
+ } catch (IOException ioe) {
+ }
+ try {
+ d.deleteFile(fieldsName);
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ }
+ try {
+ d.deleteFile(indexName);
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ }
}
}
diff --git a/src/java/org/apache/lucene/index/IndexWriter.java b/src/java/org/apache/lucene/index/IndexWriter.java
index 3d273edab68..c62984a3d97 100644
--- a/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/src/java/org/apache/lucene/index/IndexWriter.java
@@ -306,6 +306,9 @@ public class IndexWriter {
private SegmentInfos rollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails
private HashMap rollbackSegments;
+ volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
+ volatile long pendingCommitChangeCount;
+
private SegmentInfos localRollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails
private boolean localAutoCommit; // saved autoCommit during local transaction
private int localFlushedDocCount; // saved docWriter.getFlushedDocCount during local transaction
@@ -364,12 +367,13 @@ public class IndexWriter {
infoStream.println("IW " + messageID + " [" + Thread.currentThread().getName() + "]: " + message);
}
- private synchronized void setMessageID() {
+ private synchronized void setMessageID(PrintStream infoStream) {
if (infoStream != null && messageID == -1) {
synchronized(MESSAGE_ID_LOCK) {
messageID = MESSAGE_ID++;
}
}
+ this.infoStream = infoStream;
}
/**
@@ -1082,9 +1086,8 @@ public class IndexWriter {
this.closeDir = closeDir;
directory = d;
analyzer = a;
- this.infoStream = defaultInfoStream;
+ setMessageID(defaultInfoStream);
this.maxFieldLength = maxFieldLength;
- setMessageID();
if (create) {
// Clear the write lock in case it's leftover:
@@ -1496,8 +1499,7 @@ public class IndexWriter {
*/
public void setInfoStream(PrintStream infoStream) {
ensureOpen();
- this.infoStream = infoStream;
- setMessageID();
+ setMessageID(infoStream);
docWriter.setInfoStream(infoStream);
deleter.setInfoStream(infoStream);
if (infoStream != null)
@@ -1672,7 +1674,7 @@ public class IndexWriter {
if (infoStream != null)
message("now call final commit()");
- commit(true, 0);
+ commit(0);
if (infoStream != null)
message("at close: " + segString());
@@ -2571,7 +2573,7 @@ public class IndexWriter {
if (autoCommit) {
boolean success = false;
try {
- commit(true, 0);
+ commit(0);
success = true;
} finally {
if (!success) {
@@ -2587,6 +2589,13 @@ public class IndexWriter {
localRollbackSegmentInfos = null;
}
+ /**
+ * @deprecated Please use {@link #rollback} instead.
+ */
+ public void abort() throws IOException {
+ rollback();
+ }
+
/**
* Close the IndexWriter
without committing
* any of the changes that have occurred since it was
@@ -2594,18 +2603,27 @@ public class IndexWriter {
* created, after which the state of the index will be the
* same as it was when this writer was first opened. This
* can only be called when this IndexWriter was opened
- * with autoCommit=false
.
+ * with autoCommit=false
. This also clears a
+ * previous call to {@link #prepareCommit}.
* @throws IllegalStateException if this is called when
* the writer was opened with autoCommit=true
.
* @throws IOException if there is a low-level IO error
*/
- public void abort() throws IOException {
+ public void rollback() throws IOException {
ensureOpen();
if (autoCommit)
throw new IllegalStateException("abort() can only be called when IndexWriter was opened with autoCommit=false");
boolean doClose;
synchronized(this) {
+
+ if (pendingCommit != null) {
+ pendingCommit.rollbackCommit(directory);
+ deleter.decRef(pendingCommit);
+ pendingCommit = null;
+ notifyAll();
+ }
+
// Ensure that only one thread actually gets to do the closing:
if (!closing) {
doClose = true;
@@ -3113,10 +3131,54 @@ public class IndexWriter {
flush(true, false, true);
}
+ /**
Expert: prepare for commit. This does the first + * phase of 2-phase commit. You can only call this when + * autoCommit is false. This method does all steps + * necessary to commit changes since this writer was + * opened: flushes pending added and deleted docs, syncs + * the index files, writes most of next segments_N file. + * After calling this you must call either {@link + * #commit()} to finish the commit, or {@link + * #rollback()} to revert the commit and undo all changes + * done since the writer was opened.
+ * + * You can also just call {@link #commit()} directly + * without prepareCommit first in which case that method + * will internally call prepareCommit. + */ + public final void prepareCommit() throws CorruptIndexException, IOException { + prepareCommit(false); + } + + private final void prepareCommit(boolean internal) throws CorruptIndexException, IOException { + + if (hitOOM) + throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit"); + + if (autoCommit && !internal) + throw new IllegalStateException("this method can only be used when autoCommit is false"); + + if (!autoCommit && pendingCommit != null) + throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit"); + + message("prepareCommit: flush"); + + flush(true, true, true); + + startCommit(0); + } + + private void commit(long sizeInBytes) throws IOException { + startCommit(sizeInBytes); + finishCommit(); + } + /** - *Commits all pending updates (added & deleted documents) - * to the index, and syncs all referenced index files, - * such that a reader will see the changes. Note that + *
Commits all pending updates (added & deleted + * documents) to the index, and syncs all referenced index + * files, such that a reader will see the changes and the + * index updates will survive an OS or machine crash or + * power loss (though, see the note below). Note that * this does not wait for any running background merges to * finish. This may be a costly operation, so you should * test the cost in your application and do it only when @@ -3135,12 +3197,38 @@ public class IndexWriter { * consistency on such devices.
*/ public final void commit() throws CorruptIndexException, IOException { - commit(true); + + message("commit: start"); + + if (autoCommit || pendingCommit == null) { + message("commit: now prepare"); + prepareCommit(true); + } else + message("commit: already prepared"); + + finishCommit(); } - private final void commit(boolean triggerMerges) throws CorruptIndexException, IOException { - flush(triggerMerges, true, true); - commit(true, 0); + private synchronized final void finishCommit() throws CorruptIndexException, IOException { + + if (pendingCommit != null) { + try { + message("commit: pendingCommit != null"); + pendingCommit.finishCommit(directory); + lastCommitChangeCount = pendingCommitChangeCount; + segmentInfos.updateGeneration(pendingCommit); + setRollbackSegmentInfos(); + deleter.checkpoint(pendingCommit, true); + } finally { + deleter.decRef(pendingCommit); + pendingCommit = null; + notifyAll(); + } + + } else + message("commit: pendingCommit == null; skip"); + + message("commit: done"); } /** @@ -3176,8 +3264,7 @@ public class IndexWriter { // when flushing a segment; otherwise deletes may become // visible before their corresponding added document // from an updateDocument call - if (autoCommit) - flushDeletes = true; + flushDeletes |= autoCommit; // Returns true if docWriter is currently aborting, in // which case we skip flushing this segment @@ -3935,7 +4022,7 @@ public class IndexWriter { synchronized(this) { size = merge.info.sizeInBytes(); } - commit(false, size); + commit(size); } success = false; @@ -3988,7 +4075,7 @@ public class IndexWriter { synchronized(this) { size = merge.info.sizeInBytes(); } - commit(false, size); + commit(size); } return mergedDocCount; @@ -4151,13 +4238,13 @@ public class IndexWriter { } /** Walk through all files referenced by the current - * segmentInfos, minus flushes, and ask the Directory to - * sync each file, if it wasn't already. If that - * succeeds, then we write a new segments_N file & sync - * that. */ - private void commit(boolean skipWait, long sizeInBytes) throws IOException { + * segmentInfos and ask the Directory to sync each file, + * if it wasn't already. If that succeeds, then we + * prepare a new segments_N file but do not fully commit + * it. */ + private void startCommit(long sizeInBytes) throws IOException { - assert testPoint("startCommit"); + assert testPoint("startStartCommit"); if (hitOOM) return; @@ -4165,9 +4252,9 @@ public class IndexWriter { try { if (infoStream != null) - message("start commit() skipWait=" + skipWait + " sizeInBytes=" + sizeInBytes); + message("startCommit(): start sizeInBytes=" + sizeInBytes); - if (!skipWait) + if (sizeInBytes > 0) syncPause(sizeInBytes); SegmentInfos toSync = null; @@ -4179,7 +4266,7 @@ public class IndexWriter { if (changeCount == lastCommitChangeCount) { if (infoStream != null) - message(" skip commit(): no changes pending"); + message(" skip startCommit(): no changes pending"); return; } @@ -4189,15 +4276,17 @@ public class IndexWriter { // threads can be doing this at once, if say a large // merge and a small merge finish at the same time: + if (infoStream != null) + message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount); + toSync = (SegmentInfos) segmentInfos.clone(); deleter.incRef(toSync, false); myChangeCount = changeCount; } - if (infoStream != null) - message("commit index=" + segString(toSync)); + assert testPoint("midStartCommit"); - assert testPoint("midCommit"); + boolean setPending = false; try { @@ -4237,54 +4326,72 @@ public class IndexWriter { break; } - assert testPoint("midCommit2"); - + assert testPoint("midStartCommit2"); + synchronized(this) { // If someone saved a newer version of segments file // since I first started syncing my version, I can // safely skip saving myself since I've been // superseded: - if (myChangeCount > lastCommitChangeCount) { - + if (myChangeCount > lastCommitChangeCount && (pendingCommit == null || myChangeCount > pendingCommitChangeCount)) { + + // Wait now for any current pending commit to complete: + while(pendingCommit != null) { + message("wait for existing pendingCommit to finish..."); + try { + wait(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + if (segmentInfos.getGeneration() > toSync.getGeneration()) toSync.updateGeneration(segmentInfos); boolean success = false; try { - toSync.commit(directory); + + // Exception here means nothing is prepared + // (this method unwinds everything it did on + // an exception) + try { + toSync.prepareCommit(directory); + } finally { + // Have our master segmentInfos record the + // generations we just prepared. We do this + // on error or success so we don't + // double-write a segments_N file. + segmentInfos.updateGeneration(toSync); + } + + assert pendingCommit == null; + setPending = true; + pendingCommit = toSync; + pendingCommitChangeCount = myChangeCount; success = true; } finally { - // Have our master segmentInfos record the - // generations we just sync'd - segmentInfos.updateGeneration(toSync); if (!success) message("hit exception committing segments file"); } - - message("commit complete"); - - lastCommitChangeCount = myChangeCount; - - deleter.checkpoint(toSync, true); - setRollbackSegmentInfos(); } else message("sync superseded by newer infos"); } message("done all syncs"); - assert testPoint("midCommitSuccess"); + assert testPoint("midStartCommitSuccess"); } finally { synchronized(this) { - deleter.decRef(toSync); + if (!setPending) + deleter.decRef(toSync); } } } catch (OutOfMemoryError oom) { hitOOM = true; throw oom; } - assert testPoint("finishCommit"); + assert testPoint("finishStartCommit"); } /** @@ -4377,11 +4484,11 @@ public class IndexWriter { // Used only by assert for testing. Current points: // startDoFlush // startCommitMerge - // startCommit - // midCommit - // midCommit2 - // midCommitSuccess - // finishCommit + // startStartCommit + // midStartCommit + // midStartCommit2 + // midStartCommitSuccess + // finishStartCommit // startCommitMergeDeletes // startMergeInit // startApplyDeletes diff --git a/src/java/org/apache/lucene/index/SegmentInfos.java b/src/java/org/apache/lucene/index/SegmentInfos.java index 334a3848bbc..90e79aa0ef5 100644 --- a/src/java/org/apache/lucene/index/SegmentInfos.java +++ b/src/java/org/apache/lucene/index/SegmentInfos.java @@ -274,6 +274,10 @@ final class SegmentInfos extends Vector { }.run(); } + // Only non-null after prepareCommit has been called and + // before finishCommit is called + ChecksumIndexOutput pendingOutput; + private final void write(Directory directory) throws IOException { String segmentFileName = getNextSegmentFileName(); @@ -298,53 +302,27 @@ final class SegmentInfos extends Vector { for (int i = 0; i < size(); i++) { info(i).write(output); } - final long checksum = output.getChecksum(); - output.writeLong(checksum); + output.prepareCommit(); success = true; + pendingOutput = output; } finally { - boolean success2 = false; - try { - if (!success) { - // We hit an exception above; try to close the file - // but suppress any exception: - try { - output.close(); - success2 = true; - } catch (Throwable t) { - // Suppress so we keep throwing the original exception - } - } else { + if (!success) { + // We hit an exception above; try to close the file + // but suppress any exception: + try { output.close(); - success2 = true; + } catch (Throwable t) { + // Suppress so we keep throwing the original exception } - } finally { - if (!success || !success2) { - try { - // Try not to leave a truncated segments_N file in - // the index: - directory.deleteFile(segmentFileName); - } catch (Throwable t) { - // Suppress so we keep throwing the original exception - } + try { + // Try not to leave a truncated segments_N file in + // the index: + directory.deleteFile(segmentFileName); + } catch (Throwable t) { + // Suppress so we keep throwing the original exception } } } - - try { - IndexOutput genOutput = directory.createOutput(IndexFileNames.SEGMENTS_GEN); - try { - genOutput.writeInt(FORMAT_LOCKLESS); - genOutput.writeLong(generation); - genOutput.writeLong(generation); - } finally { - genOutput.close(); - } - } catch (IOException e) { - // It's OK if we fail to write this file since it's - // used only as one of the retry fallbacks. - } - - lastGeneration = generation; } /** @@ -355,7 +333,7 @@ final class SegmentInfos extends Vector { public Object clone() { SegmentInfos sis = (SegmentInfos) super.clone(); for(int i=0;i