diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 9a89b058f3..20ca408c9a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1007,8 +1007,9 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn // the representation of the FlowFile as it is committed, as this is the only way in which it really // exists in our system -- all other representations are volatile representations that have not been // exposed. - final boolean isUpdateAttributes = rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType() != ProvenanceEventType.UPLOAD; - return enrich(rawEvent, flowFileRecordMap, checkpoint.records, isUpdateAttributes, commitNanos); + final boolean isUpdateAttributesAndContent = rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType() != ProvenanceEventType.UPLOAD + && rawEvent.getEventType() != ProvenanceEventType.CLONE; + return enrich(rawEvent, flowFileRecordMap, checkpoint.records, isUpdateAttributesAndContent, commitNanos); } else if (autoTermIterator != null && autoTermIterator.hasNext()) { return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos); } @@ -1085,13 +1086,13 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn private ProvenanceEventRecord enrich( final ProvenanceEventRecord rawEvent, final Map flowFileRecordMap, final Map records, - final boolean updateAttributes, final long commitNanos) { + final boolean updateAttributesAndContent, final long commitNanos) { final ProvenanceEventBuilder recordBuilder = context.createProvenanceEventBuilder().fromEvent(rawEvent); final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); if (eventFlowFile != null) { final StandardRepositoryRecord repoRecord = records.get(eventFlowFile.getId()); - if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) { + if (updateAttributesAndContent && repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) { final ContentClaim currentClaim = repoRecord.getCurrentClaim(); final long currentOffset = repoRecord.getCurrentClaimOffset(); final long size = eventFlowFile.getSize(); @@ -1100,7 +1101,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size); } - if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) { + if (updateAttributesAndContent && repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) { final ContentClaim originalClaim = repoRecord.getOriginalClaim(); final long originalOffset = repoRecord.getOriginal().getContentClaimOffset(); final long originalSize = repoRecord.getOriginal().getSize(); @@ -1114,7 +1115,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier()); } - if (updateAttributes) { + if (updateAttributesAndContent) { recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes()); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index 3ac141368b..832b9bba1b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -457,7 +457,10 @@ public class StandardProvenanceReporter implements InternalProvenanceReporter { final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE); eventBuilder.addChildFlowFile(child); eventBuilder.addParentFlowFile(parent); - events.add(eventBuilder.build()); + final ProvenanceEventRecord eventRecord = eventBuilder.build(); + final ProvenanceEventRecord enriched = eventEnricher == null ? eventRecord : eventEnricher.enrich(eventRecord, parent, System.nanoTime()); + + events.add(enriched); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java index 3bb9a53e8e..d636f3224e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java @@ -28,6 +28,11 @@ import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; class StandardProvenanceReporterTest { @@ -46,4 +51,19 @@ class StandardProvenanceReporterTest { assertNotNull(record); assertEquals("These are details", record.getDetails()); } -} \ No newline at end of file + + @Test + public void testEnrichEvents() { + final ProvenanceEventRepository mockRepo = Mockito.mock(ProvenanceEventRepository.class); + final ProvenanceEventEnricher enricher = Mockito.mock(ProvenanceEventEnricher.class); + final StandardProvenanceReporter reporter = new StandardProvenanceReporter(null, "1234", "TestProc", mockRepo, enricher); + Mockito.when(mockRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder()); + + final FlowFile flowFile = new StandardFlowFileRecord.Builder().id(10L).addAttribute("uuid", "10").build(); + final FlowFile childFlowFile = new StandardFlowFileRecord.Builder().id(11L).addAttribute("uuid", "11").build(); + reporter.send(flowFile, "test://noop"); + reporter.upload(flowFile, 0, "test://noop"); + reporter.clone(flowFile, childFlowFile); + verify(enricher, times(3)).enrich(any(), eq(flowFile), anyLong()); + } +}