mirror of https://github.com/apache/nifi.git
NIFI-1132: Limited number of Lineage Identifiers held to 100 and marked the getLineageIdentifiers() method as deprecated
This commit is contained in:
parent
5f8fdae909
commit
73c1671975
|
@ -64,7 +64,14 @@ public interface FlowFile extends Comparable<FlowFile> {
|
|||
* @return a set of identifiers that are unique to this FlowFile's lineage.
|
||||
* If FlowFile X is derived from FlowFile Y, both FlowFiles will have the
|
||||
* same value for the Lineage Claim ID.
|
||||
*
|
||||
* @deprecated this collection was erroneously unbounded and caused a lot of OutOfMemoryError problems
|
||||
* when dealing with FlowFiles with many ancestors. This Collection is
|
||||
* now capped at 100 lineage identifiers. This method was introduced with the idea of providing
|
||||
* future performance improvements but due to the high cost of heap consumption will not be used
|
||||
* in such a manner. As a result, this method will be removed in a future release.
|
||||
*/
|
||||
@Deprecated
|
||||
Set<String> getLineageIdentifiers();
|
||||
|
||||
/**
|
||||
|
|
|
@ -51,7 +51,14 @@ public interface ProvenanceEventRecord {
|
|||
/**
|
||||
* @return the set of all lineage identifiers that are associated with the
|
||||
* FlowFile for which this Event was created
|
||||
*
|
||||
* @deprecated this collection was erroneously unbounded and caused a lot of OutOfMemoryError problems
|
||||
* when querying Provenance Events about FlowFiles with many ancestors. This Collection is
|
||||
* now capped at 100 lineage identifiers. This method was introduced with the idea of providing
|
||||
* future performance improvements but due to the high cost of heap consumption will not be used
|
||||
* in such a manner. As a result, this method will be removed in a future release.
|
||||
*/
|
||||
@Deprecated
|
||||
Set<String> getLineageIdentifiers();
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,8 +18,12 @@ package org.apache.nifi.provenance;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
|
@ -40,7 +44,7 @@ public class StandardQueryResult implements QueryResult {
|
|||
|
||||
private final Lock writeLock = rwLock.writeLock();
|
||||
// guarded by writeLock
|
||||
private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>();
|
||||
private final Set<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator());
|
||||
private long totalHitCount;
|
||||
private int numCompletedSteps = 0;
|
||||
private Date expirationDate;
|
||||
|
@ -66,8 +70,14 @@ public class StandardQueryResult implements QueryResult {
|
|||
}
|
||||
|
||||
final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
|
||||
for (int i = 0; i < query.getMaxResults(); i++) {
|
||||
copy.add(matchingRecords.get(i));
|
||||
|
||||
int i = 0;
|
||||
final Iterator<ProvenanceEventRecord> itr = matchingRecords.iterator();
|
||||
while (itr.hasNext()) {
|
||||
copy.add(itr.next());
|
||||
if (++i >= query.getMaxResults()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return copy;
|
||||
|
@ -165,4 +175,11 @@ public class StandardQueryResult implements QueryResult {
|
|||
private void updateExpiration() {
|
||||
expirationDate = new Date(System.currentTimeMillis() + TTL);
|
||||
}
|
||||
|
||||
private static class EventIdComparator implements Comparator<ProvenanceEventRecord> {
|
||||
@Override
|
||||
public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
|
||||
return Long.compare(o2.getEventId(), o1.getEventId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,24 +25,25 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
|
||||
import org.apache.commons.lang3.builder.CompareToBuilder;
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringStyle;
|
||||
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content.</p>
|
||||
* A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content.
|
||||
* </p>
|
||||
*
|
||||
* <b>Immutable - Thread Safe</b>
|
||||
*
|
||||
*/
|
||||
public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
|
||||
private static final int MAX_LINEAGE_IDENTIFIERS = 100;
|
||||
|
||||
private final long id;
|
||||
private final long entryDate;
|
||||
|
@ -182,7 +183,18 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
|
|||
public Builder lineageIdentifiers(final Collection<String> lineageIdentifiers) {
|
||||
if (null != lineageIdentifiers) {
|
||||
bLineageIdentifiers.clear();
|
||||
bLineageIdentifiers.addAll(lineageIdentifiers);
|
||||
|
||||
if (lineageIdentifiers.size() > MAX_LINEAGE_IDENTIFIERS) {
|
||||
int i = 0;
|
||||
for (final String id : lineageIdentifiers) {
|
||||
bLineageIdentifiers.add(id);
|
||||
if (i++ >= MAX_LINEAGE_IDENTIFIERS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
bLineageIdentifiers.addAll(lineageIdentifiers);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -117,16 +117,16 @@ public class TestPersistentProvenanceRepository {
|
|||
// Delete all of the storage files. We do this in order to clean up the tons of files that
|
||||
// we create but also to ensure that we have closed all of the file handles. If we leave any
|
||||
// streams open, for instance, this will throw an IOException, causing our unit test to fail.
|
||||
for ( final File storageDir : config.getStorageDirectories() ) {
|
||||
for (final File storageDir : config.getStorageDirectories()) {
|
||||
int i;
|
||||
for (i=0; i < 3; i++) {
|
||||
for (i = 0; i < 3; i++) {
|
||||
try {
|
||||
FileUtils.deleteFile(storageDir, true);
|
||||
break;
|
||||
} catch (final IOException ioe) {
|
||||
// if there is a virus scanner, etc. running in the background we may not be able to
|
||||
// delete the file. Wait a sec and try again.
|
||||
if ( i == 2 ) {
|
||||
if (i == 2) {
|
||||
throw ioe;
|
||||
} else {
|
||||
try {
|
||||
|
@ -441,7 +441,7 @@ public class TestPersistentProvenanceRepository {
|
|||
repo.waitForRollover();
|
||||
|
||||
final Query query = new Query(UUID.randomUUID().toString());
|
||||
// query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
|
||||
// query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
|
||||
|
@ -464,68 +464,6 @@ public class TestPersistentProvenanceRepository {
|
|||
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndexAndCompressOnRolloverAndSubsequentSearchAsync() throws IOException, InterruptedException, ParseException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(3, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
||||
config.setMaxEventFileCapacity(1024L * 1024L);
|
||||
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
|
||||
|
||||
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
repo.initialize(getEventReporter());
|
||||
|
||||
final String uuid = "00000000-0000-0000-0000-000000000000";
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("abc", "xyz");
|
||||
attributes.put("xyz", "abc");
|
||||
attributes.put("filename", "file-" + uuid);
|
||||
|
||||
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
|
||||
builder.setEventTime(System.currentTimeMillis());
|
||||
builder.setEventType(ProvenanceEventType.RECEIVE);
|
||||
builder.setTransitUri("nifi://unit-test");
|
||||
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
|
||||
builder.setComponentId("1234");
|
||||
builder.setComponentType("dummy processor");
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
|
||||
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
|
||||
repo.registerEvent(builder.build());
|
||||
}
|
||||
|
||||
repo.waitForRollover();
|
||||
|
||||
final Query query = new Query(UUID.randomUUID().toString());
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000*"));
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
|
||||
query.setMaxResults(100);
|
||||
|
||||
final QuerySubmission submission = repo.submitQuery(query);
|
||||
while (!submission.getResult().isFinished()) {
|
||||
Thread.sleep(100L);
|
||||
}
|
||||
|
||||
assertEquals(10, submission.getResult().getMatchingEvents().size());
|
||||
for (final ProvenanceEventRecord match : submission.getResult().getMatchingEvents()) {
|
||||
System.out.println(match);
|
||||
}
|
||||
|
||||
Thread.sleep(2000L);
|
||||
|
||||
config.setMaxStorageCapacity(100L);
|
||||
config.setMaxRecordLife(500, TimeUnit.MILLISECONDS);
|
||||
repo.purgeOldEvents();
|
||||
Thread.sleep(2000L);
|
||||
|
||||
final QueryResult newRecordSet = repo.queryEvents(query);
|
||||
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
|
@ -603,7 +541,7 @@ public class TestPersistentProvenanceRepository {
|
|||
|
||||
repo.purgeOldEvents();
|
||||
|
||||
Thread.sleep(2000L); // purge is async. Give it time to do its job.
|
||||
Thread.sleep(2000L); // purge is async. Give it time to do its job.
|
||||
|
||||
query.setMaxResults(100);
|
||||
final QuerySubmission noResultSubmission = repo.submitQuery(query);
|
||||
|
@ -939,7 +877,7 @@ public class TestPersistentProvenanceRepository {
|
|||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
||||
config.setMaxEventFileCapacity(1024L * 1024L);
|
||||
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
|
||||
config.setDesiredIndexSize(10); // force new index to be created for each rollover
|
||||
config.setDesiredIndexSize(10); // force new index to be created for each rollover
|
||||
|
||||
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
repo.initialize(getEventReporter());
|
||||
|
@ -961,7 +899,7 @@ public class TestPersistentProvenanceRepository {
|
|||
for (int i = 0; i < 10; i++) {
|
||||
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
|
||||
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
|
||||
builder.setEventTime(10L); // make sure the events are destroyed when we call purge
|
||||
builder.setEventTime(10L); // make sure the events are destroyed when we call purge
|
||||
repo.registerEvent(builder.build());
|
||||
}
|
||||
|
||||
|
@ -1019,7 +957,7 @@ public class TestPersistentProvenanceRepository {
|
|||
@Test
|
||||
public void testBackPressure() throws IOException, InterruptedException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileCapacity(1L); // force rollover on each record.
|
||||
config.setMaxEventFileCapacity(1L); // force rollover on each record.
|
||||
config.setJournalCount(1);
|
||||
|
||||
final AtomicInteger journalCountRef = new AtomicInteger(0);
|
||||
|
|
Loading…
Reference in New Issue