LUCENE-8212: Never swallow Exceptions in IndexWriter and DocumentsWriter

IndexWriter as well as DocumentsWriter caught Throwable and ignored
it. This is mainly a relict from pre Java 7 were exceptions didn't have
the needed API to suppress exceptions. This change handles exceptions
correctly where the original exception is rethrown and all other exceptions
are added as suppressed.
This commit is contained in:
Simon Willnauer 2018-03-15 15:00:35 +01:00
parent 6eac67fe56
commit 916ed60eea
3 changed files with 127 additions and 115 deletions

View File

@ -19,10 +19,13 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
@ -261,16 +264,30 @@ final class DocumentsWriter implements Closeable, Accountable {
return false; // we didn't flush anything here
}
/** Returns how many documents were aborted. */
synchronized long lockAndAbortAll(IndexWriter indexWriter) throws IOException {
/** Locks all currently active DWPT and aborts them.
* The returned Closeable should be closed once the locks for the aborted
* DWPTs can be released. */
synchronized Closeable lockAndAbortAll(IndexWriter indexWriter) throws IOException {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "lockAndAbortAll");
}
// Make sure we move all pending tickets into the flush queue:
ticketQueue.forcePurge(indexWriter);
long abortedDocCount = 0;
boolean success = false;
List<ThreadState> threadStates = new ArrayList<>();
AtomicBoolean released = new AtomicBoolean(false);
final Closeable release = () -> {
assert indexWriter.holdsFullFlushLock();
if (released.compareAndSet(false, true)) { // only once
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAllAbortedThread");
}
perThreadPool.clearAbort();
for (ThreadState state : threadStates) {
state.unlock();
}
}
};
try {
deleteQueue.clear();
final int limit = perThreadPool.getMaxThreadStates();
@ -278,25 +295,31 @@ final class DocumentsWriter implements Closeable, Accountable {
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
abortedDocCount += abortThreadState(perThread);
threadStates.add(perThread);
abortThreadState(perThread);
}
deleteQueue.clear();
// jump over any possible in flight ops:
deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount()+1);
deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount() + 1);
flushControl.abortPendingFlushes();
flushControl.waitForFlush();
success = true;
return abortedDocCount;
} finally {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "finished lockAndAbortAll success=" + success);
infoStream.message("DW", "finished lockAndAbortAll success=true");
}
if (success == false) {
return release;
} catch (Throwable t) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "finished lockAndAbortAll success=false");
}
try {
// if something happens here we unlock all states again
unlockAllAfterAbortAll(indexWriter);
release.close();
} catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
}
}
@ -319,28 +342,6 @@ final class DocumentsWriter implements Closeable, Accountable {
}
}
synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAll");
}
final int limit = perThreadPool.getMaxThreadStates();
perThreadPool.clearAbort();
for (int i = 0; i < limit; i++) {
try {
final ThreadState perThread = perThreadPool.getThreadState(i);
if (perThread.isHeldByCurrentThread()) {
perThread.unlock();
}
} catch (Throwable e) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAll: could not unlock state: " + i + " msg:" + e.getMessage());
}
// ignore & keep on unlocking
}
}
}
/** returns the maximum sequence number for all previously completed operations */
public long getMaxCompletedSequenceNumber() {
long value = lastSeqNo;

View File

@ -746,10 +746,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
checkpointNoSIS();
}
} catch (Throwable t) {
priorE = IOUtils.useOrSuppress(priorE, t);
if (doSave) {
throw IOUtils.rethrowAlways(t);
} else if (priorE == null) {
priorE = t;
throw t;
}
}
@ -766,10 +765,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
rld.dropReaders();
} catch (Throwable t) {
priorE = IOUtils.useOrSuppress(priorE, t);
if (doSave) {
throw IOUtils.rethrowAlways(t);
} else if (priorE == null) {
priorE = t;
throw t;
}
}
}
@ -2447,8 +2445,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
private void rollbackInternalNoCommit() throws IOException {
boolean success = false;
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "rollback");
}
@ -2467,7 +2463,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
docWriter.abort(this); // don't sync on IW here
docWriter.flushControl.waitForFlush(); // wait for all concurrently running flushes
purge(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources
synchronized(this) {
synchronized (this) {
if (pendingCommit != null) {
pendingCommit.rollbackCommit(directory);
@ -2488,8 +2484,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
int rollbackMaxDoc = segmentInfos.totalMaxDoc();
// now we need to adjust this back to the rolled back SI but don't set it to the absolute value
// otherwise we might hide internal bugsf
adjustPendingNumDocs(-(totalMaxDoc-rollbackMaxDoc));
if (infoStream.isEnabled("IW") ) {
adjustPendingNumDocs(-(totalMaxDoc - rollbackMaxDoc));
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "rollback: infos=" + segString(segmentInfos));
}
@ -2512,29 +2508,26 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
IOUtils.close(writeLock); // release write lock
writeLock = null;
closed = true;
closing = false;
// So any "concurrently closing" threads wake up and see that the close has now completed:
notifyAll();
}
success = true;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "rollbackInternal");
throw tragedy;
} finally {
if (success == false) {
} catch (Throwable throwable) {
try {
// Must not hold IW's lock while closing
// mergeScheduler: this can lead to deadlock,
// e.g. TestIW.testThreadInterruptDeadlock
IOUtils.closeWhileHandlingException(mergeScheduler);
}
synchronized(this) {
if (success == false) {
synchronized (this) {
// we tried to be nice about it: do the minimum
// don't leak a segments_N file if there is a pending commit
if (pendingCommit != null) {
try {
pendingCommit.rollbackCommit(directory);
deleter.decRef(pendingCommit);
} catch (Throwable t) {
throwable.addSuppressed(t);
}
pendingCommit = null;
}
@ -2542,13 +2535,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// close all the closeables we can (but important is readerPool and writeLock to prevent leaks)
IOUtils.closeWhileHandlingException(readerPool, deleter, writeLock);
writeLock = null;
}
closed = true;
closing = false;
// So any "concurrently closing" threads wake up and see that the close has now completed:
notifyAll();
}
} catch (Throwable t) {
throwable.addSuppressed(t);
} finally {
if (throwable instanceof VirtualMachineError) {
try {
tragicEvent(throwable, "rollbackInternal");
} catch (Throwable t1){
throwable.addSuppressed(t1);
}
}
}
throw throwable;
}
}
@ -2600,7 +2604,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/
try {
synchronized (fullFlushLock) {
docWriter.lockAndAbortAll(this);
try (Closeable release = docWriter.lockAndAbortAll(this)) {
processEvents(false);
synchronized (this) {
try {
@ -2631,8 +2635,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
docWriter.setLastSeqNo(seqNo);
return seqNo;
} finally {
docWriter.unlockAllAfterAbortAll(this);
if (!success) {
if (success == false) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during deleteAll");
}
@ -2640,6 +2643,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
}
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteAll");
throw tragedy;
@ -4065,26 +4069,22 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
deleteNewFiles(merge.info.files());
}
boolean success = false;
try {
// Must close before checkpoint, otherwise IFD won't be
// able to delete the held-open files from the merge
// readers:
closeMergeReaders(merge, false);
success = true;
} finally {
checkpoint();
} catch (Throwable t) {
// Must note the change to segmentInfos so any commits
// in-flight don't lose it (IFD will incRef/protect the
// new files we created):
if (success) {
checkpoint();
} else {
try {
checkpoint();
} catch (Throwable t) {
// Ignore so we keep throwing original exception.
}
} catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
}
if (infoStream.isEnabled("IW")) {
@ -4120,7 +4120,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// in which case we must throw it so, for example, the
// rollbackTransaction code in addIndexes* is
// executed.
if (merge.isExternal) {
if (merge.isExternal) { // TODO can we simplify this and just throw all the time? this would simplify this a lot
throw (MergePolicy.MergeAbortedException) t;
}
} else {
@ -4427,9 +4427,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
readerPool.drop(rld.info);
}
} catch (Throwable t) {
if (th == null) {
th = t;
}
th = IOUtils.useOrSuppress(th, t);
}
merge.readers.set(i, null);
}
@ -4438,9 +4436,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
merge.mergeFinished();
} catch (Throwable t) {
if (th == null) {
th = t;
}
th = IOUtils.useOrSuppress(th, t);
}
// If any error occurred, throw it.

View File

@ -122,6 +122,8 @@ public final class IOUtils {
if (object != null) {
object.close();
}
} catch (VirtualMachineError e) {
throw e;
} catch (Throwable t) {
}
}
@ -623,4 +625,17 @@ public final class IOUtils {
return desc;
}
}
/**
* Returns the second throwable if the first is null otherwise adds the second as suppressed to the first
* and returns it.
*/
public static <T extends Throwable> T useOrSuppress(T first, T second) {
if (first == null) {
return second;
} else {
first.addSuppressed(second);
}
return first;
}
}