NIFI-2495 This closes #808. Ensure that we always close Index Searchers when we're finished with them

This commit is contained in:
Mark Payne 2016-08-08 15:01:32 +00:00 committed by joewitt
parent d6a2409d71
commit 8752d11f18
1 changed files with 20 additions and 9 deletions

View File

@ -21,11 +21,9 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -189,7 +187,7 @@ public class IndexManager implements Closeable {
} else {
// keep track of any searchers that have been closed so that we can remove them
// from our cache later.
final Set<ActiveIndexSearcher> expired = new HashSet<>();
final List<ActiveIndexSearcher> expired = new ArrayList<>();
try {
for ( final ActiveIndexSearcher searcher : currentlyCached ) {
@ -307,10 +305,12 @@ public class IndexManager implements Closeable {
// Check if the given searcher is in our list. We use an Iterator to do this so that if we
// find it we can call remove() on the iterator if need be.
final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
final Iterator<ActiveIndexSearcher> itr = new ArrayList<>(currentlyCached).iterator();
boolean activeSearcherFound = false;
while (itr.hasNext()) {
final ActiveIndexSearcher activeSearcher = itr.next();
if ( activeSearcher.getSearcher().equals(searcher) ) {
activeSearcherFound = true;
if ( activeSearcher.isCache() ) {
// if the searcher is poisoned, close it and remove from "pool".
if ( activeSearcher.isPoisoned() ) {
@ -318,7 +318,10 @@ public class IndexManager implements Closeable {
try {
logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory);
activeSearcher.close();
final boolean allReferencesClosed = activeSearcher.close();
if (!allReferencesClosed) {
currentlyCached.add(activeSearcher);
}
} catch (final IOException ioe) {
logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
@ -366,7 +369,10 @@ public class IndexManager implements Closeable {
try {
logger.debug("Closing Index Searcher for {}", indexDirectory);
activeSearcher.close();
final boolean allReferencesClosed = activeSearcher.close();
if (!allReferencesClosed) {
currentlyCached.add(activeSearcher);
}
} catch (final IOException ioe) {
logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
@ -376,6 +382,10 @@ public class IndexManager implements Closeable {
}
}
}
if (!activeSearcherFound) {
logger.error("Index Searcher {} was returned for {} but found no Active Searcher for it", searcher, indexDirectory);
}
} finally {
lock.unlock();
}
@ -448,7 +458,7 @@ public class IndexManager implements Closeable {
}
private static class ActiveIndexSearcher implements Closeable {
private static class ActiveIndexSearcher {
private final IndexSearcher searcher;
private final DirectoryReader directoryReader;
private final File indexDirectory;
@ -490,14 +500,15 @@ public class IndexManager implements Closeable {
return referenceCount.decrementAndGet();
}
@Override
public void close() throws IOException {
public boolean close() throws IOException {
final int updatedRefCount = referenceCount.decrementAndGet();
if (updatedRefCount <= 0) {
logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
IndexManager.close(directoryReader, directory);
return true;
} else {
logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
return false;
}
}