LUCENE-4147: fix thread safety issues when IndexWriter's rollback & commit are called simultaneously

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1352600 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2012-06-21 16:28:57 +00:00
parent a112e601f1
commit 580828461c
6 changed files with 262 additions and 144 deletions

View File

@ -1062,6 +1062,9 @@ Bug fixes
* LUCENE-4114: Fix int overflow bugs in BYTES_FIXED_STRAIGHT and
BYTES_FIXED_DEREF doc values implementations (Walt Elder via Mike McCandless).
* LUCENE-4147: Fixed thread safety issues when rollback() and commit()
are called simultaneously. (Simon Willnauer, Mike McCandless)
Documentation
* LUCENE-3958: Javadocs corrections for IndexWriter.

View File

@ -202,7 +202,6 @@ final class DocumentsWriter {
* discarding any docs added since last flush. */
synchronized void abort() throws IOException {
boolean success = false;
synchronized (this) {
deleteQueue.clear();
}
@ -233,6 +232,7 @@ final class DocumentsWriter {
perThread.unlock();
}
}
flushControl.waitForFlush();
success = true;
} finally {
if (infoStream.isEnabled("DW")) {

View File

@ -343,8 +343,10 @@ final class DocumentsWriterFlushControl {
synchronized void setClosed() {
// set by DW to signal that we should not release new DWPT after close
this.closed = true;
perThreadPool.deactivateUnreleasedStates();
if (!closed) {
this.closed = true;
perThreadPool.deactivateUnreleasedStates();
}
}
/**

View File

@ -463,10 +463,6 @@ class DocumentsWriterPerThread {
pendingDeletes.docIDs.clear();
}
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
}
if (aborting) {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush: skip because aborting is set");
@ -474,6 +470,10 @@ class DocumentsWriterPerThread {
return null;
}
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
}
boolean success = false;
try {

View File

@ -841,15 +841,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
*/
public void close(boolean waitForMerges) throws CorruptIndexException, IOException {
// Ensure that only one thread actually gets to do the closing:
if (shouldClose()) {
// If any methods have hit OutOfMemoryError, then abort
// on close, in case the internal state of IndexWriter
// or DocumentsWriter is corrupt
if (hitOOM)
rollbackInternal();
else
closeInternal(waitForMerges);
// Ensure that only one thread actually gets to do the
// closing, and make sure no commit is also in progress:
synchronized(commitLock) {
if (shouldClose()) {
// If any methods have hit OutOfMemoryError, then abort
// on close, in case the internal state of IndexWriter
// or DocumentsWriter is corrupt
if (hitOOM) {
rollbackInternal();
} else {
closeInternal(waitForMerges, !hitOOM);
}
}
}
}
@ -868,12 +872,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// successfully) or another (fails to close)
doWait();
}
} else
} else {
return false;
}
}
}
private void closeInternal(boolean waitForMerges) throws CorruptIndexException, IOException {
private void closeInternal(boolean waitForMerges, boolean doFlush) throws CorruptIndexException, IOException {
try {
@ -889,8 +894,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// Only allow a new merge to be triggered if we are
// going to wait for merges:
if (!hitOOM) {
if (doFlush) {
flush(waitForMerges, true);
} else {
docWriter.abort(); // already closed
}
if (waitForMerges)
@ -910,7 +917,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
infoStream.message("IW", "now call final commit()");
}
if (!hitOOM) {
if (doFlush) {
commitInternal(null);
}
@ -1774,9 +1781,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public void rollback() throws IOException {
ensureOpen();
// Ensure that only one thread actually gets to do the closing:
if (shouldClose())
rollbackInternal();
// Ensure that only one thread actually gets to do the
// closing, and make sure no commit is also in progress:
synchronized(commitLock) {
if (shouldClose()) {
rollbackInternal();
}
}
}
private void rollbackInternal() throws IOException {
@ -1786,6 +1797,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "rollback");
}
try {
synchronized(this) {
@ -1804,7 +1816,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
mergeScheduler.close();
bufferedDeletesStream.clear();
docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
docWriter.abort();
synchronized(this) {
if (pendingCommit != null) {
@ -1826,8 +1839,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
if (infoStream.isEnabled("IW") ) {
infoStream.message("IW", "rollback: infos=" + segString(segmentInfos));
}
docWriter.abort();
assert testPoint("rollback before checkpoint");
@ -1854,7 +1866,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
closeInternal(false);
closeInternal(false, false);
}
/**
@ -2482,99 +2494,102 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public final void prepareCommit(Map<String,String> commitUserData) throws CorruptIndexException, IOException {
ensureOpen(false);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "prepareCommit: flush");
infoStream.message("IW", " index before flush " + segString());
}
synchronized(commitLock) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "prepareCommit: flush");
infoStream.message("IW", " index before flush " + segString());
}
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
}
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
}
if (pendingCommit != null) {
throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
}
if (pendingCommit != null) {
throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
}
doBeforeFlush();
assert testPoint("startDoFlush");
SegmentInfos toCommit = null;
boolean anySegmentsFlushed = false;
doBeforeFlush();
assert testPoint("startDoFlush");
SegmentInfos toCommit = null;
boolean anySegmentsFlushed = false;
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
try {
try {
synchronized (fullFlushLock) {
boolean flushSuccess = false;
boolean success = false;
try {
anySegmentsFlushed = docWriter.flushAllThreads();
if (!anySegmentsFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
flushCount.incrementAndGet();
}
flushSuccess = true;
synchronized(this) {
maybeApplyDeletes(true);
readerPool.commit(segmentInfos);
// Must clone the segmentInfos while we still
// hold fullFlushLock and while sync'd so that
// no partial changes (eg a delete w/o
// corresponding add from an updateDocument) can
// sneak into the commit point:
toCommit = segmentInfos.clone();
pendingCommitChangeCount = changeCount;
// This protects the segmentInfos we are now going
// to commit. This is important in case, eg, while
// we are trying to sync all referenced files, a
// merge completes which would otherwise have
// removed the files we are now syncing.
filesToCommit = toCommit.files(directory, false);
deleter.incRef(filesToCommit);
}
success = true;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during prepareCommit");
synchronized (fullFlushLock) {
boolean flushSuccess = false;
boolean success = false;
try {
anySegmentsFlushed = docWriter.flushAllThreads();
if (!anySegmentsFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
flushCount.incrementAndGet();
}
}
// Done: finish the full flush!
docWriter.finishFullFlush(flushSuccess);
doAfterFlush();
}
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "prepareCommit");
}
boolean success = false;
try {
if (anySegmentsFlushed) {
maybeMerge();
}
success = true;
} finally {
if (!success) {
synchronized (this) {
deleter.decRef(filesToCommit);
filesToCommit = null;
}
}
}
flushSuccess = true;
startCommit(toCommit, commitUserData);
synchronized(this) {
maybeApplyDeletes(true);
readerPool.commit(segmentInfos);
// Must clone the segmentInfos while we still
// hold fullFlushLock and while sync'd so that
// no partial changes (eg a delete w/o
// corresponding add from an updateDocument) can
// sneak into the commit point:
toCommit = segmentInfos.clone();
pendingCommitChangeCount = changeCount;
// This protects the segmentInfos we are now going
// to commit. This is important in case, eg, while
// we are trying to sync all referenced files, a
// merge completes which would otherwise have
// removed the files we are now syncing.
filesToCommit = toCommit.files(directory, false);
deleter.incRef(filesToCommit);
}
success = true;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during prepareCommit");
}
}
// Done: finish the full flush!
docWriter.finishFullFlush(flushSuccess);
doAfterFlush();
}
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "prepareCommit");
}
boolean success = false;
try {
if (anySegmentsFlushed) {
maybeMerge();
}
success = true;
} finally {
if (!success) {
synchronized (this) {
deleter.decRef(filesToCommit);
filesToCommit = null;
}
}
}
startCommit(toCommit, commitUserData);
}
}
// Used only by commit, below; lock order is commitLock -> IW
// Used only by commit and prepareCommit, below; lock
// order is commitLock -> IW
private final Object commitLock = new Object();
/**
@ -2634,6 +2649,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
synchronized(commitLock) {
ensureOpen(false);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: enter lock");
}

View File

@ -19,6 +19,10 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
@ -31,6 +35,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util._TestUtil;
@ -456,41 +461,132 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
dir.close();
}
static class DelayedIndexAndCloseRunnable extends Thread {
private final Directory dir;
boolean failed = false;
Throwable failure = null;
private final CountDownLatch startIndexing = new CountDownLatch(1);
private CountDownLatch iwConstructed;
static class DelayedIndexAndCloseRunnable extends Thread {
private final Directory dir;
boolean failed = false;
Throwable failure = null;
private final CountDownLatch startIndexing = new CountDownLatch(1);
private CountDownLatch iwConstructed;
public DelayedIndexAndCloseRunnable(Directory dir,
CountDownLatch iwConstructed) {
this.dir = dir;
this.iwConstructed = iwConstructed;
}
public DelayedIndexAndCloseRunnable(Directory dir,
CountDownLatch iwConstructed) {
this.dir = dir;
this.iwConstructed = iwConstructed;
}
public void startIndexing() {
this.startIndexing.countDown();
}
public void startIndexing() {
this.startIndexing.countDown();
}
@Override
public void run() {
try {
Document doc = new Document();
Field field = newTextField("field", "testData", Field.Store.YES);
doc.add(field);
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer(random())));
iwConstructed.countDown();
startIndexing.await();
writer.addDocument(doc);
writer.close();
} catch (Throwable e) {
failed = true;
failure = e;
failure.printStackTrace(System.out);
return;
}
}
}
@Override
public void run() {
try {
Document doc = new Document();
Field field = newTextField("field", "testData", Field.Store.YES);
doc.add(field);
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer(random())));
iwConstructed.countDown();
startIndexing.await();
writer.addDocument(doc);
writer.close();
} catch (Throwable e) {
failed = true;
failure = e;
failure.printStackTrace(System.out);
return;
}
}
}
// LUCENE-4147
public void testRollbackAndCommitWithThreads() throws Exception {
final MockDirectoryWrapper d = newFSDirectory(_TestUtil.getTempDir("RollbackAndCommitWithThreads"));
d.setPreventDoubleWrite(false);
final int threadCount = _TestUtil.nextInt(random(), 2, 6);
final AtomicReference<IndexWriter> writerRef = new AtomicReference<IndexWriter>();
writerRef.set(new IndexWriter(d, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()))));
final LineFileDocs docs = new LineFileDocs(random());
final Thread[] threads = new Thread[threadCount];
final int iters = atLeast(1000);
final AtomicBoolean failed = new AtomicBoolean();
final Lock rollbackLock = new ReentrantLock();
final Lock commitLock = new ReentrantLock();
for(int threadID=0;threadID<threadCount;threadID++) {
threads[threadID] = new Thread() {
@Override
public void run() {
for(int iter=0;iter<iters && !failed.get();iter++) {
//final int x = random().nextInt(5);
final int x = random().nextInt(3);
try {
switch(x) {
case 0:
rollbackLock.lock();
if (VERBOSE) {
System.out.println("\nTEST: " + Thread.currentThread().getName() + ": now rollback");
}
try {
writerRef.get().rollback();
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": rollback done; now open new writer");
}
writerRef.set(new IndexWriter(d, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()))));
} finally {
rollbackLock.unlock();
}
break;
case 1:
commitLock.lock();
if (VERBOSE) {
System.out.println("\nTEST: " + Thread.currentThread().getName() + ": now commit");
}
try {
if (random().nextBoolean()) {
writerRef.get().prepareCommit();
}
writerRef.get().commit();
} catch (AlreadyClosedException ace) {
// ok
} catch (NullPointerException npe) {
// ok
} finally {
commitLock.unlock();
}
break;
case 2:
if (VERBOSE) {
System.out.println("\nTEST: " + Thread.currentThread().getName() + ": now add");
}
try {
writerRef.get().addDocument(docs.nextDoc());
} catch (AlreadyClosedException ace) {
// ok
} catch (NullPointerException npe) {
// ok
} catch (AssertionError ae) {
// ok
}
break;
}
} catch (Throwable t) {
failed.set(true);
throw new RuntimeException(t);
}
}
}
};
threads[threadID].start();
}
for(int threadID=0;threadID<threadCount;threadID++) {
threads[threadID].join();
}
assertTrue(!failed.get());
writerRef.get().close();
d.close();
}
}