From 25146a5828bc5dd1d04c5252843bef93ea87afac Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Sat, 20 Jun 2015 08:52:14 -0400 Subject: [PATCH] NIFI-80: Truncate attribute values that exceed some threshold. Expose threshold as properties in nifi.properties file --- nifi/nifi-assembly/pom.xml | 1 + .../src/main/resources/conf/nifi.properties | 3 + .../nifi/provenance/IndexConfiguration.java | 2 +- .../PersistentProvenanceRepository.java | 90 ++++++++++++++++--- .../provenance/RepositoryConfiguration.java | 18 ++++ .../nifi/provenance/StandardRecordReader.java | 15 ++-- .../provenance/lucene/DeleteIndexAction.java | 6 +- .../nifi/provenance/lucene/DocsReader.java | 9 +- .../nifi/provenance/lucene/IndexSearch.java | 7 +- .../nifi/provenance/lucene/LineageQuery.java | 6 +- .../serialization/RecordReaders.java | 17 +++- .../TestPersistentProvenanceRepository.java | 39 +++++++- .../TestStandardRecordReaderWriter.java | 10 +-- 13 files changed, 180 insertions(+), 43 deletions(-) diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index 02a16f9384..9b17617b2a 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -276,6 +276,7 @@ language governing permissions and limitations under the License. --> 500 MB false 16 + 65536 100000 diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 90b3cdd07f..4043076774 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -82,6 +82,9 @@ nifi.provenance.repository.indexed.attributes=${nifi.provenance.repository.index # Large values for the shard size will result in more Java heap usage when searching the Provenance Repository # but should provide better performance nifi.provenance.repository.index.shard.size=${nifi.provenance.repository.index.shard.size} +# Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from +# the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved. +nifi.provenance.repository.max.attribute.length=${nifi.provenance.repository.max.attribute.length} # Volatile Provenance Respository Properties nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java index 9ea793daf9..4e808111d5 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java @@ -85,7 +85,7 @@ public class IndexConfiguration { } private Long getFirstEntryTime(final File provenanceLogFile) { - try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null)) { + try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) { final StandardProvenanceEventRecord firstRecord = reader.nextRecord(); if (firstRecord == null) { return provenanceLogFile.lastModified(); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 5da5d6fe42..81d883a74e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -134,6 +134,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final IndexManager indexManager; private final boolean alwaysSync; private final int rolloverCheckMillis; + private final int maxAttributeChars; private final ScheduledExecutorService scheduledExecService; private final ScheduledExecutorService rolloverExecutor; @@ -167,6 +168,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } this.configuration = configuration; + this.maxAttributeChars = configuration.getMaxAttributeChars(); for (final File file : configuration.getStorageDirectories()) { final Path storageDirectory = file.toPath(); @@ -289,6 +291,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false")); + final int defaultMaxAttrChars = 65536; + final String maxAttrLength = properties.getProperty("nifi.provenance.repository.max.attribute.length", String.valueOf(defaultMaxAttrChars)); + int maxAttrChars; + try { + maxAttrChars = Integer.parseInt(maxAttrLength); + // must be at least 36 characters because that's the length of the uuid attribute, + // which must be kept intact + if (maxAttrChars < 36) { + maxAttrChars = 36; + logger.warn("Found max attribute length property set to " + maxAttrLength + " but minimum length is 36; using 36 instead"); + } + } catch (final Exception e) { + maxAttrChars = defaultMaxAttrChars; + } + final List searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true); final List searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false); @@ -310,6 +327,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository config.setMaxStorageCapacity(maxStorageBytes); config.setQueryThreadPoolSize(queryThreads); config.setJournalCount(journalCount); + config.setMaxAttributeChars(maxAttrChars); if (shardSize != null) { config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue()); @@ -337,6 +355,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return writers; } + /** + * @return the maximum number of characters that any Event attribute should contain. If the event contains + * more characters than this, the attribute may be truncated on retrieval + */ + public int getMaxAttributeCharacters() { + return maxAttributeChars; + } + @Override public StandardProvenanceEventRecord.Builder eventBuilder() { return new StandardProvenanceEventRecord.Builder(); @@ -362,7 +388,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } for (final Path path : paths) { - try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles())) { + try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles(), maxAttributeChars)) { // if this is the first record, try to find out the block index and jump directly to // the block index. This avoids having to read through a lot of data that we don't care about // just to get to the first record that we want. @@ -377,7 +403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } StandardProvenanceEventRecord record; - while (records.size() < maxRecords && ((record = reader.nextRecord()) != null)) { + while (records.size() < maxRecords && (record = reader.nextRecord()) != null) { if (record.getEventId() >= firstRecordId) { records.add(record); } @@ -507,7 +533,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository if (maxIdFile != null) { // Determine the max ID in the last file. - try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) { + try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles(), maxAttributeChars)) { final long eventId = reader.getMaxEventId(); if (eventId > maxId) { maxId = eventId; @@ -571,7 +597,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // Read the records in the last file to find its max id if (greatestMinIdFile != null) { - try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.emptyList())) { + try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections. emptyList(), maxAttributeChars)) { maxId = recordReader.getMaxEventId(); } } @@ -1224,7 +1250,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try { for (final File journalFile : journalFiles) { try { - readers.add(RecordReaders.newRecordReader(journalFile, null)); + // Use MAX_VALUE for number of chars because we don't want to truncate the value as we write it + // out. This allows us to later decide that we want more characters and still be able to retrieve + // the entire event. + readers.add(RecordReaders.newRecordReader(journalFile, null, Integer.MAX_VALUE)); } catch (final EOFException eof) { // there's nothing here. Skip over it. } catch (final IOException ioe) { @@ -1314,7 +1343,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository indexingAction.index(record, indexWriter, blockIndex); maxId = record.getEventId(); - latestRecords.add(record); + latestRecords.add(truncateAttributes(record)); records++; // Remove this entry from the map @@ -1383,6 +1412,39 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return writerFile; } + private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) { + boolean requireTruncation = false; + + for (final Map.Entry entry : original.getAttributes().entrySet()) { + if (entry.getValue().length() > maxAttributeChars) { + requireTruncation = true; + break; + } + } + + if (!requireTruncation) { + return original; + } + + final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder().fromEvent(original); + builder.setAttributes(truncateAttributes(original.getPreviousAttributes()), truncateAttributes(original.getUpdatedAttributes())); + final StandardProvenanceEventRecord truncated = builder.build(); + truncated.setEventId(original.getEventId()); + return truncated; + } + + private Map truncateAttributes(final Map original) { + final Map truncatedAttrs = new HashMap<>(); + for (final Map.Entry entry : original.entrySet()) { + if (entry.getValue().length() > maxAttributeChars) { + truncatedAttrs.put(entry.getKey(), entry.getValue().substring(0, maxAttributeChars)); + } else { + truncatedAttrs.put(entry.getKey(), entry.getValue()); + } + } + return truncatedAttrs; + } + @Override public List getSearchableFields() { final List searchableFields = new ArrayList<>(configuration.getSearchableFields()); @@ -1612,7 +1674,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository for (final File file : potentialFiles) { try { - reader = RecordReaders.newRecordReader(file, allLogFiles); + reader = RecordReaders.newRecordReader(file, allLogFiles, maxAttributeChars); } catch (final IOException ioe) { continue; } @@ -1788,7 +1850,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return true; } - if (repoDirty.get() || (writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis)) { + if (repoDirty.get() || writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis) { return true; } @@ -1797,7 +1859,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository public Collection getAllLogFiles() { final SortedMap map = idToPathMap.get(); - return (map == null) ? new ArrayList() : map.values(); + return map == null ? new ArrayList() : map.values(); } private static class PathMapComparator implements Comparator { @@ -1885,7 +1947,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository @Override public void run() { try { - final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager); + final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars); final StandardQueryResult queryResult = search.search(query, retrievalCount); submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount()); if (queryResult.isFinished()) { @@ -1926,7 +1988,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } try { - final Set matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexManager, indexDir, null, flowFileUuids); + final Set matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, + indexManager, indexDir, null, flowFileUuids, maxAttributeChars); + final StandardLineageResult result = submission.getResult(); result.update(matchingRecords); @@ -1959,7 +2023,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final Map.Entry entry = queryIterator.next(); final StandardQueryResult result = entry.getValue().getResult(); - if (entry.getValue().isCanceled() || (result.isFinished() && result.getExpiration().before(now))) { + if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { queryIterator.remove(); } } @@ -1969,7 +2033,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final Map.Entry entry = lineageIterator.next(); final StandardLineageResult result = entry.getValue().getResult(); - if (entry.getValue().isCanceled() || (result.isFinished() && result.getExpiration().before(now))) { + if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { lineageIterator.remove(); } } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java index d0d147c332..381d778595 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java @@ -34,6 +34,7 @@ public class RepositoryConfiguration { private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB private int journalCount = 16; private int compressionBlockBytes = 1024 * 1024; + private int maxAttributeChars = 65536; private List searchableFields = new ArrayList<>(); private List searchableAttributes = new ArrayList<>(); @@ -278,4 +279,21 @@ public class RepositoryConfiguration { public void setAlwaysSync(boolean alwaysSync) { this.alwaysSync = alwaysSync; } + + /** + * @return the maximum number of characters to include in any attribute. If an attribute in a Provenance + * Event has more than this number of characters, it will be truncated when the event is retrieved. + */ + public int getMaxAttributeChars() { + return maxAttributeChars; + } + + /** + * Sets the maximum number of characters to include in any attribute. If an attribute in a Provenance + * Event has more than this number of characters, it will be truncated when it is retrieved. + */ + public void setMaxAttributeChars(int maxAttributeChars) { + this.maxAttributeChars = maxAttributeChars; + } + } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java index ca0d5edd26..09391072c6 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java @@ -47,18 +47,20 @@ public class StandardRecordReader implements RecordReader { private final boolean compressed; private final TocReader tocReader; private final int headerLength; + private final int maxAttributeChars; private DataInputStream dis; private ByteCountingInputStream byteCountingIn; - public StandardRecordReader(final InputStream in, final String filename) throws IOException { - this(in, filename, null); + public StandardRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException { + this(in, filename, null, maxAttributeChars); } - public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException { + public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException { logger.trace("Creating RecordReader for {}", filename); rawInputStream = new ByteCountingInputStream(in); + this.maxAttributeChars = maxAttributeChars; final InputStream limitedStream; if ( tocReader == null ) { @@ -367,7 +369,8 @@ public class StandardRecordReader implements RecordReader { for (int i = 0; i < numAttributes; i++) { final String key = readLongString(dis); final String value = valueNullable ? readLongNullableString(dis) : readLongString(dis); - attrs.put(key, value); + final String truncatedValue = value.length() > maxAttributeChars ? value.substring(0, maxAttributeChars) : value; + attrs.put(key, truncatedValue); } return attrs; @@ -429,7 +432,7 @@ public class StandardRecordReader implements RecordReader { byteCountingIn.reset(); } - return (nextByte >= 0); + return nextByte >= 0; } @Override @@ -451,7 +454,7 @@ public class StandardRecordReader implements RecordReader { // committed, so we can just process the FlowFile again. } - return (lastRecord == null) ? -1L : lastRecord.getEventId(); + return lastRecord == null ? -1L : lastRecord.getEventId(); } @Override diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java index 70bf36e108..7707352064 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java @@ -46,9 +46,9 @@ public class DeleteIndexAction implements ExpirationAction { @Override public File execute(final File expiredFile) throws IOException { // count the number of records and determine the max event id that we are deleting. - long numDeleted = 0; + final long numDeleted = 0; long maxEventId = -1L; - try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) { + try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles(), Integer.MAX_VALUE)) { maxEventId = reader.getMaxEventId(); } catch (final IOException ioe) { logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath()); @@ -65,7 +65,7 @@ public class DeleteIndexAction implements ExpirationAction { writer.deleteDocuments(term); writer.commit(); final int docsLeft = writer.numDocs(); - deleteDir = (docsLeft <= 0); + deleteDir = docsLeft <= 0; logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory); } finally { indexManager.returnIndexWriter(indexingDirectory, writer); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index 02fd5c3e28..eef46281a2 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -51,7 +51,7 @@ public class DocsReader { } public Set read(final TopDocs topDocs, final IndexReader indexReader, final Collection allProvenanceLogFiles, - final AtomicInteger retrievalCount, final int maxResults) throws IOException { + final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { if (retrievalCount.get() >= maxResults) { return Collections.emptySet(); } @@ -68,7 +68,7 @@ public class DocsReader { final long readDocuments = System.nanoTime() - start; logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments)); - return read(docs, allProvenanceLogFiles, retrievalCount, maxResults); + return read(docs, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars); } @@ -108,7 +108,8 @@ public class DocsReader { } - public Set read(final List docs, final Collection allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException { + public Set read(final List docs, final Collection allProvenanceLogFiles, + final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { if (retrievalCount.get() >= maxResults) { return Collections.emptySet(); } @@ -161,7 +162,7 @@ public class DocsReader { for (final File file : potentialFiles) { try { - reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles); + reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars); matchingRecords.add(getRecord(d, reader)); if ( retrievalCount.incrementAndGet() >= maxResults ) { diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java index 53869f4563..c9bb238bed 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java @@ -39,11 +39,13 @@ public class IndexSearch { private final PersistentProvenanceRepository repository; private final File indexDirectory; private final IndexManager indexManager; + private final int maxAttributeChars; - public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager) { + public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager, final int maxAttributeChars) { this.repository = repo; this.indexDirectory = indexDirectory; this.indexManager = indexManager; + this.maxAttributeChars = maxAttributeChars; } public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException { @@ -82,7 +84,8 @@ public class IndexSearch { } final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories()); - matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults()); + matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, + provenanceQuery.getMaxResults(), maxAttributeChars); final long readRecordsNanos = System.nanoTime() - finishSearch; logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java index 502068bea5..e9e6e63462 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java @@ -46,7 +46,7 @@ public class LineageQuery { private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class); public static Set computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory, - final String lineageIdentifier, final Collection flowFileUuids) throws IOException { + final String lineageIdentifier, final Collection flowFileUuids, final int maxAttributeChars) throws IOException { if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) { throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size())); } @@ -94,7 +94,9 @@ public class LineageQuery { final long searchEnd = System.nanoTime(); final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); - final Set recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE); + final Set recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), + new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars); + final long readDocsEnd = System.nanoTime(); logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis", indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index cab5e6f250..7889cd6952 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -32,7 +32,18 @@ import org.apache.nifi.provenance.toc.TocUtil; public class RecordReaders { - public static RecordReader newRecordReader(File file, final Collection provenanceLogFiles) throws IOException { + /** + * Creates a new Record Reader that is capable of reading Provenance Event Journals + * + * @param file the Provenance Event Journal to read data from + * @param provenanceLogFiles collection of all provenance journal files + * @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. This allows us to avoid + * issues where a FlowFile has an extremely large attribute and reading events + * for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap + * @return a Record Reader capable of reading Provenance Event Journals + * @throws IOException if unable to create a Record Reader for the given file + */ + public static RecordReader newRecordReader(File file, final Collection provenanceLogFiles, final int maxAttributeChars) throws IOException { final File originalFile = file; InputStream fis = null; @@ -92,9 +103,9 @@ public class RecordReaders { final File tocFile = TocUtil.getTocFile(file); if ( tocFile.exists() ) { final TocReader tocReader = new StandardTocReader(tocFile); - return new StandardRecordReader(fis, filename, tocReader); + return new StandardRecordReader(fis, filename, tocReader, maxAttributeChars); } else { - return new StandardRecordReader(fis, filename); + return new StandardRecordReader(fis, filename, maxAttributeChars); } } catch (final IOException ioe) { if ( fis != null ) { diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 7d97bcdd5d..16f0312839 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -252,7 +252,7 @@ public class TestPersistentProvenanceRepository { assertEquals(10, recoveredRecords.size()); for (int i = 0; i < 10; i++) { final ProvenanceEventRecord recovered = recoveredRecords.get(i); - assertEquals((long) i, recovered.getEventId()); + assertEquals(i, recovered.getEventId()); assertEquals("nifi://unit-test", recovered.getTransitUri()); assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType()); assertEquals(attributes, recovered.getAttributes()); @@ -283,7 +283,7 @@ public class TestPersistentProvenanceRepository { builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); builder.setComponentId("1234"); builder.setComponentType("dummy processor"); - ProvenanceEventRecord record = builder.build(); + final ProvenanceEventRecord record = builder.build(); for (int i = 0; i < 10; i++) { repo.registerEvent(record); @@ -1106,7 +1106,7 @@ public class TestPersistentProvenanceRepository { final Query q = new Query(""); q.setMaxResults(1000); - TopDocs topDocs = searcher.search(luceneQuery, 1000); + final TopDocs topDocs = searcher.search(luceneQuery, 1000); final List docs = new ArrayList<>(); for (int i = 0; i < topDocs.scoreDocs.length; i++) { @@ -1157,7 +1157,7 @@ public class TestPersistentProvenanceRepository { for (final File file : storageDir.listFiles()) { if (file.isFile()) { - try (RecordReader reader = RecordReaders.newRecordReader(file, null)) { + try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) { ProvenanceEventRecord r = null; while ((r = reader.nextRecord()) != null) { @@ -1169,4 +1169,35 @@ public class TestPersistentProvenanceRepository { assertEquals(10000, counter); } + + @Test + public void testTruncateAttributes() throws IOException, InterruptedException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxAttributeChars(50); + config.setMaxEventFileLife(3, TimeUnit.SECONDS); + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); + repo.initialize(getEventReporter()); + + final Map attributes = new HashMap<>(); + attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345"); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", "12345678-0000-0000-0000-012345678912"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + final ProvenanceEventRecord record = builder.build(); + repo.registerEvent(record); + repo.waitForRollover(); + + final ProvenanceEventRecord retrieved = repo.getEvent(0L); + assertNotNull(retrieved); + assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid")); + assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars")); + } + } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java index f242642fbe..d9e64e55d3 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -74,7 +74,7 @@ public class TestStandardRecordReaderWriter { final TocReader tocReader = new StandardTocReader(tocFile); try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) { assertEquals(0, reader.getBlockIndex()); reader.skipToBlock(0); final StandardProvenanceEventRecord recovered = reader.nextRecord(); @@ -102,7 +102,7 @@ public class TestStandardRecordReaderWriter { final TocReader tocReader = new StandardTocReader(tocFile); try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) { assertEquals(0, reader.getBlockIndex()); reader.skipToBlock(0); final StandardProvenanceEventRecord recovered = reader.nextRecord(); @@ -133,7 +133,7 @@ public class TestStandardRecordReaderWriter { final TocReader tocReader = new StandardTocReader(tocFile); try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) { for (int i=0; i < 10; i++) { assertEquals(0, reader.getBlockIndex()); @@ -172,12 +172,12 @@ public class TestStandardRecordReaderWriter { final TocReader tocReader = new StandardTocReader(tocFile); try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) { for (int i=0; i < 10; i++) { final StandardProvenanceEventRecord recovered = reader.nextRecord(); System.out.println(recovered); assertNotNull(recovered); - assertEquals((long) i, recovered.getEventId()); + assertEquals(i, recovered.getEventId()); assertEquals("nifi://unit-test", recovered.getTransitUri()); }