LUCENE-6214: don't deadlock when tragedy strikes during getReader and another thread is committing

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1656366 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2015-02-01 21:46:13 +00:00
parent e4ac9ae227
commit 80e5fbc7a8
4 changed files with 135 additions and 33 deletions

View File

@ -493,6 +493,11 @@ Bug Fixes
BlendedInfixSuggester for lookups that do not end in a prefix
token. (jane chang via Mike McCandless)
* LUCENE-6214: Fixed IndexWriter deadlock when one thread is
committing while another opens a near-real-time reader and an
unrecoverable (tragic) exception is hit. (Simon Willnauer, Mike
McCandless)
Documentation
* LUCENE-5392: Add/improve analysis package documentation to reflect

View File

@ -629,7 +629,8 @@ final class DocumentsWriter implements Closeable, Accountable {
return anythingFlushed;
}
final void finishFullFlush(boolean success) {
final void finishFullFlush(IndexWriter indexWriter, boolean success) {
assert indexWriter.holdsFullFlushLock();
try {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + " finishFullFlush success=" + success);

View File

@ -412,8 +412,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/
boolean success2 = false;
try {
boolean success = false;
synchronized (fullFlushLock) {
boolean success = false;
try {
anyChanges = docWriter.flushAllThreads();
if (!anyChanges) {
@ -421,7 +421,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// if we flushed anything.
flushCount.incrementAndGet();
}
success = true;
// Prevent segmentInfos from changing while opening the
// reader; in theory we could instead do similar retry logic,
// just like we do when loading segments_N
@ -432,22 +431,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
}
}
} catch (AbortingException | OutOfMemoryError tragedy) {
tragicEvent(tragedy, "getReader");
// never reached but javac disagrees:
return null;
success = true;
} finally {
if (!success) {
// Done: finish the full flush!
docWriter.finishFullFlush(this, success);
if (success) {
processEvents(false, true);
doAfterFlush();
} else {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during NRT reader");
}
}
if (tragedy == null) {
// Done: finish the full flush! (unless we hit OOM or something)
docWriter.finishFullFlush(success);
processEvents(false, true);
doAfterFlush();
}
}
}
if (anyChanges) {
@ -457,6 +452,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "getReader took " + (System.currentTimeMillis() - tStart) + " msec");
}
success2 = true;
} catch (AbortingException | OutOfMemoryError tragedy) {
tragicEvent(tragedy, "getReader");
// never reached but javac disagrees:
return null;
} finally {
if (!success2) {
IOUtils.closeWhileHandlingException(r);
@ -639,6 +638,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/
public synchronized ReadersAndUpdates get(SegmentCommitInfo info, boolean create) {
// Make sure no new readers can be opened if another thread just closed us:
ensureOpen(false);
assert info.info.dir == directory: "info.dir=" + info.info.dir + " vs " + directory;
ReadersAndUpdates rld = readerMap.get(info);
@ -2042,17 +2044,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
/* hold the full flush lock to prevent concurrency commits / NRT reopens to
* get in our way and do unnecessary work. -- if we don't lock this here we might
* get in trouble if */
synchronized (fullFlushLock) {
/*
* We first abort and trash everything we have in-memory
* and keep the thread-states locked, the lockAndAbortAll operation
* also guarantees "point in time semantics" ie. the checkpoint that we need in terms
* of logical happens-before relationship in the DW. So we do
* abort all in memory structures
* We also drop global field numbering before during abort to make
* sure it's just like a fresh index.
*/
try {
/*
* We first abort and trash everything we have in-memory
* and keep the thread-states locked, the lockAndAbortAll operation
* also guarantees "point in time semantics" ie. the checkpoint that we need in terms
* of logical happens-before relationship in the DW. So we do
* abort all in memory structures
* We also drop global field numbering before during abort to make
* sure it's just like a fresh index.
*/
try {
synchronized (fullFlushLock) {
docWriter.lockAndAbortAll(this);
processEvents(false, true);
synchronized (this) {
@ -2077,6 +2079,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
globalFieldNumberMap.clear();
success = true;
} finally {
docWriter.unlockAllAfterAbortAll(this);
if (!success) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during deleteAll");
@ -2084,11 +2087,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
}
} catch (OutOfMemoryError oom) {
tragicEvent(oom, "deleteAll");
} finally {
docWriter.unlockAllAfterAbortAll(this);
}
} catch (OutOfMemoryError oom) {
tragicEvent(oom, "deleteAll");
}
}
@ -2699,7 +2700,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
// Done: finish the full flush!
docWriter.finishFullFlush(flushSuccess);
docWriter.finishFullFlush(this, flushSuccess);
doAfterFlush();
}
}
@ -2944,16 +2945,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean anyChanges = false;
synchronized (fullFlushLock) {
boolean flushSuccess = false;
boolean flushSuccess = false;
try {
anyChanges = docWriter.flushAllThreads();
if (!anyChanges) {
// flushCount is incremented in flushAllThreads
flushCount.incrementAndGet();
}
}
flushSuccess = true;
} finally {
docWriter.finishFullFlush(flushSuccess);
docWriter.finishFullFlush(this, flushSuccess);
processEvents(false, true);
}
}

View File

@ -0,0 +1,95 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase;
public class TestTragicIndexWriterDeadlock extends LuceneTestCase {
public void testDeadlockExcNRTReaderCommit() throws Exception {
MockDirectoryWrapper dir = newMockDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
iwc.setMergeScheduler(new SuppressingConcurrentMergeScheduler() {
@Override
protected boolean isOK(Throwable th) {
return true;
}
});
}
final IndexWriter w = new IndexWriter(dir, iwc);
final CountDownLatch startingGun = new CountDownLatch(1);
final AtomicBoolean done = new AtomicBoolean();
Thread commitThread = new Thread() {
@Override
public void run() {
try {
startingGun.await();
while (done.get() == false) {
w.addDocument(new Document());
w.commit();
}
} catch (Throwable t) {
done.set(true);
//System.out.println("commit exc:");
//t.printStackTrace(System.out);
}
}
};
commitThread.start();
final DirectoryReader r0 = DirectoryReader.open(w, true);
Thread nrtThread = new Thread() {
@Override
public void run() {
DirectoryReader r = r0;
try {
try {
startingGun.await();
while (done.get() == false) {
DirectoryReader oldReader = r;
DirectoryReader r2 = DirectoryReader.openIfChanged(oldReader);
if (r2 != null) {
r = r2;
oldReader.decRef();
}
}
} finally {
r.close();
}
} catch (Throwable t) {
done.set(true);
//System.out.println("nrt exc:");
//t.printStackTrace(System.out);
}
}
};
nrtThread.start();
dir.setRandomIOExceptionRate(.1);
startingGun.countDown();
commitThread.join();
nrtThread.join();
dir.setRandomIOExceptionRate(0.0);
w.close();
dir.close();
}
}