NIFI-72: Added unit tests and fixed bug that caused CONTENT_MODIFIED to be emitted for newly created FlowFiles

This commit is contained in:
Mark Payne 2014-12-11 09:16:48 -05:00
parent 6b0a5e8cd7
commit cbea1f1936
2 changed files with 70 additions and 1 deletions

View File

@ -579,7 +579,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
continue; continue;
} }
if ( contentChanged ) { final boolean newFlowFile = repoRecord.getOriginal() == null;
if ( contentChanged && !newFlowFile ) {
recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build()); recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED); addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
eventAdded = true; eventAdded = true;

View File

@ -846,6 +846,74 @@ public class TestStandardProcessSession {
assertEquals(ProvenanceEventType.CREATE, event.getEventType()); assertEquals(ProvenanceEventType.CREATE, event.getEventType());
} }
@Test
public void testContentModifiedNotEmittedForCreate() throws IOException {
FlowFile newFlowFile = session.create();
newFlowFile = session.write(newFlowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
}
});
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.CREATE, event.getEventType());
}
@Test
public void testContentModifiedEmittedAndNotAttributesModified() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
.build();
this.flowFileQueue.put(flowFile);
FlowFile existingFlowFile = session.get();
existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
}
});
existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
session.commit();
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType());
}
@Test
public void testAttributesModifiedEmitted() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
.build();
this.flowFileQueue.put(flowFile);
FlowFile existingFlowFile = session.get();
existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
session.commit();
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType());
}
private static class MockFlowFileRepository implements FlowFileRepository { private static class MockFlowFileRepository implements FlowFileRepository {
private final AtomicLong idGenerator = new AtomicLong(0L); private final AtomicLong idGenerator = new AtomicLong(0L);