diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 08e6afe84a..4ba45aadd5 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -75,7 +75,6 @@ import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.util.NiFiProperties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -488,6 +487,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + private void addEventType(final Map> map, final String id, final ProvenanceEventType eventType) { + Set eventTypes = map.get(id); + if ( eventTypes == null ) { + eventTypes = new HashSet<>(); + map.put(id, eventTypes); + } + + eventTypes.add(eventType); + } + private void updateProvenanceRepo(final Checkpoint checkpoint) { // Update Provenance Repository final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository(); @@ -496,7 +505,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet // for this, so that we are able to ensure that the events are submitted in the proper order. final Set recordsToSubmit = new LinkedHashSet<>(); - + final Map> eventTypesPerFlowFileId = new HashMap<>(); + final Set processorGenerated = checkpoint.reportedEvents; // We first want to submit FORK events because if the Processor is going to create events against @@ -513,6 +523,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) { recordsToSubmit.add(event); + + for ( final String childUuid : event.getChildUuids() ) { + addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType()); + } + for ( final String parentUuid : event.getParentUuids() ) { + addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType()); + } } } @@ -523,6 +540,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } recordsToSubmit.add(event); + addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); } // Finally, add any other events that we may have generated. @@ -533,6 +551,67 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } recordsToSubmit.add(event); + addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); + } + } + + // Check if content or attributes changed. If so, register the appropriate events. + for (final StandardRepositoryRecord repoRecord : checkpoint.records.values() ) { + final ContentClaim original = repoRecord.getOriginalClaim(); + final ContentClaim current = repoRecord.getCurrentClaim(); + + boolean contentChanged = false; + if ( original == null && current != null ) { + contentChanged = true; + } + if ( original != null && current == null ) { + contentChanged = true; + } + if ( original != null && current != null && !original.equals(current) ) { + contentChanged = true; + } + + final FlowFileRecord curFlowFile = repoRecord.getCurrent(); + final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key()); + boolean eventAdded = false; + + if (checkpoint.removedFlowFiles.contains(flowFileId)) { + continue; + } + + if ( contentChanged ) { + recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build()); + addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED); + eventAdded = true; + } + + if ( checkpoint.createdFlowFiles.contains(flowFileId) ) { + final Set registeredTypes = eventTypesPerFlowFileId.get(flowFileId); + boolean creationEventRegistered = false; + if ( registeredTypes != null ) { + if ( registeredTypes.contains(ProvenanceEventType.CREATE) || + registeredTypes.contains(ProvenanceEventType.FORK) || + registeredTypes.contains(ProvenanceEventType.JOIN) || + registeredTypes.contains(ProvenanceEventType.RECEIVE) ) { + creationEventRegistered = true; + } + } + + if ( !creationEventRegistered ) { + recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build()); + eventAdded = true; + } + } + + if ( !eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() ) { + // We generate an ATTRIBUTES_MODIFIED event only if no other event has been + // created for the FlowFile. We do this because all events contain both the + // newest and the original attributes, so generating an ATTRIBUTES_MODIFIED + // event is redundant if another already exists. + if ( !eventTypesPerFlowFileId.containsKey(flowFileId) ) { + recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build()); + addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED); + } } } diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index e8b1e87daf..01fb3dc028 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -328,7 +328,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter { } } } - + @Override public void modifyContent(final FlowFile flowFile) { modifyContent(flowFile, null, -1L); @@ -421,7 +421,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter { } } - private ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) { + ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) { final ProvenanceEventBuilder builder = repository.eventBuilder(); builder.setEventType(eventType); builder.fromFlowFile(flowFile); diff --git a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 6e0a5d79fd..3dbbcf34e2 100644 --- a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.repository; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; @@ -64,7 +65,6 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; - import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -267,7 +267,7 @@ public class TestStandardProcessSession { } @Test - public void testSpawnsNotEmittedIfFilesDeleted() throws IOException { + public void testForksNotEmittedIfFilesDeleted() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) @@ -283,8 +283,9 @@ public class TestStandardProcessSession { assertEquals(0, provenanceRepo.getEvents(0L, 100000).size()); } + @Test - public void testProvenanceEventsEmittedForSpawnIfNotRemoved() throws IOException { + public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) @@ -319,6 +320,79 @@ public class TestStandardProcessSession { assertEquals(1, provenanceRepo.getEvents(0L, 100000).size()); } + @Test + public void testUpdateAttributesThenJoin() throws IOException { + final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") + .entryDate(System.currentTimeMillis()) + .build(); + + final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() + .id(2L) + .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") + .entryDate(System.currentTimeMillis()) + .build(); + + flowFileQueue.put(flowFileRecord1); + flowFileQueue.put(flowFileRecord2); + + FlowFile ff1 = session.get(); + FlowFile ff2 = session.get(); + + ff1 = session.putAttribute(ff1, "index", "1"); + ff2 = session.putAttribute(ff2, "index", "2"); + + final List parents = new ArrayList<>(2); + parents.add(ff1); + parents.add(ff2); + + final FlowFile child = session.create(parents); + + final Relationship rel = new Relationship.Builder().name("A").build(); + + session.transfer(ff1, rel); + session.transfer(ff2, rel); + session.transfer(child, rel); + + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 1000); + + // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's + assertEquals(3, events.size()); + + int joinCount = 0; + int ff1UpdateCount = 0; + int ff2UpdateCount = 0; + + for ( final ProvenanceEventRecord event : events ) { + switch (event.getEventType()) { + case JOIN: + assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid()); + joinCount++; + break; + case ATTRIBUTES_MODIFIED: + if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) { + ff1UpdateCount++; + } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) { + ff2UpdateCount++; + } else { + Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid()); + } + break; + default: + Assert.fail("Unexpected event type: " + event); + } + } + + assertEquals(1, joinCount); + assertEquals(1, ff1UpdateCount); + assertEquals(1, ff2UpdateCount); + + assertEquals(1, joinCount); + } + @Test public void testForkOneToOneReported() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -628,34 +702,34 @@ public class TestStandardProcessSession { @Test public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }).build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { + return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }).build(); flowFileQueue.put(flowFileRecord); FlowFile ff1 = session.get(); @@ -668,37 +742,35 @@ public class TestStandardProcessSession { session.commit(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }) - .contentClaimOffset(1000L) - .size(1L) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { + return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }) + .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); // attempt to read the data. @@ -759,6 +831,21 @@ public class TestStandardProcessSession { } } + + @Test + public void testCreateEmitted() throws IOException { + FlowFile newFlowFile = session.create(); + session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 10000); + assertFalse(events.isEmpty()); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.CREATE, event.getEventType()); + } + private static class MockFlowFileRepository implements FlowFileRepository { private final AtomicLong idGenerator = new AtomicLong(0L);