diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index fe89a5e1c6..214fc7cc17 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -699,7 +699,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository if (bytesWrittenSinceRollover.get() >= configuration.getMaxEventFileCapacity()) { try { rollover(false); - } catch (IOException e) { + } catch (final IOException e) { logger.error("Failed to Rollover Provenance Event Repository file due to {}", e.toString()); logger.error("", e); eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + e.toString()); @@ -1001,7 +1001,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository if (fileRolledOver == null) { return; } - File file = fileRolledOver; + final File file = fileRolledOver; // update our map of id to Path // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a @@ -1010,7 +1010,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository writeLock.lock(); try { final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), ".")); - SortedMap newIdToPathMap = new TreeMap<>(new PathMapComparator()); + final SortedMap newIdToPathMap = new TreeMap<>(new PathMapComparator()); newIdToPathMap.putAll(idToPathMap.get()); newIdToPathMap.put(fileFirstEventId, file.toPath()); idToPathMap.set(newIdToPathMap); @@ -1452,11 +1452,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) { final IndexSearcher searcher = new IndexSearcher(directoryReader); - TopDocs topDocs = searcher.search(luceneQuery, 10000000); + final TopDocs topDocs = searcher.search(luceneQuery, 10000000); logger.info("For {}, Top Docs has {} hits; reading Lucene results", indexDirectory, topDocs.scoreDocs.length); if (topDocs.totalHits > 0) { - for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + for (final ScoreDoc scoreDoc : topDocs.scoreDocs) { final int docId = scoreDoc.doc; final Document d = directoryReader.document(docId); localScoreDocs.add(d); @@ -1649,16 +1649,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } switch (event.getEventType()) { - case CLONE: - case FORK: - case JOIN: - case REPLAY: - return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE); - default: - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1); - lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); - return submission; + case CLONE: + case FORK: + case JOIN: + case REPLAY: + return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE); + default: + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); + return submission; } } catch (final IOException ioe) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1); @@ -1686,17 +1686,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } switch (event.getEventType()) { - case JOIN: - case FORK: - case CLONE: - case REPLAY: - return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime()); - default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1); - lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); - return submission; - } + case JOIN: + case FORK: + case CLONE: + case REPLAY: + return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime()); + default: { + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); + return submission; + } } } catch (final IOException ioe) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1); @@ -1880,7 +1880,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } try { - final Set matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexDir, null, flowFileUuids); + final Set matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexManager, indexDir, null, flowFileUuids); final StandardLineageResult result = submission.getResult(); result.update(matchingRecords); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java index 3f75c006ac..5e4f69d509 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java @@ -19,26 +19,23 @@ package org.apache.nifi.provenance.lucene; import static java.util.Objects.requireNonNull; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.nifi.provenance.PersistentProvenanceRepository; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.SearchableFields; - -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.provenance.PersistentProvenanceRepository; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.SearchableFields; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +45,7 @@ public class LineageQuery { public static final int MAX_LINEAGE_UUIDS = 100; private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class); - public static Set computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory, + public static Set computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory, final String lineageIdentifier, final Collection flowFileUuids) throws IOException { if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) { throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size())); @@ -58,52 +55,62 @@ public class LineageQuery { throw new IllegalArgumentException("Must specify either Lineage Identifier or FlowFile UUIDs to compute lineage"); } - try (final Directory fsDir = FSDirectory.open(indexDirectory); - final IndexReader indexReader = DirectoryReader.open(fsDir)) { - - final IndexSearcher searcher = new IndexSearcher(indexReader); - - // Create a query for all Events related to the FlowFiles of interest. We do this by adding all ID's as - // "SHOULD" clauses and then setting the minimum required to 1. - final BooleanQuery flowFileIdQuery; - if (flowFileUuids == null || flowFileUuids.isEmpty()) { - flowFileIdQuery = null; - } else { - flowFileIdQuery = new BooleanQuery(); - for (final String flowFileUuid : flowFileUuids) { - flowFileIdQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD); - } - flowFileIdQuery.setMinimumNumberShouldMatch(1); - } - - BooleanQuery query; - if (lineageIdentifier == null) { - query = flowFileIdQuery; - } else { - final BooleanQuery lineageIdQuery = new BooleanQuery(); - lineageIdQuery.add(new TermQuery(new Term(SearchableFields.LineageIdentifier.getSearchableFieldName(), lineageIdentifier)), Occur.MUST); - - if (flowFileIdQuery == null) { - query = lineageIdQuery; + final IndexSearcher searcher; + try { + searcher = indexManager.borrowIndexSearcher(indexDirectory); + try { + // Create a query for all Events related to the FlowFiles of interest. We do this by adding all ID's as + // "SHOULD" clauses and then setting the minimum required to 1. + final BooleanQuery flowFileIdQuery; + if (flowFileUuids == null || flowFileUuids.isEmpty()) { + flowFileIdQuery = null; } else { - query = new BooleanQuery(); - query.add(flowFileIdQuery, Occur.SHOULD); - query.add(lineageIdQuery, Occur.SHOULD); - query.setMinimumNumberShouldMatch(1); + flowFileIdQuery = new BooleanQuery(); + for (final String flowFileUuid : flowFileUuids) { + flowFileIdQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD); + } + flowFileIdQuery.setMinimumNumberShouldMatch(1); } + + BooleanQuery query; + if (lineageIdentifier == null) { + query = flowFileIdQuery; + } else { + final BooleanQuery lineageIdQuery = new BooleanQuery(); + lineageIdQuery.add(new TermQuery(new Term(SearchableFields.LineageIdentifier.getSearchableFieldName(), lineageIdentifier)), Occur.MUST); + + if (flowFileIdQuery == null) { + query = lineageIdQuery; + } else { + query = new BooleanQuery(); + query.add(flowFileIdQuery, Occur.SHOULD); + query.add(lineageIdQuery, Occur.SHOULD); + query.setMinimumNumberShouldMatch(1); + } + } + + final long searchStart = System.nanoTime(); + final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS); + final long searchEnd = System.nanoTime(); + + final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); + final Set recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE); + final long readDocsEnd = System.nanoTime(); + logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", + TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); + + return recs; + } finally { + indexManager.returnIndexSearcher(indexDirectory, searcher); + } + } catch (final FileNotFoundException fnfe) { + // nothing has been indexed yet, or the data has already aged off + logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, fnfe); + if ( logger.isDebugEnabled() ) { + logger.warn("", fnfe); } - final long searchStart = System.nanoTime(); - final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS); - final long searchEnd = System.nanoTime(); - - final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); - final Set recs = docsReader.read(uuidQueryTopDocs, indexReader, repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE); - final long readDocsEnd = System.nanoTime(); - logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", - TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); - - return recs; + return Collections.emptySet(); } }