diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index af5fe5079f..6446a35497 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -48,18 +48,22 @@ public class DocsReader { return Collections.emptySet(); } - final List docs = new ArrayList<>(); + final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults); + final List docs = new ArrayList<>(numDocs); - for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + for (final ScoreDoc scoreDoc : topDocs.scoreDocs) { final int docId = scoreDoc.doc; final Document d = indexReader.document(docId); docs.add(d); + if ( retrievalCount.incrementAndGet() >= maxResults ) { + break; + } } - return read(docs, allProvenanceLogFiles, retrievalCount, maxResults); + return read(docs, allProvenanceLogFiles); } - public Set read(final List docs, final Collection allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException { + public Set read(final List docs, final Collection allProvenanceLogFiles) throws IOException { LuceneUtil.sortDocsForRetrieval(docs); RecordReader reader = null; @@ -79,9 +83,6 @@ public class DocsReader { reader.skipTo(byteOffset); final StandardProvenanceEventRecord record = reader.nextRecord(); matchingRecords.add(record); - if (retrievalCount.incrementAndGet() >= maxResults) { - break; - } } catch (final IOException e) { throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); } @@ -91,7 +92,7 @@ public class DocsReader { reader.close(); } - final List potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); + List potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); if (potentialFiles.isEmpty()) { throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); } @@ -108,9 +109,6 @@ public class DocsReader { final StandardProvenanceEventRecord record = reader.nextRecord(); matchingRecords.add(record); - if (retrievalCount.incrementAndGet() >= maxResults) { - break; - } } catch (final IOException e) { throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e); } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index f902b9275f..8f0699529c 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -33,6 +33,8 @@ import org.apache.nifi.provenance.lucene.LuceneUtil; public class RecordReaders { public static RecordReader newRecordReader(File file, final Collection provenanceLogFiles) throws IOException { + final File originalFile = file; + if (!file.exists()) { if (provenanceLogFiles == null) { throw new FileNotFoundException(file.toString()); @@ -47,11 +49,44 @@ public class RecordReaders { } } - if (file == null || !file.exists()) { - throw new FileNotFoundException(file.toString()); + InputStream fis = null; + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + } catch (final FileNotFoundException fnfe) { + fis = null; + } + } + + openStream: while ( fis == null ) { + final File dir = file.getParentFile(); + final String baseName = LuceneUtil.substringBefore(file.getName(), "."); + + // depending on which rollover actions have occurred, we could have 3 possibilities for the + // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz" + // because most often we are compressing on rollover and most often we have already finished + // compressing by the time that we are querying the data. + for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) { + file = new File(dir, baseName + extension); + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + break openStream; + } catch (final FileNotFoundException fnfe) { + // file was modified by a RolloverAction after we verified that it exists but before we could + // create an InputStream for it. Start over. + fis = null; + continue openStream; + } + } + } + + break; } - final InputStream fis = new FileInputStream(file); + if ( fis == null ) { + throw new FileNotFoundException("Unable to locate file " + originalFile); + } final InputStream readableStream; if (file.getName().endsWith(".gz")) { readableStream = new BufferedInputStream(new GZIPInputStream(fis));