mirror of https://github.com/apache/nifi.git
Merge branch 'persistent-prov-fixes' into develop
This commit is contained in:
commit
c5b961be5b
|
@ -48,18 +48,22 @@ public class DocsReader {
|
|||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
final List<Document> docs = new ArrayList<>();
|
||||
final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
|
||||
final List<Document> docs = new ArrayList<>(numDocs);
|
||||
|
||||
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
|
||||
for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
|
||||
final int docId = scoreDoc.doc;
|
||||
final Document d = indexReader.document(docId);
|
||||
docs.add(d);
|
||||
if ( retrievalCount.incrementAndGet() >= maxResults ) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
|
||||
return read(docs, allProvenanceLogFiles);
|
||||
}
|
||||
|
||||
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
|
||||
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
|
||||
LuceneUtil.sortDocsForRetrieval(docs);
|
||||
|
||||
RecordReader reader = null;
|
||||
|
@ -79,9 +83,6 @@ public class DocsReader {
|
|||
reader.skipTo(byteOffset);
|
||||
final StandardProvenanceEventRecord record = reader.nextRecord();
|
||||
matchingRecords.add(record);
|
||||
if (retrievalCount.incrementAndGet() >= maxResults) {
|
||||
break;
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
|
||||
}
|
||||
|
@ -91,7 +92,7 @@ public class DocsReader {
|
|||
reader.close();
|
||||
}
|
||||
|
||||
final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
|
||||
List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
|
||||
if (potentialFiles.isEmpty()) {
|
||||
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
|
||||
}
|
||||
|
@ -108,9 +109,6 @@ public class DocsReader {
|
|||
|
||||
final StandardProvenanceEventRecord record = reader.nextRecord();
|
||||
matchingRecords.add(record);
|
||||
if (retrievalCount.incrementAndGet() >= maxResults) {
|
||||
break;
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
|
||||
}
|
||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
|
|||
public class RecordReaders {
|
||||
|
||||
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
|
||||
final File originalFile = file;
|
||||
|
||||
if (!file.exists()) {
|
||||
if (provenanceLogFiles == null) {
|
||||
throw new FileNotFoundException(file.toString());
|
||||
|
@ -47,11 +49,44 @@ public class RecordReaders {
|
|||
}
|
||||
}
|
||||
|
||||
if (file == null || !file.exists()) {
|
||||
throw new FileNotFoundException(file.toString());
|
||||
InputStream fis = null;
|
||||
if ( file.exists() ) {
|
||||
try {
|
||||
fis = new FileInputStream(file);
|
||||
} catch (final FileNotFoundException fnfe) {
|
||||
fis = null;
|
||||
}
|
||||
}
|
||||
|
||||
openStream: while ( fis == null ) {
|
||||
final File dir = file.getParentFile();
|
||||
final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
|
||||
|
||||
// depending on which rollover actions have occurred, we could have 3 possibilities for the
|
||||
// filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
|
||||
// because most often we are compressing on rollover and most often we have already finished
|
||||
// compressing by the time that we are querying the data.
|
||||
for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) {
|
||||
file = new File(dir, baseName + extension);
|
||||
if ( file.exists() ) {
|
||||
try {
|
||||
fis = new FileInputStream(file);
|
||||
break openStream;
|
||||
} catch (final FileNotFoundException fnfe) {
|
||||
// file was modified by a RolloverAction after we verified that it exists but before we could
|
||||
// create an InputStream for it. Start over.
|
||||
fis = null;
|
||||
continue openStream;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
final InputStream fis = new FileInputStream(file);
|
||||
if ( fis == null ) {
|
||||
throw new FileNotFoundException("Unable to locate file " + originalFile);
|
||||
}
|
||||
final InputStream readableStream;
|
||||
if (file.getName().endsWith(".gz")) {
|
||||
readableStream = new BufferedInputStream(new GZIPInputStream(fis));
|
||||
|
|
Loading…
Reference in New Issue