diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java index 4e808111d5..af7bff5421 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java @@ -212,13 +212,14 @@ public class IndexConfiguration { final List dirs = new ArrayList<>(); lock.lock(); try { + // Sort directories so that we return the newest index first final List sortedIndexDirectories = getIndexDirectories(); Collections.sort(sortedIndexDirectories, new Comparator() { @Override public int compare(final File o1, final File o2) { final long epochTimestamp1 = getIndexStartTime(o1); final long epochTimestamp2 = getIndexStartTime(o2); - return Long.compare(epochTimestamp1, epochTimestamp2); + return Long.compare(epochTimestamp2, epochTimestamp1); } }); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index aee8277482..87b617f53c 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -218,6 +218,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { rolloverExecutor = Executors.newScheduledThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread")); } + protected IndexManager getIndexManager() { + return indexManager; + } + @Override public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws IOException { writeLock.lock(); @@ -692,7 +696,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { rolloverExecutor.shutdownNow(); queryExecService.shutdownNow(); - indexManager.close(); + getIndexManager().close(); if ( writers != null ) { for (final RecordWriter writer : writers) { @@ -1054,7 +1058,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { // we can safely delete the first index because the latest event in the index is an event // that has already been expired from the repository. final File indexingDirectory = indexDirs.get(0); - indexManager.removeIndex(indexingDirectory); + getIndexManager().removeIndex(indexingDirectory); indexConfig.removeIndexDirectory(indexingDirectory); deleteDirectory(indexingDirectory); @@ -1522,7 +1526,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist " + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency."); - final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager); + final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, getIndexManager()); try { deleteAction.execute(suggestedMergeFile); } catch (final Exception e) { @@ -1658,7 +1662,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { final AtomicBoolean finishedAdding = new AtomicBoolean(false); final List> futures = new ArrayList<>(); - final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory); + final IndexWriter indexWriter = getIndexManager().borrowIndexWriter(indexingDirectory); try { final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() { @Override @@ -1781,7 +1785,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } } } finally { - indexManager.returnIndexWriter(indexingDirectory, indexWriter); + getIndexManager().returnIndexWriter(indexingDirectory, indexWriter); } indexConfig.setMaxIdIndexed(maxId); @@ -1984,7 +1988,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { * @return an Iterator of ProvenanceEventRecord that match the query * @throws IOException if unable to perform the query */ - public Iterator queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException { + Iterator queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException { final List indexFiles = indexConfig.getIndexDirectories(); final AtomicLong hits = new AtomicLong(0L); @@ -2471,7 +2475,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { @Override public void run() { try { - final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars); + final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, getIndexManager(), maxAttributeChars); final StandardQueryResult queryResult = search.search(query, user, retrievalCount, firstEventTimestamp); submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount()); } catch (final Throwable t) { @@ -2511,7 +2515,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { try { final Set matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, - indexManager, indexDir, null, flowFileUuids, maxAttributeChars); + getIndexManager(), indexDir, null, flowFileUuids, maxAttributeChars); final StandardLineageResult result = submission.getResult(); result.update(replaceUnauthorizedWithPlaceholders(matchingRecords, user)); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java index 57d0d78283..07cd1903b2 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -86,7 +87,7 @@ public class IndexManager implements Closeable { public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException { final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.debug("Borrowing index writer for {}", indexingDirectory); + logger.trace("Borrowing index writer for {}", indexingDirectory); lock.lock(); try { @@ -124,6 +125,7 @@ public class IndexManager implements Closeable { final List searchers = activeSearchers.get(absoluteFile); if ( searchers != null ) { for (final ActiveIndexSearcher activeSearcher : searchers) { + logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory); activeSearcher.poison(); } } @@ -141,7 +143,7 @@ public class IndexManager implements Closeable { public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) { final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory); + logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory); lock.lock(); try { @@ -154,7 +156,7 @@ public class IndexManager implements Closeable { writer.close(); } else if ( count.getCount() <= 1 ) { // we are finished with this writer. - logger.debug("Closing Index Writer for {}", indexingDirectory); + logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1); count.close(); } else { // decrement the count. @@ -175,7 +177,7 @@ public class IndexManager implements Closeable { public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { final File absoluteFile = indexDir.getAbsoluteFile(); - logger.debug("Borrowing index searcher for {}", indexDir); + logger.trace("Borrowing index searcher for {}", indexDir); lock.lock(); try { @@ -210,7 +212,8 @@ public class IndexManager implements Closeable { continue; } - logger.debug("Providing previously cached index searcher for {}", indexDir); + final int referenceCount = searcher.incrementReferenceCount(); + logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount); return searcher.getSearcher(); } } @@ -219,7 +222,9 @@ public class IndexManager implements Closeable { // from the cache so that we don't try to use them again later. for ( final ActiveIndexSearcher searcher : expired ) { try { + logger.debug("Closing {}", searcher); searcher.close(); + logger.trace("Closed {}", searcher); } catch (final Exception e) { logger.debug("Failed to close 'expired' IndexSearcher {}", searcher); } @@ -239,11 +244,14 @@ public class IndexManager implements Closeable { final IndexSearcher searcher = new IndexSearcher(directoryReader); // we want to cache the searcher that we create, since it's just a reader. - final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true); + final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true); currentlyCached.add(cached); return cached.getSearcher(); } catch (final IOException e) { + logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString()); + logger.error("", e); + try { directory.close(); } catch (final IOException ioe) { @@ -269,7 +277,7 @@ public class IndexManager implements Closeable { // we don't want to cache this searcher because it's based on a writer, so we want to get // new values the next time that we search. - final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false); + final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false); currentlyCached.add(activeSearcher); return activeSearcher.getSearcher(); @@ -282,7 +290,7 @@ public class IndexManager implements Closeable { public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) { final File absoluteFile = indexDirectory.getAbsoluteFile(); - logger.debug("Returning index searcher for {} to IndexManager", indexDirectory); + logger.trace("Returning index searcher for {} to IndexManager", indexDirectory); lock.lock(); try { @@ -318,7 +326,8 @@ public class IndexManager implements Closeable { return; } else { // the searcher is cached. Just leave it open. - logger.debug("Index searcher for {} is cached; leaving open", indexDirectory); + final int refCount = activeSearcher.decrementReferenceCount(); + logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount); return; } } else { @@ -439,14 +448,17 @@ public class IndexManager implements Closeable { private static class ActiveIndexSearcher implements Closeable { private final IndexSearcher searcher; private final DirectoryReader directoryReader; + private final File indexDirectory; private final Directory directory; private final boolean cache; - private boolean poisoned = false; + private final AtomicInteger referenceCount = new AtomicInteger(1); + private volatile boolean poisoned = false; - public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader, + public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader, final Directory directory, final boolean cache) { this.searcher = searcher; this.directoryReader = directoryReader; + this.indexDirectory = indexDirectory; this.directory = directory; this.cache = cache; } @@ -467,9 +479,28 @@ public class IndexManager implements Closeable { this.poisoned = true; } + public int incrementReferenceCount() { + return referenceCount.incrementAndGet(); + } + + public int decrementReferenceCount() { + return referenceCount.decrementAndGet(); + } + @Override public void close() throws IOException { - IndexManager.close(directoryReader, directory); + final int updatedRefCount = referenceCount.decrementAndGet(); + if (updatedRefCount <= 0) { + logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount); + IndexManager.close(directoryReader, directory); + } else { + logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount); + } + } + + @Override + public String toString() { + return "ActiveIndexSearcher[directory=" + indexDirectory + ", cached=" + cache + ", poisoned=" + poisoned + "]"; } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java index 00e5f3849a..8d7df8bc98 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java @@ -92,11 +92,12 @@ public class IndexSearch { final long searchStartNanos = System.nanoTime(); final long openSearcherNanos = searchStartNanos - start; + logger.debug("Searching {} for {}", this, provenanceQuery); final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults()); final long finishSearch = System.nanoTime(); final long searchNanos = finishSearch - searchStartNanos; - logger.debug("Searching {} took {} millis; opening searcher took {} millis", this, + logger.debug("Searching {} for {} took {} millis; opening searcher took {} millis", this, provenanceQuery, TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos)); if (topDocs.totalHits == 0) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java index 2706082454..1b13504d63 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java @@ -74,6 +74,7 @@ public class LineageQuery { } final long searchStart = System.nanoTime(); + logger.debug("Searching {} for {}", indexDirectory, flowFileIdQuery); final TopDocs uuidQueryTopDocs = searcher.search(flowFileIdQuery, MAX_QUERY_RESULTS); final long searchEnd = System.nanoTime(); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index d7738e7f0b..b78dfcddc8 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -36,6 +36,7 @@ import org.apache.nifi.provenance.lineage.Lineage; import org.apache.nifi.provenance.lineage.LineageEdge; import org.apache.nifi.provenance.lineage.LineageNode; import org.apache.nifi.provenance.lineage.LineageNodeType; +import org.apache.nifi.provenance.lucene.IndexManager; import org.apache.nifi.provenance.lucene.IndexingAction; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QueryResult; @@ -59,6 +60,8 @@ import org.junit.rules.TestName; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileFilter; @@ -71,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -481,6 +485,165 @@ public class TestPersistentProvenanceRepository { assertTrue(newRecordSet.getMatchingEvents().isEmpty()); } + // TODO: Switch to 10,000. + @Test(timeout = 1000000) + public void testModifyIndexWhileSearching() throws IOException, InterruptedException, ParseException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxRecordLife(30, TimeUnit.SECONDS); + config.setMaxStorageCapacity(1024L * 1024L * 10); + config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); + config.setMaxEventFileCapacity(1024L * 1024L * 10); + config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); + + final CountDownLatch obtainIndexSearcherLatch = new CountDownLatch(2); + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { + private IndexManager wrappedManager = null; + + // Create an IndexManager that adds a delay before returning the Index Searcher. + @Override + protected synchronized IndexManager getIndexManager() { + if (wrappedManager == null) { + final IndexManager mgr = super.getIndexManager(); + final Logger logger = LoggerFactory.getLogger("IndexManager"); + + wrappedManager = new IndexManager() { + final AtomicInteger indexSearcherCount = new AtomicInteger(0); + + @Override + public IndexSearcher borrowIndexSearcher(File indexDir) throws IOException { + final IndexSearcher searcher = mgr.borrowIndexSearcher(indexDir); + final int idx = indexSearcherCount.incrementAndGet(); + obtainIndexSearcherLatch.countDown(); + + // The first searcher should sleep for 3 seconds. The second searcher should + // sleep for 5 seconds. This allows us to have two threads each obtain a Searcher + // and then have one of them finish searching and close the searcher if it's poisoned while the + // second thread is still holding the searcher + try { + if (idx == 1) { + Thread.sleep(3000L); + } else { + Thread.sleep(5000L); + } + } catch (InterruptedException e) { + throw new IOException("Interrupted", e); + } + + logger.info("Releasing index searcher"); + return searcher; + } + + @Override + public IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException { + return mgr.borrowIndexWriter(indexingDirectory); + } + + @Override + public void close() throws IOException { + mgr.close(); + } + + @Override + public void removeIndex(File indexDirectory) { + mgr.removeIndex(indexDirectory); + } + + @Override + public void returnIndexSearcher(File indexDirectory, IndexSearcher searcher) { + mgr.returnIndexSearcher(indexDirectory, searcher); + } + + @Override + public void returnIndexWriter(File indexingDirectory, IndexWriter writer) { + mgr.returnIndexWriter(indexingDirectory, writer); + } + }; + } + + return wrappedManager; + } + }; + + repo.initialize(getEventReporter(), null, null); + + final String uuid = "10000000-0000-0000-0000-000000000000"; + final Map attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", uuid); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + for (int i = 0; i < 10; i++) { + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); + repo.registerEvent(builder.build()); + } + + repo.waitForRollover(); + + // Perform a query. This will ensure that an IndexSearcher is created and cached. + final Query query = new Query(UUID.randomUUID().toString()); + query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*")); + query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4")); + query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); + query.setMaxResults(100); + + // Run a query in a background thread. When this thread goes to obtain the IndexSearcher, it will have a 5 second delay. + // That delay will occur as the main thread is updating the index. This should result in the search creating a new Index Reader + // that can properly query the index. + final int numThreads = 2; + final CountDownLatch performSearchLatch = new CountDownLatch(numThreads); + final Runnable searchRunnable = new Runnable() { + @Override + public void run() { + QueryResult result; + try { + result = repo.queryEvents(query, createUser()); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.toString()); + return; + } + + System.out.println("Finished search: " + result); + performSearchLatch.countDown(); + } + }; + + // Kick off the searcher threads + for (int i = 0; i < numThreads; i++) { + final Thread searchThread = new Thread(searchRunnable); + searchThread.start(); + } + + // Wait until we've obtained the Index Searchers before modifying the index. + obtainIndexSearcherLatch.await(); + + // add more events to the repo + for (int i = 0; i < 10; i++) { + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); + repo.registerEvent(builder.build()); + } + + // Force a rollover to occur. This will modify the index. + repo.rolloverWithLock(true); + + // Wait for the repository to roll over. + repo.waitForRollover(); + + // Wait for the searches to complete. + performSearchLatch.await(); + } + @Test public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException { final RepositoryConfiguration config = createConfiguration();