diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index ef2fb935d5..ca68725bf1 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -426,6 +426,7 @@ public class TestStandardProcessSession { assertEquals(0, provenanceRepo.getEvents(0L, 100000).size()); } + @Test public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -465,59 +466,59 @@ public class TestStandardProcessSession { @Test public void testUpdateAttributesThenJoin() throws IOException { final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") - .entryDate(System.currentTimeMillis()) - .build(); - + .id(1L) + .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") + .entryDate(System.currentTimeMillis()) + .build(); + final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .id(2L) - .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") - .entryDate(System.currentTimeMillis()) - .build(); - + .id(2L) + .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord1); flowFileQueue.put(flowFileRecord2); - + FlowFile ff1 = session.get(); FlowFile ff2 = session.get(); ff1 = session.putAttribute(ff1, "index", "1"); ff2 = session.putAttribute(ff2, "index", "2"); - + final List parents = new ArrayList<>(2); parents.add(ff1); parents.add(ff2); - + final FlowFile child = session.create(parents); - + final Relationship rel = new Relationship.Builder().name("A").build(); - + session.transfer(ff1, rel); session.transfer(ff2, rel); session.transfer(child, rel); - + session.commit(); - + final List events = provenanceRepo.getEvents(0L, 1000); // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's assertEquals(3, events.size()); - + int joinCount = 0; int ff1UpdateCount = 0; int ff2UpdateCount = 0; - - for (final ProvenanceEventRecord event : events) { + + for ( final ProvenanceEventRecord event : events ) { switch (event.getEventType()) { case JOIN: assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid()); joinCount++; break; case ATTRIBUTES_MODIFIED: - if (event.getFlowFileUuid().equals(ff1.getAttribute("uuid"))) { + if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) { ff1UpdateCount++; - } else if (event.getFlowFileUuid().equals(ff2.getAttribute("uuid"))) { + } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) { ff2UpdateCount++; } else { Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid()); @@ -527,14 +528,14 @@ public class TestStandardProcessSession { Assert.fail("Unexpected event type: " + event); } } - + assertEquals(1, joinCount); assertEquals(1, ff1UpdateCount); assertEquals(1, ff2UpdateCount); - + assertEquals(1, joinCount); } - + @Test public void testForkOneToOneReported() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -844,34 +845,34 @@ public class TestStandardProcessSession { @Test public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }).build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { + return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }).build(); flowFileQueue.put(flowFileRecord); FlowFile ff1 = session.get(); @@ -884,35 +885,35 @@ public class TestStandardProcessSession { session.commit(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }) - .contentClaimOffset(1000L).size(1L).build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { + return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }) + .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); // attempt to read the data. @@ -973,20 +974,21 @@ public class TestStandardProcessSession { } } + @Test public void testCreateEmitted() throws IOException { FlowFile newFlowFile = session.create(); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.CREATE, event.getEventType()); } - + @Test public void testContentModifiedNotEmittedForCreate() throws IOException { FlowFile newFlowFile = session.create(); @@ -997,23 +999,23 @@ public class TestStandardProcessSession { }); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.CREATE, event.getEventType()); } - + @Test public void testContentModifiedEmittedAndNotAttributesModified() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") - .build(); + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); this.flowFileQueue.put(flowFile); - + FlowFile existingFlowFile = session.get(); existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() { @Override @@ -1023,36 +1025,38 @@ public class TestStandardProcessSession { existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType()); } - + @Test public void testAttributesModifiedEmitted() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") - .build(); + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); this.flowFileQueue.put(flowFile); - + FlowFile existingFlowFile = session.get(); existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType()); } - + + + private static class MockFlowFileRepository implements FlowFileRepository { private final AtomicLong idGenerator = new AtomicLong(0L); @@ -1119,7 +1123,7 @@ public class TestStandardProcessSession { @Override public void shutdown() { } - + public Set getExistingClaims() { final Set claims = new HashSet<>(); @@ -1142,7 +1146,7 @@ public class TestStandardProcessSession { if (Files.exists(parent) == false) { Files.createDirectories(parent); } - Files.createFile(getPath(claim)); + Files.createFile(getPath(claim)); return claim; }