diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 4ba45aadd5..11172a8f54 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -579,7 +579,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE continue; } - if ( contentChanged ) { + final boolean newFlowFile = repoRecord.getOriginal() == null; + if ( contentChanged && !newFlowFile ) { recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build()); addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED); eventAdded = true; diff --git a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 3dbbcf34e2..060bbd9d95 100644 --- a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -846,6 +846,74 @@ public class TestStandardProcessSession { assertEquals(ProvenanceEventType.CREATE, event.getEventType()); } + @Test + public void testContentModifiedNotEmittedForCreate() throws IOException { + FlowFile newFlowFile = session.create(); + newFlowFile = session.write(newFlowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + } + }); + session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 10000); + assertFalse(events.isEmpty()); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.CREATE, event.getEventType()); + } + + @Test + public void testContentModifiedEmittedAndNotAttributesModified() throws IOException { + final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); + this.flowFileQueue.put(flowFile); + + FlowFile existingFlowFile = session.get(); + existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + } + }); + existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); + session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 10000); + assertFalse(events.isEmpty()); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType()); + } + + @Test + public void testAttributesModifiedEmitted() throws IOException { + final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); + this.flowFileQueue.put(flowFile); + + FlowFile existingFlowFile = session.get(); + existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); + session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 10000); + assertFalse(events.isEmpty()); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType()); + } + + + private static class MockFlowFileRepository implements FlowFileRepository { private final AtomicLong idGenerator = new AtomicLong(0L);