NIFI-2452: Ensure that we do not close Index Readers that are still in use

This commit is contained in:
Mark Payne 2016-08-01 14:51:02 -04:00 committed by joewitt
parent bc5237593e
commit e9b87dd734
6 changed files with 223 additions and 22 deletions

View File

@ -212,13 +212,14 @@ public class IndexConfiguration {
final List<File> dirs = new ArrayList<>();
lock.lock();
try {
// Sort directories so that we return the newest index first
final List<File> sortedIndexDirectories = getIndexDirectories();
Collections.sort(sortedIndexDirectories, new Comparator<File>() {
@Override
public int compare(final File o1, final File o2) {
final long epochTimestamp1 = getIndexStartTime(o1);
final long epochTimestamp2 = getIndexStartTime(o2);
return Long.compare(epochTimestamp1, epochTimestamp2);
return Long.compare(epochTimestamp2, epochTimestamp1);
}
});

View File

@ -218,6 +218,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
rolloverExecutor = Executors.newScheduledThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread"));
}
/**
 * Supplies the IndexManager through which this repository borrows and returns
 * its index writers and hands off to searches. Declared protected so that a
 * subclass can override it and substitute a different manager.
 *
 * @return the IndexManager held by this repository
 */
protected IndexManager getIndexManager() {
    return this.indexManager;
}
@Override
public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws IOException {
writeLock.lock();
@ -692,7 +696,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
rolloverExecutor.shutdownNow();
queryExecService.shutdownNow();
indexManager.close();
getIndexManager().close();
if ( writers != null ) {
for (final RecordWriter writer : writers) {
@ -1054,7 +1058,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
// we can safely delete the first index because the latest event in the index is an event
// that has already been expired from the repository.
final File indexingDirectory = indexDirs.get(0);
indexManager.removeIndex(indexingDirectory);
getIndexManager().removeIndex(indexingDirectory);
indexConfig.removeIndexDirectory(indexingDirectory);
deleteDirectory(indexingDirectory);
@ -1522,7 +1526,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
+ "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, getIndexManager());
try {
deleteAction.execute(suggestedMergeFile);
} catch (final Exception e) {
@ -1658,7 +1662,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
final AtomicBoolean finishedAdding = new AtomicBoolean(false);
final List<Future<?>> futures = new ArrayList<>();
final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
final IndexWriter indexWriter = getIndexManager().borrowIndexWriter(indexingDirectory);
try {
final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() {
@Override
@ -1781,7 +1785,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
}
}
} finally {
indexManager.returnIndexWriter(indexingDirectory, indexWriter);
getIndexManager().returnIndexWriter(indexingDirectory, indexWriter);
}
indexConfig.setMaxIdIndexed(maxId);
@ -1984,7 +1988,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
* @return an Iterator of ProvenanceEventRecord that match the query
* @throws IOException if unable to perform the query
*/
public Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
final List<File> indexFiles = indexConfig.getIndexDirectories();
final AtomicLong hits = new AtomicLong(0L);
@ -2471,7 +2475,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
@Override
public void run() {
try {
final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars);
final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, getIndexManager(), maxAttributeChars);
final StandardQueryResult queryResult = search.search(query, user, retrievalCount, firstEventTimestamp);
submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
} catch (final Throwable t) {
@ -2511,7 +2515,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
try {
final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this,
indexManager, indexDir, null, flowFileUuids, maxAttributeChars);
getIndexManager(), indexDir, null, flowFileUuids, maxAttributeChars);
final StandardLineageResult result = submission.getResult();
result.update(replaceUnauthorizedWithPlaceholders(matchingRecords, user));

View File

@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -86,7 +87,7 @@ public class IndexManager implements Closeable {
public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
logger.debug("Borrowing index writer for {}", indexingDirectory);
logger.trace("Borrowing index writer for {}", indexingDirectory);
lock.lock();
try {
@ -124,6 +125,7 @@ public class IndexManager implements Closeable {
final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
if ( searchers != null ) {
for (final ActiveIndexSearcher activeSearcher : searchers) {
logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
activeSearcher.poison();
}
}
@ -141,7 +143,7 @@ public class IndexManager implements Closeable {
public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
lock.lock();
try {
@ -154,7 +156,7 @@ public class IndexManager implements Closeable {
writer.close();
} else if ( count.getCount() <= 1 ) {
// we are finished with this writer.
logger.debug("Closing Index Writer for {}", indexingDirectory);
logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
count.close();
} else {
// decrement the count.
@ -175,7 +177,7 @@ public class IndexManager implements Closeable {
public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
final File absoluteFile = indexDir.getAbsoluteFile();
logger.debug("Borrowing index searcher for {}", indexDir);
logger.trace("Borrowing index searcher for {}", indexDir);
lock.lock();
try {
@ -210,7 +212,8 @@ public class IndexManager implements Closeable {
continue;
}
logger.debug("Providing previously cached index searcher for {}", indexDir);
final int referenceCount = searcher.incrementReferenceCount();
logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
return searcher.getSearcher();
}
}
@ -219,7 +222,9 @@ public class IndexManager implements Closeable {
// from the cache so that we don't try to use them again later.
for ( final ActiveIndexSearcher searcher : expired ) {
try {
logger.debug("Closing {}", searcher);
searcher.close();
logger.trace("Closed {}", searcher);
} catch (final Exception e) {
logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
}
@ -239,11 +244,14 @@ public class IndexManager implements Closeable {
final IndexSearcher searcher = new IndexSearcher(directoryReader);
// we want to cache the searcher that we create, since it's just a reader.
final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
currentlyCached.add(cached);
return cached.getSearcher();
} catch (final IOException e) {
logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
logger.error("", e);
try {
directory.close();
} catch (final IOException ioe) {
@ -269,7 +277,7 @@ public class IndexManager implements Closeable {
// we don't want to cache this searcher because it's based on a writer, so we want to get
// new values the next time that we search.
final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
currentlyCached.add(activeSearcher);
return activeSearcher.getSearcher();
@ -282,7 +290,7 @@ public class IndexManager implements Closeable {
public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
final File absoluteFile = indexDirectory.getAbsoluteFile();
logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
lock.lock();
try {
@ -318,7 +326,8 @@ public class IndexManager implements Closeable {
return;
} else {
// the searcher is cached. Just leave it open.
logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
final int refCount = activeSearcher.decrementReferenceCount();
logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
return;
}
} else {
@ -439,14 +448,17 @@ public class IndexManager implements Closeable {
private static class ActiveIndexSearcher implements Closeable {
private final IndexSearcher searcher;
private final DirectoryReader directoryReader;
private final File indexDirectory;
private final Directory directory;
private final boolean cache;
private boolean poisoned = false;
private final AtomicInteger referenceCount = new AtomicInteger(1);
private volatile boolean poisoned = false;
public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader,
public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
final Directory directory, final boolean cache) {
this.searcher = searcher;
this.directoryReader = directoryReader;
this.indexDirectory = indexDirectory;
this.directory = directory;
this.cache = cache;
}
@ -467,9 +479,28 @@ public class IndexManager implements Closeable {
this.poisoned = true;
}
/**
 * Atomically adds one to this searcher's reference count.
 *
 * @return the reference count after the increment
 */
public int incrementReferenceCount() {
    final int updatedCount = referenceCount.incrementAndGet();
    return updatedCount;
}
/**
 * Atomically subtracts one from this searcher's reference count. This method
 * only adjusts the counter; it does not close anything itself.
 *
 * @return the reference count after the decrement
 */
public int decrementReferenceCount() {
    final int updatedCount = referenceCount.decrementAndGet();
    return updatedCount;
}
/**
 * Releases one reference to this searcher. The underlying directory reader and
 * directory are closed only once the reference count has dropped to zero or
 * below; while other references remain, the reader is left open so that it is
 * not closed while still in use.
 *
 * @throws IOException if closing the underlying reader or directory fails
 */
@Override
public void close() throws IOException {
    // Do not close unconditionally: consult the reference count first so that a reader
    // that is still borrowed elsewhere stays usable.
    final int updatedRefCount = referenceCount.decrementAndGet();
    if (updatedRefCount <= 0) {
        logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
        IndexManager.close(directoryReader, directory);
    } else {
        logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
    }
}
/**
 * Describes this searcher by its index directory, whether it is cached, and
 * whether it has been poisoned. Used as the argument in log messages.
 *
 * @return a human-readable description of this searcher
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("ActiveIndexSearcher[directory=");
    description.append(indexDirectory);
    description.append(", cached=").append(cache);
    description.append(", poisoned=").append(poisoned);
    description.append("]");
    return description.toString();
}
}

View File

@ -92,11 +92,12 @@ public class IndexSearch {
final long searchStartNanos = System.nanoTime();
final long openSearcherNanos = searchStartNanos - start;
logger.debug("Searching {} for {}", this, provenanceQuery);
final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
final long finishSearch = System.nanoTime();
final long searchNanos = finishSearch - searchStartNanos;
logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
logger.debug("Searching {} for {} took {} millis; opening searcher took {} millis", this, provenanceQuery,
TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
if (topDocs.totalHits == 0) {

View File

@ -74,6 +74,7 @@ public class LineageQuery {
}
final long searchStart = System.nanoTime();
logger.debug("Searching {} for {}", indexDirectory, flowFileIdQuery);
final TopDocs uuidQueryTopDocs = searcher.search(flowFileIdQuery, MAX_QUERY_RESULTS);
final long searchEnd = System.nanoTime();

View File

@ -36,6 +36,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
@ -59,6 +60,8 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileFilter;
@ -71,6 +74,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -481,6 +485,165 @@ public class TestPersistentProvenanceRepository {
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
}
/*
 * Verifies that a search keeps working when the index it is reading is modified underneath it.
 * Two search threads borrow an Index Searcher through a delaying IndexManager wrapper; while they
 * hold their searchers, the main thread registers more events and forces a rollover. Any failure
 * raised on a search thread is captured and rethrown on the test thread so that JUnit reports it.
 */
// TODO: Switch to 10,000.
@Test(timeout = 1000000)
public void testModifyIndexWhileSearching() throws IOException, InterruptedException, ParseException {
    final RepositoryConfiguration config = createConfiguration();
    config.setMaxRecordLife(30, TimeUnit.SECONDS);
    config.setMaxStorageCapacity(1024L * 1024L * 10);
    config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
    config.setMaxEventFileCapacity(1024L * 1024L * 10);
    config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));

    final CountDownLatch obtainIndexSearcherLatch = new CountDownLatch(2);

    repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
        private IndexManager wrappedManager = null;

        // Create an IndexManager that adds a delay before returning the Index Searcher.
        @Override
        protected synchronized IndexManager getIndexManager() {
            if (wrappedManager == null) {
                final IndexManager mgr = super.getIndexManager();
                final Logger logger = LoggerFactory.getLogger("IndexManager");

                wrappedManager = new IndexManager() {
                    final AtomicInteger indexSearcherCount = new AtomicInteger(0);

                    @Override
                    public IndexSearcher borrowIndexSearcher(File indexDir) throws IOException {
                        final IndexSearcher searcher = mgr.borrowIndexSearcher(indexDir);
                        final int idx = indexSearcherCount.incrementAndGet();
                        obtainIndexSearcherLatch.countDown();

                        // The first searcher should sleep for 3 seconds. The second searcher should
                        // sleep for 5 seconds. This allows us to have two threads each obtain a Searcher
                        // and then have one of them finish searching and close the searcher if it's poisoned while the
                        // second thread is still holding the searcher
                        try {
                            if (idx == 1) {
                                Thread.sleep(3000L);
                            } else {
                                Thread.sleep(5000L);
                            }
                        } catch (InterruptedException e) {
                            // Preserve the interrupt status for callers further up the stack.
                            Thread.currentThread().interrupt();
                            throw new IOException("Interrupted", e);
                        }

                        logger.info("Releasing index searcher");
                        return searcher;
                    }

                    @Override
                    public IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException {
                        return mgr.borrowIndexWriter(indexingDirectory);
                    }

                    @Override
                    public void close() throws IOException {
                        mgr.close();
                    }

                    @Override
                    public void removeIndex(File indexDirectory) {
                        mgr.removeIndex(indexDirectory);
                    }

                    @Override
                    public void returnIndexSearcher(File indexDirectory, IndexSearcher searcher) {
                        mgr.returnIndexSearcher(indexDirectory, searcher);
                    }

                    @Override
                    public void returnIndexWriter(File indexingDirectory, IndexWriter writer) {
                        mgr.returnIndexWriter(indexingDirectory, writer);
                    }
                };
            }

            return wrappedManager;
        }
    };

    repo.initialize(getEventReporter(), null, null);

    final String uuid = "10000000-0000-0000-0000-000000000000";
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("abc", "xyz");
    attributes.put("xyz", "abc");
    attributes.put("filename", "file-" + uuid);

    final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
    builder.setEventTime(System.currentTimeMillis());
    builder.setEventType(ProvenanceEventType.RECEIVE);
    builder.setTransitUri("nifi://unit-test");
    attributes.put("uuid", uuid);
    builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
    builder.setComponentId("1234");
    builder.setComponentType("dummy processor");

    for (int i = 0; i < 10; i++) {
        builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
        repo.registerEvent(builder.build());
    }

    repo.waitForRollover();

    // Perform a query. This will ensure that an IndexSearcher is created and cached.
    final Query query = new Query(UUID.randomUUID().toString());
    query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
    query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
    query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
    query.setMaxResults(100);

    // Run the query in two background threads. When each thread goes to obtain the IndexSearcher, it is
    // delayed (3 seconds for the first searcher handed out, 5 seconds for the second). Those delays occur
    // as the main thread is updating the index. This should result in the search creating a new Index Reader
    // that can properly query the index.
    final int numThreads = 2;
    final CountDownLatch performSearchLatch = new CountDownLatch(numThreads);

    // Holds the first failure seen on a search thread. An assertion raised off the test thread is not
    // reported by JUnit, so the failure is recorded here and rethrown on the main thread below.
    final java.util.concurrent.atomic.AtomicReference<Throwable> searchFailure = new java.util.concurrent.atomic.AtomicReference<>();

    final Runnable searchRunnable = new Runnable() {
        @Override
        public void run() {
            try {
                final QueryResult result = repo.queryEvents(query, createUser());
                System.out.println("Finished search: " + result);
            } catch (final Throwable t) {
                searchFailure.compareAndSet(null, t);
            } finally {
                // Always count down, even on failure, so the main thread never waits for the full timeout.
                performSearchLatch.countDown();
            }
        }
    };

    // Kick off the searcher threads
    for (int i = 0; i < numThreads; i++) {
        final Thread searchThread = new Thread(searchRunnable);
        searchThread.start();
    }

    // Wait until we've obtained the Index Searchers before modifying the index.
    obtainIndexSearcherLatch.await();

    // add more events to the repo
    for (int i = 0; i < 10; i++) {
        builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
        repo.registerEvent(builder.build());
    }

    // Force a rollover to occur. This will modify the index.
    repo.rolloverWithLock(true);

    // Wait for the repository to roll over.
    repo.waitForRollover();

    // Wait for the searches to complete.
    performSearchLatch.await();

    // Surface any failure from the search threads on the test thread.
    final Throwable failure = searchFailure.get();
    if (failure != null) {
        throw new AssertionError("Search failed while the index was being modified", failure);
    }
}
@Test
public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
final RepositoryConfiguration config = createConfiguration();