NIFI-5420: Allow StandardProcessSession to calculate duration for provenance events

This closes #2886.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Matthew Burgess 2018-07-12 17:13:32 -04:00 committed by Mark Payne
parent b1f78d58a6
commit 473221368c
5 changed files with 49 additions and 16 deletions

View File

@ -26,8 +26,9 @@ public interface ProvenanceEventEnricher {
*
* @param record record
* @param flowFile flowfile
* @param commitNanos the time (in nanoseconds) when the associated session was committed
* @return new event record
*/
ProvenanceEventRecord enrich(ProvenanceEventRecord record, FlowFile flowFile);
ProvenanceEventRecord enrich(ProvenanceEventRecord record, FlowFile flowFile, long commitNanos);
}

View File

@ -737,6 +737,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
flowFileRecordMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
}
final long commitNanos = System.nanoTime();
final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents;
final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator();
@ -761,9 +762,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// the representation of the FlowFile as it is committed, as this is the only way in which it really
// exists in our system -- all other representations are volatile representations that have not been
// exposed.
return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND);
return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND, commitNanos);
} else if (autoTermIterator != null && autoTermIterator.hasNext()) {
return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true);
return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos);
}
throw new NoSuchElementException();
@ -796,7 +797,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile) {
public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile, final long commitNanos) {
verifyTaskActive();
final StandardRepositoryRecord repoRecord = records.get(flowFile);
@ -829,11 +830,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
if (rawEvent.getEventDuration() < 0) {
recordBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(commitNanos - repoRecord.getStartNanos()));
}
return recordBuilder.build();
}
private StandardProvenanceEventRecord enrich(
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records,
final boolean updateAttributes, final long commitNanos) {
final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (eventFlowFile != null) {
@ -861,18 +866,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (originalQueue != null) {
recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
}
}
if (updateAttributes) {
final FlowFileRecord flowFileRecord = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (flowFileRecord != null) {
final StandardRepositoryRecord record = records.get(flowFileRecord);
if (record != null) {
recordBuilder.setAttributes(record.getOriginalAttributes(), record.getUpdatedAttributes());
}
if (updateAttributes) {
recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
}
if (rawEvent.getEventDuration() < 0) {
recordBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(commitNanos - repoRecord.getStartNanos()));
}
}
return recordBuilder.build();
}

View File

@ -209,7 +209,9 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build();
final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile);
// If the transmissionMillis field has been populated, use zero as the value of commitNanos (the call to System.nanoTime() is expensive but the value will be ignored).
final long commitNanos = transmissionMillis < 0 ? System.nanoTime() : 0L;
final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile, commitNanos);
if (force) {
repository.registerEvent(enriched);
@ -226,7 +228,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
@Override
public void send(final FlowFile flowFile, final String transitUri, final boolean force) {
send(flowFile, transitUri, -1L, true);
send(flowFile, transitUri, -1L, force);
}
@Override

View File

@ -928,6 +928,29 @@ public class TestStandardProcessSession {
assertEquals(1, provenanceRepo.getEvents(0L, 100000).size());
}
@Test
public void testProvenanceEventsHaveDurationFromSession() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
final FlowFile orig = session.get();
final FlowFile newFlowFile = session.create(orig);
session.getProvenanceReporter().fork(orig, Collections.singletonList(newFlowFile), 0L);
session.getProvenanceReporter().fetch(newFlowFile, "nowhere://");
session.getProvenanceReporter().send(newFlowFile, "nowhere://");
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 100000);
assertNotNull(events);
assertEquals(3, events.size()); // FETCH, SEND, and FORK
events.forEach((event) -> assertTrue(event.getEventDuration() > -1));
}
@Test
public void testUuidAttributeCannotBeUpdated() {
String originalUuid = "11111111-1111-1111-1111-111111111111";

View File

@ -38,6 +38,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
private final Map<String, String> updatedAttributes = new HashMap<>();
private final Map<String, String> originalAttributes;
private List<ContentClaim> transientClaims;
private final long startNanos = System.nanoTime();
/**
* Creates a new record which has no original claim or flow file - it is entirely new
@ -218,4 +219,8 @@ public class StandardRepositoryRecord implements RepositoryRecord {
}
transientClaims.add(claim);
}
public long getStartNanos() {
return startNanos;
}
}