diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 89e1419869..1740f51521 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -1784,6 +1784,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try { Thread.sleep(100L); } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index c2a7609eb8..7bd800b99c 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -17,15 +17,14 @@ package org.apache.nifi.provenance.lucene; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -47,9 +46,6 @@ import org.slf4j.LoggerFactory; public class DocsReader { private final Logger logger = LoggerFactory.getLogger(DocsReader.class); - public DocsReader(final List storageDirectories) { - } - public Set read(final TopDocs topDocs, final IndexReader indexReader, final Collection allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { if (retrievalCount.get() >= maxResults) { @@ -100,101 +96,61 @@ public class DocsReader { } } - if ( record == null ) { - throw new IOException("Failed to find Provenance Event " + d); - } else { - return record; + if (record == null) { + logger.warn("Failed to read Provenance Event for '" + d + "'. The event file may be missing or corrupted"); } + + return record; } - public Set read(final List docs, final Collection allProvenanceLogFiles, - final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { - if (retrievalCount.get() >= maxResults) { - return Collections.emptySet(); - } - - LuceneUtil.sortDocsForRetrieval(docs); - - RecordReader reader = null; - String lastStorageFilename = null; - final Set matchingRecords = new LinkedHashSet<>(); + final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { final long start = System.nanoTime(); + + Set matchingRecords = new LinkedHashSet<>(); + if (retrievalCount.get() >= maxResults) { + return matchingRecords; + } + + Map> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs); + + int eventsReadThisFile = 0; int logFileCount = 0; - final Set storageFilesToSkip = new HashSet<>(); - int eventsReadThisFile = 0; + for (String storageFileName : byStorageNameDocGroups.keySet()) { + File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles); + if (provenanceEventFile != null) { + try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles, + maxAttributeChars)) { + for (Document document : byStorageNameDocGroups.get(storageFileName)) { + ProvenanceEventRecord eRec = this.getRecord(document, reader); + if (eRec != null) { + matchingRecords.add(eRec); + eventsReadThisFile++; - try { - for (final Document d : docs) { - final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue(); - if ( storageFilesToSkip.contains(storageFilename) ) { - continue; - } - - try { - if (reader != null && storageFilename.equals(lastStorageFilename)) { - matchingRecords.add(getRecord(d, reader)); - eventsReadThisFile++; - - if ( retrievalCount.incrementAndGet() >= maxResults ) { - break; - } - } else { - logger.debug("Opening log file {}", storageFilename); - - logFileCount++; - if (reader != null) { - reader.close(); - } - - final List potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); - if (potentialFiles.isEmpty()) { - logger.warn("Could not find Provenance Log File with basename {} in the " - + "Provenance Repository; assuming file has expired and continuing without it", storageFilename); - storageFilesToSkip.add(storageFilename); - continue; - } - - if (potentialFiles.size() > 1) { - throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + - storageFilename + " in the Provenance Repository"); - } - - for (final File file : potentialFiles) { - try { - if (reader != null) { - logger.debug("Read {} records from previous file", eventsReadThisFile); - } - - reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars); - matchingRecords.add(getRecord(d, reader)); - eventsReadThisFile = 1; - - if ( retrievalCount.incrementAndGet() >= maxResults ) { - break; - } - } catch (final IOException e) { - throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e); + if (retrievalCount.incrementAndGet() >= maxResults) { + break; } } } - } finally { - lastStorageFilename = storageFilename; + } catch (Exception e) { + logger.warn("Failed while trying to read Provenance Events. The event file '" + + provenanceEventFile.getAbsolutePath() + + "' may be missing or corrupted.", e); } - } - } finally { - if (reader != null) { - reader.close(); + } else { + logger.warn("Could not find Provenance Log File with " + + "basename {} in the Provenance Repository; assuming " + + "file has expired and continuing without it", storageFileName); } } logger.debug("Read {} records from previous file", eventsReadThisFile); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount); + logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), + logFileCount); return matchingRecords; } - } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java index 7fcd8ab718..b8661df2fd 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java @@ -89,7 +89,7 @@ public class IndexSearch { return sqr; } - final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories()); + final DocsReader docsReader = new DocsReader(); matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults(), maxAttributeChars); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java index e9e6e63462..ce60e03d2c 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java @@ -93,7 +93,7 @@ public class LineageQuery { final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS); final long searchEnd = System.nanoTime(); - final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); + final DocsReader docsReader = new DocsReader(); final Set recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java index c622ea1a62..aa50b94700 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java @@ -22,7 +22,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.SearchableFields; @@ -160,4 +162,25 @@ public class LuceneUtil { } }); } + + /** + * Will group documents based on the {@link FieldNames#STORAGE_FILENAME}. + * + * @param documents + * list of {@link Document}s + * @return a {@link Map} of document groups with + * {@link FieldNames#STORAGE_FILENAME} as key and {@link List} of + * {@link Document}s as value. + */ + public static Map> groupDocsByStorageFileName(final List documents) { + Map> documentGroups = new HashMap<>(); + for (Document document : documents) { + String fileName = document.get(FieldNames.STORAGE_FILENAME); + if (!documentGroups.containsKey(fileName)) { + documentGroups.put(fileName, new ArrayList()); + } + documentGroups.get(fileName).add(document); + } + return documentGroups; + } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 5e4aed05eb..02b9216604 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -20,9 +20,11 @@ import static org.apache.nifi.provenance.TestUtil.createFlowFile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; import java.io.File; import java.io.FileFilter; +import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -36,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.zip.GZIPOutputStream; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.core.SimpleAnalyzer; @@ -48,6 +51,7 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.FSDirectory; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.provenance.lineage.EventNode; import org.apache.nifi.provenance.lineage.Lineage; import org.apache.nifi.provenance.lineage.LineageEdge; @@ -869,6 +873,72 @@ public class TestPersistentProvenanceRepository { } } + /** + * Here the event file is simply corrupted by virtue of not having any event + * records while having correct headers + */ + @Test + public void testWithWithEventFileMissingRecord() throws Exception { + File eventFile = this.prepCorruptedEventFileTests(); + + final Query query = new Query(UUID.randomUUID().toString()); + query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*")); + query.setMaxResults(100); + + DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile))); + in.writeUTF("BlahBlah"); + in.writeInt(4); + in.close(); + assertTrue(eventFile.exists()); + final QueryResult result = repo.queryEvents(query); + assertEquals(10, result.getMatchingEvents().size()); + } + + /** + * Here the event file is simply corrupted by virtue of being empty (0 + * bytes) + */ + @Test + public void testWithWithEventFileCorrupted() throws Exception { + File eventFile = this.prepCorruptedEventFileTests(); + + final Query query = new Query(UUID.randomUUID().toString()); + query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*")); + query.setMaxResults(100); + DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile))); + in.close(); + final QueryResult result = repo.queryEvents(query); + assertEquals(10, result.getMatchingEvents().size()); + } + + private File prepCorruptedEventFileTests() throws Exception { + RepositoryConfiguration config = createConfiguration(); + config.setMaxStorageCapacity(1024L * 1024L); + config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); + config.setMaxEventFileCapacity(1024L * 1024L); + config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); + config.setDesiredIndexSize(10); + + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); + repo.initialize(getEventReporter()); + + String uuid = UUID.randomUUID().toString(); + for (int i = 0; i < 20; i++) { + ProvenanceEventRecord record = repo.eventBuilder().fromFlowFile(mock(FlowFile.class)) + .setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent") + .setFlowFileUUID(uuid).build(); + repo.registerEvent(record); + if (i == 9) { + repo.waitForRollover(); + Thread.sleep(2000L); + } + } + repo.waitForRollover(); + File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz"); + assertTrue(eventFile.delete()); + return eventFile; + } + @Test public void testIndexDirectoryRemoved() throws InterruptedException, IOException, ParseException { final RepositoryConfiguration config = createConfiguration();