LUCENE-8215: Fix several fragile exception handling places in o.a.l.index

Several places in the index package don't handle exceptions well or ignores them.
This change adds some utility methods and cuts over to make use of try/with blocks
to simplify exception handling.
This commit is contained in:
Simon Willnauer 2018-03-19 11:55:51 +01:00
parent 3048e5da22
commit 2e35ef2b3d
17 changed files with 220 additions and 343 deletions

View File

@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
@ -302,7 +303,6 @@ class BufferedUpdatesStream implements Accountable {
ensureOpen();
List<SegmentState> segStates = new ArrayList<>();
boolean success = false;
try {
for (SegmentCommitInfo info : infos) {
if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
@ -310,17 +310,15 @@ class BufferedUpdatesStream implements Accountable {
alreadySeenSegments.add(info);
}
}
success = true;
} finally {
if (success == false) {
for(SegmentState segState : segStates) {
try {
segState.finish(pool);
} catch (Throwable th) {
// suppress so we keep throwing original exc
}
} catch (Throwable t) {
for(SegmentState segState : segStates) {
try {
segState.finish(pool);
} catch (Throwable th) {
t.addSuppressed(th);
}
}
throw t;
}
return segStates.toArray(new SegmentState[0]);
@ -328,13 +326,10 @@ class BufferedUpdatesStream implements Accountable {
/** Close segment states previously opened with openSegmentStates. */
public ApplyDeletesResult closeSegmentStates(IndexWriter.ReaderPool pool, SegmentState[] segStates, boolean success) throws IOException {
int count = segStates.length;
Throwable firstExc = null;
List<SegmentCommitInfo> allDeleted = null;
long totDelCount = 0;
for (int j=0;j<count;j++) {
SegmentState segState = segStates[j];
final List<SegmentState> segmentStates = Arrays.asList(segStates);
for (SegmentState segState : segmentStates) {
if (success) {
totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
@ -346,21 +341,8 @@ class BufferedUpdatesStream implements Accountable {
allDeleted.add(segState.reader.getSegmentInfo());
}
}
try {
segStates[j].finish(pool);
} catch (Throwable th) {
if (firstExc == null) {
firstExc = th;
}
}
}
if (success) {
if (firstExc != null) {
throw IOUtils.rethrowAlways(firstExc);
}
}
IOUtils.applyToAll(segmentStates, s -> s.finish(pool));
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + updates.size() + " packets; bytesUsed=" + pool.ramBytesUsed());
}

View File

@ -418,7 +418,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
if (value != null) {
spins = Boolean.parseBoolean(value);
}
} catch (Throwable ignored) {
} catch (Exception ignored) {
// that's fine we might hit a SecurityException etc. here just continue
}
setDefaultMaxMergesAndThreads(spins);
if (verbose()) {

View File

@ -636,8 +636,8 @@ final class DocumentsWriterFlushControl implements Accountable {
try {
documentsWriter.subtractFlushedNumDocs(dwpt.getNumDocsInRAM());
dwpt.abort();
} catch (Throwable ex) {
// ignore - keep on aborting the flush queue
} catch (Exception ex) {
// that's fine we just abort everything here this is best effort
} finally {
doAfterFlush(dwpt);
}
@ -647,8 +647,8 @@ final class DocumentsWriterFlushControl implements Accountable {
flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM());
blockedFlush.dwpt.abort();
} catch (Throwable ex) {
// ignore - keep on aborting the blocked queue
} catch (Exception ex) {
// that's fine we just abort everything here this is best effort
} finally {
doAfterFlush(blockedFlush.dwpt);
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@ -26,6 +27,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
@ -40,7 +42,6 @@ import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator;
@ -247,7 +248,7 @@ class FrozenBufferedUpdates {
/** Translates a frozen packet of delete term/query, or doc values
* updates, into their actual docIDs in the index, and applies the change. This is a heavy
* operation and is done concurrently by incoming indexing threads. */
@SuppressWarnings("try")
public synchronized void apply(IndexWriter writer) throws IOException {
if (applied.getCount() == 0) {
// already done
@ -319,14 +320,12 @@ class FrozenBufferedUpdates {
writer.deleter.incRef(delFiles);
}
boolean success = false;
AtomicBoolean success = new AtomicBoolean();
long delCount;
try {
try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
// don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
delCount = apply(segStates);
success = true;
} finally {
finishApply(writer, segStates, success, delFiles);
success.set(true);
}
// Since we jus resolved some more deletes/updates, now is a good time to write them:
@ -722,103 +721,98 @@ class FrozenBufferedUpdates {
// We apply segment-private deletes on flush:
assert privateSegment == null;
try {
long startNS = System.nanoTime();
long startNS = System.nanoTime();
long delCount = 0;
long delCount = 0;
for (BufferedUpdatesStream.SegmentState segState : segStates) {
assert segState.delGen != delGen: "segState.delGen=" + segState.delGen + " vs this.gen=" + delGen;
if (segState.delGen > delGen) {
// our deletes don't apply to this segment
continue;
}
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
continue;
}
for (BufferedUpdatesStream.SegmentState segState : segStates) {
assert segState.delGen != delGen: "segState.delGen=" + segState.delGen + " vs this.gen=" + delGen;
if (segState.delGen > delGen) {
// our deletes don't apply to this segment
continue;
}
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
continue;
}
FieldTermIterator iter = deleteTerms.iterator();
FieldTermIterator iter = deleteTerms.iterator();
BytesRef delTerm;
String field = null;
TermsEnum termsEnum = null;
BytesRef readerTerm = null;
PostingsEnum postingsEnum = null;
while ((delTerm = iter.next()) != null) {
BytesRef delTerm;
String field = null;
TermsEnum termsEnum = null;
BytesRef readerTerm = null;
PostingsEnum postingsEnum = null;
while ((delTerm = iter.next()) != null) {
if (iter.field() != field) {
// field changed
field = iter.field();
Terms terms = segState.reader.terms(field);
if (terms != null) {
termsEnum = terms.iterator();
readerTerm = termsEnum.next();
} else {
termsEnum = null;
}
if (iter.field() != field) {
// field changed
field = iter.field();
Terms terms = segState.reader.terms(field);
if (terms != null) {
termsEnum = terms.iterator();
readerTerm = termsEnum.next();
} else {
termsEnum = null;
}
}
if (termsEnum != null) {
int cmp = delTerm.compareTo(readerTerm);
if (cmp < 0) {
// TODO: can we advance across del terms here?
// move to next del term
continue;
} else if (cmp == 0) {
if (termsEnum != null) {
int cmp = delTerm.compareTo(readerTerm);
if (cmp < 0) {
// TODO: can we advance across del terms here?
// move to next del term
continue;
} else if (cmp == 0) {
// fall through
} else if (cmp > 0) {
TermsEnum.SeekStatus status = termsEnum.seekCeil(delTerm);
if (status == TermsEnum.SeekStatus.FOUND) {
// fall through
} else if (cmp > 0) {
TermsEnum.SeekStatus status = termsEnum.seekCeil(delTerm);
if (status == TermsEnum.SeekStatus.FOUND) {
// fall through
} else if (status == TermsEnum.SeekStatus.NOT_FOUND) {
readerTerm = termsEnum.term();
continue;
} else {
// TODO: can we advance to next field in deleted terms?
// no more terms in this segment
termsEnum = null;
continue;
}
} else if (status == TermsEnum.SeekStatus.NOT_FOUND) {
readerTerm = termsEnum.term();
continue;
} else {
// TODO: can we advance to next field in deleted terms?
// no more terms in this segment
termsEnum = null;
continue;
}
}
// we don't need term frequencies for this
postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
// we don't need term frequencies for this
postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
assert postingsEnum != null;
assert postingsEnum != null;
int docID;
while ((docID = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
int docID;
while ((docID = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
// each segment. So all Term deleting here is
// against prior segments:
if (segState.rld.delete(docID)) {
delCount++;
}
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
// each segment. So all Term deleting here is
// against prior segments:
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD",
String.format(Locale.ROOT, "applyTermDeletes took %.2f msec for %d segments and %d del terms; %d new deletions",
(System.nanoTime()-startNS)/1000000.,
segStates.length,
deleteTerms.size(),
delCount));
}
return delCount;
} catch (Throwable t) {
throw IOUtils.rethrowAlways(t);
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD",
String.format(Locale.ROOT, "applyTermDeletes took %.2f msec for %d segments and %d del terms; %d new deletions",
(System.nanoTime()-startNS)/1000000.,
segStates.length,
deleteTerms.size(),
delCount));
}
return delCount;
}
public void setDelGen(long delGen) {

View File

@ -356,10 +356,6 @@ final class IndexFileDeleter implements Closeable {
}
}
public SegmentInfos getLastSegmentInfos() {
return lastSegmentInfos;
}
/**
* Remove the CommitPoints in the commitsToDelete List by
* DecRef'ing all files from each SegmentInfos.
@ -381,9 +377,7 @@ final class IndexFileDeleter implements Closeable {
try {
decRef(commit.files);
} catch (Throwable t) {
if (firstThrowable == null) {
firstThrowable = t;
}
firstThrowable = IOUtils.useOrSuppress(firstThrowable, t);
}
}
commitsToDelete.clear();
@ -583,20 +577,14 @@ final class IndexFileDeleter implements Closeable {
toDelete.add(file);
}
} catch (Throwable t) {
if (firstThrowable == null) {
// Save first exception and throw it in the end, but be sure to finish decRef all files
firstThrowable = t;
}
firstThrowable = IOUtils.useOrSuppress(firstThrowable, t);
}
}
try {
deleteFiles(toDelete);
} catch (Throwable t) {
if (firstThrowable == null) {
// Save first exception and throw it in the end, but be sure to finish decRef all files
firstThrowable = t;
}
firstThrowable = IOUtils.useOrSuppress(firstThrowable, t);
}
if (firstThrowable != null) {

View File

@ -29,7 +29,6 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.DocumentStoredFieldVisitor;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Bits; // javadocs
import org.apache.lucene.util.IOUtils;
/**
IndexReader is an abstract class, providing an interface for accessing a
@ -142,16 +141,13 @@ public abstract class IndexReader implements Closeable {
}
// overridden by StandardDirectoryReader and SegmentReader
void notifyReaderClosedListeners(Throwable th) throws IOException {
// nothing to notify in the base impl, just rethrow
if (th != null) {
throw IOUtils.rethrowAlways(th);
}
void notifyReaderClosedListeners() throws IOException {
// nothing to notify in the base impl
}
private void reportCloseToParentReaders() {
synchronized(parentReaders) {
for(IndexReader parent : parentReaders) {
private void reportCloseToParentReaders() throws IOException {
synchronized (parentReaders) {
for (IndexReader parent : parentReaders) {
parent.closedByChild = true;
// cross memory barrier by a fake write:
parent.refCount.addAndGet(0);
@ -232,6 +228,7 @@ public abstract class IndexReader implements Closeable {
*
* @see #incRef
*/
@SuppressWarnings("try")
public final void decRef() throws IOException {
// only check refcount here (don't call ensureOpen()), so we can
// still close the reader if it was made invalid by a child:
@ -242,17 +239,9 @@ public abstract class IndexReader implements Closeable {
final int rc = refCount.decrementAndGet();
if (rc == 0) {
closed = true;
Throwable throwable = null;
try {
try (Closeable finalizer = this::reportCloseToParentReaders;
Closeable finalizer1 = this::notifyReaderClosedListeners) {
doClose();
} catch (Throwable th) {
throwable = th;
} finally {
try {
reportCloseToParentReaders();
} finally {
notifyReaderClosedListeners(throwable);
}
}
} else if (rc < 0) {
throw new IllegalStateException("too many decRef calls: refCount is " + rc + " after decrement");

View File

@ -2586,6 +2586,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*/
@SuppressWarnings("try")
public long deleteAll() throws IOException {
ensureOpen();
// Remove any buffered docs
@ -2604,7 +2605,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/
try {
synchronized (fullFlushLock) {
try (Closeable release = docWriter.lockAndAbortAll(this)) {
try (Closeable finalizer = docWriter.lockAndAbortAll(this)) {
processEvents(false);
synchronized (this) {
try {
@ -3957,6 +3958,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return mergedDeletesAndUpdates;
}
@SuppressWarnings("try")
synchronized private boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
testPoint("startCommitMerge");
@ -4069,22 +4071,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
deleteNewFiles(merge.info.files());
}
try {
try (Closeable finalizer = this::checkpoint) {
// Must close before checkpoint, otherwise IFD won't be
// able to delete the held-open files from the merge
// readers:
closeMergeReaders(merge, false);
checkpoint();
} catch (Throwable t) {
// Must note the change to segmentInfos so any commits
// in-flight don't lose it (IFD will incRef/protect the
// new files we created):
try {
checkpoint();
} catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
}
if (infoStream.isEnabled("IW")) {
@ -4403,45 +4394,27 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
runningMerges.remove(merge);
}
private final synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
final int numSegments = merge.readers.size();
Throwable th = null;
boolean drop = suppressExceptions == false;
for (int i = 0; i < numSegments; i++) {
final SegmentReader sr = merge.readers.get(i);
if (sr != null) {
try {
final ReadersAndUpdates rld = readerPool.get(sr.getSegmentInfo(), false);
// We still hold a ref so it should not have been removed:
assert rld != null;
if (drop) {
rld.dropChanges();
} else {
rld.dropMergingUpdates();
}
rld.release(sr);
readerPool.release(rld);
if (drop) {
readerPool.drop(rld.info);
}
} catch (Throwable t) {
th = IOUtils.useOrSuppress(th, t);
@SuppressWarnings("try")
private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
final boolean drop = suppressExceptions == false;
try (Closeable finalizer = merge::mergeFinished) {
IOUtils.applyToAll(merge.readers, sr -> {
final ReadersAndUpdates rld = readerPool.get(sr.getSegmentInfo(), false);
// We still hold a ref so it should not have been removed:
assert rld != null;
if (drop) {
rld.dropChanges();
} else {
rld.dropMergingUpdates();
}
merge.readers.set(i, null);
}
}
try {
merge.mergeFinished();
} catch (Throwable t) {
th = IOUtils.useOrSuppress(th, t);
}
// If any error occurred, throw it.
if (!suppressExceptions && th != null) {
throw IOUtils.rethrowAlways(th);
rld.release(sr);
readerPool.release(rld);
if (drop) {
readerPool.drop(rld.info);
}
});
} finally {
Collections.fill(merge.readers, null);
}
}

View File

@ -845,7 +845,7 @@ class ReadersAndUpdates {
success = true;
} finally {
if (success == false) {
newReader.decRef();
newReader.close();
}
}
reader = newReader;

View File

@ -17,6 +17,7 @@
package org.apache.lucene.index;
import java.io.Closeable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -161,16 +162,13 @@ final class SegmentCoreReaders {
throw new AlreadyClosedException("SegmentCoreReaders is already closed");
}
@SuppressWarnings("try")
void decRef() throws IOException {
if (ref.decrementAndGet() == 0) {
Throwable th = null;
try {
try (Closeable finalizer = this::notifyCoreClosedListeners){
IOUtils.close(termVectorsLocal, fieldsReaderLocal, fields, termVectorsReaderOrig, fieldsReaderOrig,
cfsReader, normsProducer, pointsReader);
} catch (Throwable throwable) {
th = throwable;
} finally {
notifyCoreClosedListeners(th);
}
}
}
@ -193,25 +191,9 @@ final class SegmentCoreReaders {
return cacheHelper;
}
private void notifyCoreClosedListeners(Throwable th) throws IOException {
private void notifyCoreClosedListeners() throws IOException {
synchronized(coreClosedListeners) {
for (IndexReader.ClosedListener listener : coreClosedListeners) {
// SegmentReader uses our instance as its
// coreCacheKey:
try {
listener.onClose(cacheHelper.getKey());
} catch (Throwable t) {
if (th == null) {
th = t;
} else {
th.addSuppressed(t);
}
}
}
if (th != null) {
throw IOUtils.rethrowAlways(th);
}
IOUtils.applyToAll(coreClosedListeners, l -> l.onClose(cacheHelper.getKey()));
}
}

View File

@ -78,21 +78,10 @@ final class SegmentDocValues {
* generations.
*/
synchronized void decRef(List<Long> dvProducersGens) throws IOException {
Throwable t = null;
for (Long gen : dvProducersGens) {
IOUtils.applyToAll(dvProducersGens, gen -> {
RefCount<DocValuesProducer> dvp = genDVProducers.get(gen);
assert dvp != null : "gen=" + gen;
try {
dvp.decRef();
} catch (Throwable th) {
if (t == null) {
t = th;
}
}
}
if (t != null) {
throw IOUtils.rethrowAlways(t);
}
dvp.decRef();
});
}
}

View File

@ -55,7 +55,6 @@ class SegmentDocValuesProducer extends DocValuesProducer {
* @param segDocValues producer map
*/
SegmentDocValuesProducer(SegmentCommitInfo si, Directory dir, FieldInfos coreInfos, FieldInfos allInfos, SegmentDocValues segDocValues) throws IOException {
boolean success = false;
try {
DocValuesProducer baseProducer = null;
for (FieldInfo fi : allInfos) {
@ -74,21 +73,19 @@ class SegmentDocValuesProducer extends DocValuesProducer {
} else {
assert !dvGens.contains(docValuesGen);
// otherwise, producer sees only the one fieldinfo it wrote
final DocValuesProducer dvp = segDocValues.getDocValuesProducer(docValuesGen, si, dir, new FieldInfos(new FieldInfo[] { fi }));
final DocValuesProducer dvp = segDocValues.getDocValuesProducer(docValuesGen, si, dir, new FieldInfos(new FieldInfo[]{fi}));
dvGens.add(docValuesGen);
dvProducers.add(dvp);
dvProducersByField.put(fi.name, dvp);
}
}
success = true;
} finally {
if (success == false) {
try {
segDocValues.decRef(dvGens);
} catch (Throwable t) {
// Ignore so we keep throwing first exception
}
} catch (Throwable t) {
try {
segDocValues.decRef(dvGens);
} catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
}
}

View File

@ -297,23 +297,9 @@ public final class SegmentReader extends CodecReader {
private final Set<ClosedListener> readerClosedListeners = new CopyOnWriteArraySet<>();
@Override
void notifyReaderClosedListeners(Throwable th) throws IOException {
void notifyReaderClosedListeners() throws IOException {
synchronized(readerClosedListeners) {
for(ClosedListener listener : readerClosedListeners) {
try {
listener.onClose(readerCacheHelper.getKey());
} catch (Throwable t) {
if (th == null) {
th = t;
} else {
th.addSuppressed(t);
}
}
}
if (th != null) {
IOUtils.rethrowAlways(th);
}
IOUtils.applyToAll(readerClosedListeners, l -> l.onClose(readerCacheHelper.getKey()));
}
}

View File

@ -17,6 +17,7 @@
package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -91,7 +92,6 @@ public final class StandardDirectoryReader extends DirectoryReader {
final SegmentInfos segmentInfos = infos.clone();
int infosUpto = 0;
boolean success = false;
try {
for (int i = 0; i < numSegments; i++) {
// NOTE: important that we use infos not
@ -119,21 +119,16 @@ public final class StandardDirectoryReader extends DirectoryReader {
writer.incRefDeleter(segmentInfos);
StandardDirectoryReader result = new StandardDirectoryReader(dir,
readers.toArray(new SegmentReader[readers.size()]), writer,
segmentInfos, applyAllDeletes, writeAllDeletes);
success = true;
readers.toArray(new SegmentReader[readers.size()]), writer,
segmentInfos, applyAllDeletes, writeAllDeletes);
return result;
} finally {
if (!success) {
for (SegmentReader r : readers) {
try {
r.decRef();
} catch (Throwable th) {
// ignore any exception that is thrown here to not mask any original
// exception.
}
}
} catch (Throwable t) {
try {
IOUtils.applyToAll(readers, SegmentReader::decRef);
} catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
}
}
@ -365,34 +360,25 @@ public final class StandardDirectoryReader extends DirectoryReader {
}
@Override
@SuppressWarnings("try")
protected void doClose() throws IOException {
Throwable firstExc = null;
for (final LeafReader r : getSequentialSubReaders()) {
// try to close each reader, even if an exception is thrown
try {
r.decRef();
} catch (Throwable t) {
if (firstExc == null) {
firstExc = t;
Closeable decRefDeleter = () -> {
if (writer != null) {
try {
writer.decRefDeleter(segmentInfos);
} catch (AlreadyClosedException ex) {
// This is OK, it just means our original writer was
// closed before we were, and this may leave some
// un-referenced files in the index, which is
// harmless. The next time IW is opened on the
// index, it will delete them.
}
}
}
if (writer != null) {
try {
writer.decRefDeleter(segmentInfos);
} catch (AlreadyClosedException ex) {
// This is OK, it just means our original writer was
// closed before we were, and this may leave some
// un-referenced files in the index, which is
// harmless. The next time IW is opened on the
// index, it will delete them.
}
}
// throw the first exception
if (firstExc != null) {
throw IOUtils.rethrowAlways(firstExc);
};
try (Closeable finalizer = decRefDeleter) {
// try to close each reader, even if an exception is thrown
final List<? extends LeafReader> sequentialSubReaders = getSequentialSubReaders();
IOUtils.applyToAll(sequentialSubReaders, LeafReader::decRef);
}
}
@ -493,23 +479,9 @@ public final class StandardDirectoryReader extends DirectoryReader {
};
@Override
void notifyReaderClosedListeners(Throwable th) throws IOException {
void notifyReaderClosedListeners() throws IOException {
synchronized(readerClosedListeners) {
for(ClosedListener listener : readerClosedListeners) {
try {
listener.onClose(cacheHelper.getKey());
} catch (Throwable t) {
if (th == null) {
th = t;
} else {
th.addSuppressed(t);
}
}
}
if (th != null) {
throw IOUtils.rethrowAlways(th);
}
IOUtils.applyToAll(readerClosedListeners, l -> l.onClose(cacheHelper.getKey()));
}
}

View File

@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException;
/**
* A utility for executing 2-phase commit on several objects.
*

View File

@ -16,7 +16,6 @@
*/
package org.apache.lucene.util;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
@ -40,6 +39,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
@ -638,4 +638,24 @@ public final class IOUtils {
}
return first;
}
/**
* Applies the consumer to all non-null elements in the collection even if an exception is thrown. The first exception
* thrown by the consumer is re-thrown and subsequent exceptions are suppressed.
*/
public static <T> void applyToAll(Collection<T> collection, IOConsumer<T> consumer) throws IOException {
IOUtils.close(collection.stream().filter(Objects::nonNull).map(t -> (Closeable) () -> consumer.accept(t))::iterator);
}
/**
* An IO operation with a single input.
* @see java.util.function.Consumer
*/
@FunctionalInterface
public interface IOConsumer<T> {
/**
* Performs this operation on the given argument.
*/
void accept(T input) throws IOException;
}
}

View File

@ -27,6 +27,8 @@ import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttributeView;
import java.nio.file.attribute.FileStoreAttributeView;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -487,4 +489,19 @@ public class TestIOUtils extends LuceneTestCase {
// no exception
}
public void testApplyToAll() {
ArrayList<Integer> closed = new ArrayList<>();
RuntimeException runtimeException = expectThrows(RuntimeException.class, () ->
IOUtils.applyToAll(Arrays.asList(1, 2), i -> {
closed.add(i);
throw new RuntimeException("" + i);
}));
assertEquals("1", runtimeException.getMessage());
assertEquals(1, runtimeException.getSuppressed().length);
assertEquals("2", runtimeException.getSuppressed()[0].getMessage());
assertEquals(2, closed.size());
assertEquals(1, closed.get(0).intValue());
assertEquals(2, closed.get(1).intValue());
}
}

View File

@ -57,23 +57,9 @@ public final class OwnCacheKeyMultiReader extends MultiReader {
}
@Override
void notifyReaderClosedListeners(Throwable th) throws IOException {
void notifyReaderClosedListeners() throws IOException {
synchronized(readerClosedListeners) {
for(ClosedListener listener : readerClosedListeners) {
try {
listener.onClose(cacheHelper.getKey());
} catch (Throwable t) {
if (th == null) {
th = t;
} else {
th.addSuppressed(t);
}
}
}
if (th != null) {
throw IOUtils.rethrowAlways(th);
}
IOUtils.applyToAll(readerClosedListeners, l -> l.onClose(cacheHelper.getKey()));
}
}