mirror of https://github.com/apache/nifi.git
NIFI-80: Truncate attribute values that exceed some threshold. Expose threshold as properties in nifi.properties file
This commit is contained in:
parent
979671ca97
commit
25146a5828
|
@ -276,6 +276,7 @@ language governing permissions and limitations under the License. -->
|
|||
<nifi.provenance.repository.index.shard.size>500 MB</nifi.provenance.repository.index.shard.size>
|
||||
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
|
||||
<nifi.provenance.repository.journal.count>16</nifi.provenance.repository.journal.count>
|
||||
<nifi.provenance.repository.max.attribute.length>65536</nifi.provenance.repository.max.attribute.length>
|
||||
|
||||
<!-- volatile provenance repository properties -->
|
||||
<nifi.provenance.repository.buffer.size>100000</nifi.provenance.repository.buffer.size>
|
||||
|
|
|
@ -82,6 +82,9 @@ nifi.provenance.repository.indexed.attributes=${nifi.provenance.repository.index
|
|||
# Large values for the shard size will result in more Java heap usage when searching the Provenance Repository
|
||||
# but should provide better performance
|
||||
nifi.provenance.repository.index.shard.size=${nifi.provenance.repository.index.shard.size}
|
||||
# Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from
|
||||
# the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved.
|
||||
nifi.provenance.repository.max.attribute.length=${nifi.provenance.repository.max.attribute.length}
|
||||
|
||||
# Volatile Provenance Respository Properties
|
||||
nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
|
||||
|
|
|
@ -85,7 +85,7 @@ public class IndexConfiguration {
|
|||
}
|
||||
|
||||
private Long getFirstEntryTime(final File provenanceLogFile) {
|
||||
try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null)) {
|
||||
try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) {
|
||||
final StandardProvenanceEventRecord firstRecord = reader.nextRecord();
|
||||
if (firstRecord == null) {
|
||||
return provenanceLogFile.lastModified();
|
||||
|
|
|
@ -134,6 +134,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
private final IndexManager indexManager;
|
||||
private final boolean alwaysSync;
|
||||
private final int rolloverCheckMillis;
|
||||
private final int maxAttributeChars;
|
||||
|
||||
private final ScheduledExecutorService scheduledExecService;
|
||||
private final ScheduledExecutorService rolloverExecutor;
|
||||
|
@ -167,6 +168,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
}
|
||||
|
||||
this.configuration = configuration;
|
||||
this.maxAttributeChars = configuration.getMaxAttributeChars();
|
||||
|
||||
for (final File file : configuration.getStorageDirectories()) {
|
||||
final Path storageDirectory = file.toPath();
|
||||
|
@ -289,6 +291,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
|
||||
final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false"));
|
||||
|
||||
final int defaultMaxAttrChars = 65536;
|
||||
final String maxAttrLength = properties.getProperty("nifi.provenance.repository.max.attribute.length", String.valueOf(defaultMaxAttrChars));
|
||||
int maxAttrChars;
|
||||
try {
|
||||
maxAttrChars = Integer.parseInt(maxAttrLength);
|
||||
// must be at least 36 characters because that's the length of the uuid attribute,
|
||||
// which must be kept intact
|
||||
if (maxAttrChars < 36) {
|
||||
maxAttrChars = 36;
|
||||
logger.warn("Found max attribute length property set to " + maxAttrLength + " but minimum length is 36; using 36 instead");
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
maxAttrChars = defaultMaxAttrChars;
|
||||
}
|
||||
|
||||
final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
|
||||
final List<SearchableField> searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false);
|
||||
|
||||
|
@ -310,6 +327,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
config.setMaxStorageCapacity(maxStorageBytes);
|
||||
config.setQueryThreadPoolSize(queryThreads);
|
||||
config.setJournalCount(journalCount);
|
||||
config.setMaxAttributeChars(maxAttrChars);
|
||||
|
||||
if (shardSize != null) {
|
||||
config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
|
||||
|
@ -337,6 +355,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
return writers;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum number of characters that any Event attribute should contain. If the event contains
|
||||
* more characters than this, the attribute may be truncated on retrieval
|
||||
*/
|
||||
public int getMaxAttributeCharacters() {
|
||||
return maxAttributeChars;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StandardProvenanceEventRecord.Builder eventBuilder() {
|
||||
return new StandardProvenanceEventRecord.Builder();
|
||||
|
@ -362,7 +388,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
}
|
||||
|
||||
for (final Path path : paths) {
|
||||
try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles())) {
|
||||
try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles(), maxAttributeChars)) {
|
||||
// if this is the first record, try to find out the block index and jump directly to
|
||||
// the block index. This avoids having to read through a lot of data that we don't care about
|
||||
// just to get to the first record that we want.
|
||||
|
@ -377,7 +403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
}
|
||||
|
||||
StandardProvenanceEventRecord record;
|
||||
while (records.size() < maxRecords && ((record = reader.nextRecord()) != null)) {
|
||||
while (records.size() < maxRecords && (record = reader.nextRecord()) != null) {
|
||||
if (record.getEventId() >= firstRecordId) {
|
||||
records.add(record);
|
||||
}
|
||||
|
@ -507,7 +533,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
|
||||
if (maxIdFile != null) {
|
||||
// Determine the max ID in the last file.
|
||||
try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) {
|
||||
try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles(), maxAttributeChars)) {
|
||||
final long eventId = reader.getMaxEventId();
|
||||
if (eventId > maxId) {
|
||||
maxId = eventId;
|
||||
|
@ -571,7 +597,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
|
||||
// Read the records in the last file to find its max id
|
||||
if (greatestMinIdFile != null) {
|
||||
try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList())) {
|
||||
try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path> emptyList(), maxAttributeChars)) {
|
||||
maxId = recordReader.getMaxEventId();
|
||||
}
|
||||
}
|
||||
|
@ -1224,7 +1250,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
try {
|
||||
for (final File journalFile : journalFiles) {
|
||||
try {
|
||||
readers.add(RecordReaders.newRecordReader(journalFile, null));
|
||||
// Use MAX_VALUE for number of chars because we don't want to truncate the value as we write it
|
||||
// out. This allows us to later decide that we want more characters and still be able to retrieve
|
||||
// the entire event.
|
||||
readers.add(RecordReaders.newRecordReader(journalFile, null, Integer.MAX_VALUE));
|
||||
} catch (final EOFException eof) {
|
||||
// there's nothing here. Skip over it.
|
||||
} catch (final IOException ioe) {
|
||||
|
@ -1314,7 +1343,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
indexingAction.index(record, indexWriter, blockIndex);
|
||||
maxId = record.getEventId();
|
||||
|
||||
latestRecords.add(record);
|
||||
latestRecords.add(truncateAttributes(record));
|
||||
records++;
|
||||
|
||||
// Remove this entry from the map
|
||||
|
@ -1383,6 +1412,39 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
return writerFile;
|
||||
}
|
||||
|
||||
private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) {
|
||||
boolean requireTruncation = false;
|
||||
|
||||
for (final Map.Entry<String, String> entry : original.getAttributes().entrySet()) {
|
||||
if (entry.getValue().length() > maxAttributeChars) {
|
||||
requireTruncation = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!requireTruncation) {
|
||||
return original;
|
||||
}
|
||||
|
||||
final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder().fromEvent(original);
|
||||
builder.setAttributes(truncateAttributes(original.getPreviousAttributes()), truncateAttributes(original.getUpdatedAttributes()));
|
||||
final StandardProvenanceEventRecord truncated = builder.build();
|
||||
truncated.setEventId(original.getEventId());
|
||||
return truncated;
|
||||
}
|
||||
|
||||
private Map<String, String> truncateAttributes(final Map<String, String> original) {
|
||||
final Map<String, String> truncatedAttrs = new HashMap<>();
|
||||
for (final Map.Entry<String, String> entry : original.entrySet()) {
|
||||
if (entry.getValue().length() > maxAttributeChars) {
|
||||
truncatedAttrs.put(entry.getKey(), entry.getValue().substring(0, maxAttributeChars));
|
||||
} else {
|
||||
truncatedAttrs.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
return truncatedAttrs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SearchableField> getSearchableFields() {
|
||||
final List<SearchableField> searchableFields = new ArrayList<>(configuration.getSearchableFields());
|
||||
|
@ -1612,7 +1674,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
|
||||
for (final File file : potentialFiles) {
|
||||
try {
|
||||
reader = RecordReaders.newRecordReader(file, allLogFiles);
|
||||
reader = RecordReaders.newRecordReader(file, allLogFiles, maxAttributeChars);
|
||||
} catch (final IOException ioe) {
|
||||
continue;
|
||||
}
|
||||
|
@ -1788,7 +1850,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
return true;
|
||||
}
|
||||
|
||||
if (repoDirty.get() || (writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis)) {
|
||||
if (repoDirty.get() || writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1797,7 +1859,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
|
||||
public Collection<Path> getAllLogFiles() {
|
||||
final SortedMap<Long, Path> map = idToPathMap.get();
|
||||
return (map == null) ? new ArrayList<Path>() : map.values();
|
||||
return map == null ? new ArrayList<Path>() : map.values();
|
||||
}
|
||||
|
||||
private static class PathMapComparator implements Comparator<Long> {
|
||||
|
@ -1885,7 +1947,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager);
|
||||
final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars);
|
||||
final StandardQueryResult queryResult = search.search(query, retrievalCount);
|
||||
submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
|
||||
if (queryResult.isFinished()) {
|
||||
|
@ -1926,7 +1988,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
}
|
||||
|
||||
try {
|
||||
final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexManager, indexDir, null, flowFileUuids);
|
||||
final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this,
|
||||
indexManager, indexDir, null, flowFileUuids, maxAttributeChars);
|
||||
|
||||
final StandardLineageResult result = submission.getResult();
|
||||
result.update(matchingRecords);
|
||||
|
||||
|
@ -1959,7 +2023,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
final Map.Entry<String, AsyncQuerySubmission> entry = queryIterator.next();
|
||||
|
||||
final StandardQueryResult result = entry.getValue().getResult();
|
||||
if (entry.getValue().isCanceled() || (result.isFinished() && result.getExpiration().before(now))) {
|
||||
if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) {
|
||||
queryIterator.remove();
|
||||
}
|
||||
}
|
||||
|
@ -1969,7 +2033,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
final Map.Entry<String, AsyncLineageSubmission> entry = lineageIterator.next();
|
||||
|
||||
final StandardLineageResult result = entry.getValue().getResult();
|
||||
if (entry.getValue().isCanceled() || (result.isFinished() && result.getExpiration().before(now))) {
|
||||
if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) {
|
||||
lineageIterator.remove();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ public class RepositoryConfiguration {
|
|||
private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
|
||||
private int journalCount = 16;
|
||||
private int compressionBlockBytes = 1024 * 1024;
|
||||
private int maxAttributeChars = 65536;
|
||||
|
||||
private List<SearchableField> searchableFields = new ArrayList<>();
|
||||
private List<SearchableField> searchableAttributes = new ArrayList<>();
|
||||
|
@ -278,4 +279,21 @@ public class RepositoryConfiguration {
|
|||
public void setAlwaysSync(boolean alwaysSync) {
|
||||
this.alwaysSync = alwaysSync;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum number of characters to include in any attribute. If an attribute in a Provenance
|
||||
* Event has more than this number of characters, it will be truncated when the event is retrieved.
|
||||
*/
|
||||
public int getMaxAttributeChars() {
|
||||
return maxAttributeChars;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum number of characters to include in any attribute. If an attribute in a Provenance
|
||||
* Event has more than this number of characters, it will be truncated when it is retrieved.
|
||||
*/
|
||||
public void setMaxAttributeChars(int maxAttributeChars) {
|
||||
this.maxAttributeChars = maxAttributeChars;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -47,18 +47,20 @@ public class StandardRecordReader implements RecordReader {
|
|||
private final boolean compressed;
|
||||
private final TocReader tocReader;
|
||||
private final int headerLength;
|
||||
private final int maxAttributeChars;
|
||||
|
||||
private DataInputStream dis;
|
||||
private ByteCountingInputStream byteCountingIn;
|
||||
|
||||
public StandardRecordReader(final InputStream in, final String filename) throws IOException {
|
||||
this(in, filename, null);
|
||||
public StandardRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
|
||||
this(in, filename, null, maxAttributeChars);
|
||||
}
|
||||
|
||||
public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException {
|
||||
public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
|
||||
logger.trace("Creating RecordReader for {}", filename);
|
||||
|
||||
rawInputStream = new ByteCountingInputStream(in);
|
||||
this.maxAttributeChars = maxAttributeChars;
|
||||
|
||||
final InputStream limitedStream;
|
||||
if ( tocReader == null ) {
|
||||
|
@ -367,7 +369,8 @@ public class StandardRecordReader implements RecordReader {
|
|||
for (int i = 0; i < numAttributes; i++) {
|
||||
final String key = readLongString(dis);
|
||||
final String value = valueNullable ? readLongNullableString(dis) : readLongString(dis);
|
||||
attrs.put(key, value);
|
||||
final String truncatedValue = value.length() > maxAttributeChars ? value.substring(0, maxAttributeChars) : value;
|
||||
attrs.put(key, truncatedValue);
|
||||
}
|
||||
|
||||
return attrs;
|
||||
|
@ -429,7 +432,7 @@ public class StandardRecordReader implements RecordReader {
|
|||
byteCountingIn.reset();
|
||||
}
|
||||
|
||||
return (nextByte >= 0);
|
||||
return nextByte >= 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -451,7 +454,7 @@ public class StandardRecordReader implements RecordReader {
|
|||
// committed, so we can just process the FlowFile again.
|
||||
}
|
||||
|
||||
return (lastRecord == null) ? -1L : lastRecord.getEventId();
|
||||
return lastRecord == null ? -1L : lastRecord.getEventId();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -46,9 +46,9 @@ public class DeleteIndexAction implements ExpirationAction {
|
|||
@Override
|
||||
public File execute(final File expiredFile) throws IOException {
|
||||
// count the number of records and determine the max event id that we are deleting.
|
||||
long numDeleted = 0;
|
||||
final long numDeleted = 0;
|
||||
long maxEventId = -1L;
|
||||
try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
|
||||
try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles(), Integer.MAX_VALUE)) {
|
||||
maxEventId = reader.getMaxEventId();
|
||||
} catch (final IOException ioe) {
|
||||
logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
|
||||
|
@ -65,7 +65,7 @@ public class DeleteIndexAction implements ExpirationAction {
|
|||
writer.deleteDocuments(term);
|
||||
writer.commit();
|
||||
final int docsLeft = writer.numDocs();
|
||||
deleteDir = (docsLeft <= 0);
|
||||
deleteDir = docsLeft <= 0;
|
||||
logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
|
||||
} finally {
|
||||
indexManager.returnIndexWriter(indexingDirectory, writer);
|
||||
|
|
|
@ -51,7 +51,7 @@ public class DocsReader {
|
|||
}
|
||||
|
||||
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
|
||||
final AtomicInteger retrievalCount, final int maxResults) throws IOException {
|
||||
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
|
||||
if (retrievalCount.get() >= maxResults) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ public class DocsReader {
|
|||
|
||||
final long readDocuments = System.nanoTime() - start;
|
||||
logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
|
||||
return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
|
||||
return read(docs, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars);
|
||||
}
|
||||
|
||||
|
||||
|
@ -108,7 +108,8 @@ public class DocsReader {
|
|||
}
|
||||
|
||||
|
||||
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
|
||||
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
|
||||
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
|
||||
if (retrievalCount.get() >= maxResults) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
@ -161,7 +162,7 @@ public class DocsReader {
|
|||
|
||||
for (final File file : potentialFiles) {
|
||||
try {
|
||||
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
|
||||
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
|
||||
matchingRecords.add(getRecord(d, reader));
|
||||
|
||||
if ( retrievalCount.incrementAndGet() >= maxResults ) {
|
||||
|
|
|
@ -39,11 +39,13 @@ public class IndexSearch {
|
|||
private final PersistentProvenanceRepository repository;
|
||||
private final File indexDirectory;
|
||||
private final IndexManager indexManager;
|
||||
private final int maxAttributeChars;
|
||||
|
||||
public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager) {
|
||||
public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager, final int maxAttributeChars) {
|
||||
this.repository = repo;
|
||||
this.indexDirectory = indexDirectory;
|
||||
this.indexManager = indexManager;
|
||||
this.maxAttributeChars = maxAttributeChars;
|
||||
}
|
||||
|
||||
public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException {
|
||||
|
@ -82,7 +84,8 @@ public class IndexSearch {
|
|||
}
|
||||
|
||||
final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
|
||||
matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
|
||||
matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
|
||||
provenanceQuery.getMaxResults(), maxAttributeChars);
|
||||
|
||||
final long readRecordsNanos = System.nanoTime() - finishSearch;
|
||||
logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
|
||||
|
|
|
@ -46,7 +46,7 @@ public class LineageQuery {
|
|||
private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class);
|
||||
|
||||
public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory,
|
||||
final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
|
||||
final String lineageIdentifier, final Collection<String> flowFileUuids, final int maxAttributeChars) throws IOException {
|
||||
if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
|
||||
throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
|
||||
}
|
||||
|
@ -94,7 +94,9 @@ public class LineageQuery {
|
|||
final long searchEnd = System.nanoTime();
|
||||
|
||||
final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
|
||||
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE);
|
||||
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(),
|
||||
new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);
|
||||
|
||||
final long readDocsEnd = System.nanoTime();
|
||||
logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis",
|
||||
indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
|
||||
|
|
|
@ -32,7 +32,18 @@ import org.apache.nifi.provenance.toc.TocUtil;
|
|||
|
||||
public class RecordReaders {
|
||||
|
||||
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
|
||||
/**
|
||||
* Creates a new Record Reader that is capable of reading Provenance Event Journals
|
||||
*
|
||||
* @param file the Provenance Event Journal to read data from
|
||||
* @param provenanceLogFiles collection of all provenance journal files
|
||||
* @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. This allows us to avoid
|
||||
* issues where a FlowFile has an extremely large attribute and reading events
|
||||
* for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap
|
||||
* @return a Record Reader capable of reading Provenance Event Journals
|
||||
* @throws IOException if unable to create a Record Reader for the given file
|
||||
*/
|
||||
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles, final int maxAttributeChars) throws IOException {
|
||||
final File originalFile = file;
|
||||
InputStream fis = null;
|
||||
|
||||
|
@ -92,9 +103,9 @@ public class RecordReaders {
|
|||
final File tocFile = TocUtil.getTocFile(file);
|
||||
if ( tocFile.exists() ) {
|
||||
final TocReader tocReader = new StandardTocReader(tocFile);
|
||||
return new StandardRecordReader(fis, filename, tocReader);
|
||||
return new StandardRecordReader(fis, filename, tocReader, maxAttributeChars);
|
||||
} else {
|
||||
return new StandardRecordReader(fis, filename);
|
||||
return new StandardRecordReader(fis, filename, maxAttributeChars);
|
||||
}
|
||||
} catch (final IOException ioe) {
|
||||
if ( fis != null ) {
|
||||
|
|
|
@ -252,7 +252,7 @@ public class TestPersistentProvenanceRepository {
|
|||
assertEquals(10, recoveredRecords.size());
|
||||
for (int i = 0; i < 10; i++) {
|
||||
final ProvenanceEventRecord recovered = recoveredRecords.get(i);
|
||||
assertEquals((long) i, recovered.getEventId());
|
||||
assertEquals(i, recovered.getEventId());
|
||||
assertEquals("nifi://unit-test", recovered.getTransitUri());
|
||||
assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType());
|
||||
assertEquals(attributes, recovered.getAttributes());
|
||||
|
@ -283,7 +283,7 @@ public class TestPersistentProvenanceRepository {
|
|||
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
|
||||
builder.setComponentId("1234");
|
||||
builder.setComponentType("dummy processor");
|
||||
ProvenanceEventRecord record = builder.build();
|
||||
final ProvenanceEventRecord record = builder.build();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
repo.registerEvent(record);
|
||||
|
@ -1106,7 +1106,7 @@ public class TestPersistentProvenanceRepository {
|
|||
|
||||
final Query q = new Query("");
|
||||
q.setMaxResults(1000);
|
||||
TopDocs topDocs = searcher.search(luceneQuery, 1000);
|
||||
final TopDocs topDocs = searcher.search(luceneQuery, 1000);
|
||||
|
||||
final List<Document> docs = new ArrayList<>();
|
||||
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
|
||||
|
@ -1157,7 +1157,7 @@ public class TestPersistentProvenanceRepository {
|
|||
for (final File file : storageDir.listFiles()) {
|
||||
if (file.isFile()) {
|
||||
|
||||
try (RecordReader reader = RecordReaders.newRecordReader(file, null)) {
|
||||
try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
|
||||
ProvenanceEventRecord r = null;
|
||||
|
||||
while ((r = reader.nextRecord()) != null) {
|
||||
|
@ -1169,4 +1169,35 @@ public class TestPersistentProvenanceRepository {
|
|||
|
||||
assertEquals(10000, counter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncateAttributes() throws IOException, InterruptedException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxAttributeChars(50);
|
||||
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
|
||||
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
repo.initialize(getEventReporter());
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
|
||||
|
||||
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
|
||||
builder.setEventTime(System.currentTimeMillis());
|
||||
builder.setEventType(ProvenanceEventType.RECEIVE);
|
||||
builder.setTransitUri("nifi://unit-test");
|
||||
attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
|
||||
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
|
||||
builder.setComponentId("1234");
|
||||
builder.setComponentType("dummy processor");
|
||||
|
||||
final ProvenanceEventRecord record = builder.build();
|
||||
repo.registerEvent(record);
|
||||
repo.waitForRollover();
|
||||
|
||||
final ProvenanceEventRecord retrieved = repo.getEvent(0L);
|
||||
assertNotNull(retrieved);
|
||||
assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid"));
|
||||
assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars"));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -74,7 +74,7 @@ public class TestStandardRecordReaderWriter {
|
|||
final TocReader tocReader = new StandardTocReader(tocFile);
|
||||
|
||||
try (final FileInputStream fis = new FileInputStream(journalFile);
|
||||
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
|
||||
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
|
||||
assertEquals(0, reader.getBlockIndex());
|
||||
reader.skipToBlock(0);
|
||||
final StandardProvenanceEventRecord recovered = reader.nextRecord();
|
||||
|
@ -102,7 +102,7 @@ public class TestStandardRecordReaderWriter {
|
|||
final TocReader tocReader = new StandardTocReader(tocFile);
|
||||
|
||||
try (final FileInputStream fis = new FileInputStream(journalFile);
|
||||
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
|
||||
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
|
||||
assertEquals(0, reader.getBlockIndex());
|
||||
reader.skipToBlock(0);
|
||||
final StandardProvenanceEventRecord recovered = reader.nextRecord();
|
||||
|
@ -133,7 +133,7 @@ public class TestStandardRecordReaderWriter {
|
|||
final TocReader tocReader = new StandardTocReader(tocFile);
|
||||
|
||||
try (final FileInputStream fis = new FileInputStream(journalFile);
|
||||
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
|
||||
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
|
||||
for (int i=0; i < 10; i++) {
|
||||
assertEquals(0, reader.getBlockIndex());
|
||||
|
||||
|
@ -172,12 +172,12 @@ public class TestStandardRecordReaderWriter {
|
|||
final TocReader tocReader = new StandardTocReader(tocFile);
|
||||
|
||||
try (final FileInputStream fis = new FileInputStream(journalFile);
|
||||
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
|
||||
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
|
||||
for (int i=0; i < 10; i++) {
|
||||
final StandardProvenanceEventRecord recovered = reader.nextRecord();
|
||||
System.out.println(recovered);
|
||||
assertNotNull(recovered);
|
||||
assertEquals((long) i, recovered.getEventId());
|
||||
assertEquals(i, recovered.getEventId());
|
||||
assertEquals("nifi://unit-test", recovered.getTransitUri());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue