NIFI-72: Auto-generate CREATE, CONTENT_MODIFIED, ATTRIBUTES_MODIFIED events when appropriate

This commit is contained in:
Mark Payne 2014-12-11 09:03:05 -05:00
parent 8ed8d69899
commit 6b0a5e8cd7
3 changed files with 232 additions and 66 deletions

View File

@ -75,7 +75,6 @@ import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -488,6 +487,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
/**
 * Records that a Provenance event of the given type has been registered for the
 * FlowFile with the given UUID, creating the backing Set on first use.
 *
 * @param map mapping of FlowFile UUID to the event types already registered for it
 * @param id the UUID of the FlowFile
 * @param eventType the type of event to record for that FlowFile
 */
private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
    // Lazily create the per-FlowFile Set the first time an event is recorded for this id.
    if (!map.containsKey(id)) {
        map.put(id, new HashSet<ProvenanceEventType>());
    }
    map.get(id).add(eventType);
}
private void updateProvenanceRepo(final Checkpoint checkpoint) {
// Update Provenance Repository
final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository();
@ -496,6 +505,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet
// for this, so that we are able to ensure that the events are submitted in the proper order.
final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>();
final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = new HashMap<>();
final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents;
@ -513,6 +523,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) {
recordsToSubmit.add(event);
for ( final String childUuid : event.getChildUuids() ) {
addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
}
for ( final String parentUuid : event.getParentUuids() ) {
addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType());
}
}
}
@ -523,6 +540,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
recordsToSubmit.add(event);
addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
}
// Finally, add any other events that we may have generated.
@ -533,6 +551,67 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
recordsToSubmit.add(event);
addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
}
}
// Check if content or attributes changed. If so, register the appropriate events.
// NOTE(review): this loop is part of updateProvenanceRepo(); it auto-generates
// CONTENT_MODIFIED / CREATE / ATTRIBUTES_MODIFIED events for FlowFiles whose
// records show changes that no processor-reported event already covers.
for (final StandardRepositoryRecord repoRecord : checkpoint.records.values() ) {
final ContentClaim original = repoRecord.getOriginalClaim();
final ContentClaim current = repoRecord.getCurrentClaim();
// Content is considered changed if the claim was added, removed, or replaced.
boolean contentChanged = false;
if ( original == null && current != null ) {
contentChanged = true;
}
if ( original != null && current == null ) {
contentChanged = true;
}
if ( original != null && current != null && !original.equals(current) ) {
contentChanged = true;
}
final FlowFileRecord curFlowFile = repoRecord.getCurrent();
final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key());
boolean eventAdded = false;
// FlowFiles that were removed in this session get no auto-generated events.
if (checkpoint.removedFlowFiles.contains(flowFileId)) {
continue;
}
if ( contentChanged ) {
recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
eventAdded = true;
}
if ( checkpoint.createdFlowFiles.contains(flowFileId) ) {
// Only emit a CREATE if no creation-style event (CREATE/FORK/JOIN/RECEIVE)
// has already been registered for this FlowFile in this session.
final Set<ProvenanceEventType> registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
boolean creationEventRegistered = false;
if ( registeredTypes != null ) {
if ( registeredTypes.contains(ProvenanceEventType.CREATE) ||
registeredTypes.contains(ProvenanceEventType.FORK) ||
registeredTypes.contains(ProvenanceEventType.JOIN) ||
registeredTypes.contains(ProvenanceEventType.RECEIVE) ) {
creationEventRegistered = true;
}
}
if ( !creationEventRegistered ) {
// NOTE(review): unlike the branches above/below, this CREATE is not passed to
// addEventType(); eventAdded=true still suppresses a later ATTRIBUTES_MODIFIED
// for this FlowFile — confirm the omission from the map is intentional.
recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build());
eventAdded = true;
}
}
if ( !eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() ) {
// We generate an ATTRIBUTES_MODIFIED event only if no other event has been
// created for the FlowFile. We do this because all events contain both the
// newest and the original attributes, so generating an ATTRIBUTES_MODIFIED
// event is redundant if another already exists.
if ( !eventTypesPerFlowFileId.containsKey(flowFileId) ) {
recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED);
}
}
}

View File

@ -421,7 +421,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
}
}
private ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) {
ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) {
final ProvenanceEventBuilder builder = repository.eventBuilder();
builder.setEventType(eventType);
builder.fromFlowFile(flowFile);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
@ -64,7 +65,6 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -267,7 +267,7 @@ public class TestStandardProcessSession {
}
@Test
public void testSpawnsNotEmittedIfFilesDeleted() throws IOException {
public void testForksNotEmittedIfFilesDeleted() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
@ -283,8 +283,9 @@ public class TestStandardProcessSession {
assertEquals(0, provenanceRepo.getEvents(0L, 100000).size());
}
@Test
public void testProvenanceEventsEmittedForSpawnIfNotRemoved() throws IOException {
public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
@ -319,6 +320,79 @@ public class TestStandardProcessSession {
assertEquals(1, provenanceRepo.getEvents(0L, 100000).size());
}
@Test
public void testUpdateAttributesThenJoin() throws IOException {
    // Verifies that when two FlowFiles have attributes updated and are then joined
    // into a child, the session emits exactly one JOIN event (for the child) and one
    // ATTRIBUTES_MODIFIED event per parent.
    final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
        .id(1L)
        .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
        .entryDate(System.currentTimeMillis())
        .build();
    final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
        .id(2L)
        .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
        .entryDate(System.currentTimeMillis())
        .build();
    flowFileQueue.put(flowFileRecord1);
    flowFileQueue.put(flowFileRecord2);
    FlowFile ff1 = session.get();
    FlowFile ff2 = session.get();
    ff1 = session.putAttribute(ff1, "index", "1");
    ff2 = session.putAttribute(ff2, "index", "2");
    final List<FlowFile> parents = new ArrayList<>(2);
    parents.add(ff1);
    parents.add(ff2);
    final FlowFile child = session.create(parents);
    final Relationship rel = new Relationship.Builder().name("A").build();
    session.transfer(ff1, rel);
    session.transfer(ff2, rel);
    session.transfer(child, rel);
    session.commit();
    final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
    // We should have a JOIN and 2 ATTRIBUTES_MODIFIED's
    assertEquals(3, events.size());
    int joinCount = 0;
    int ff1UpdateCount = 0;
    int ff2UpdateCount = 0;
    for ( final ProvenanceEventRecord event : events ) {
        switch (event.getEventType()) {
            case JOIN:
                // The JOIN event must be attributed to the child FlowFile.
                assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid());
                joinCount++;
                break;
            case ATTRIBUTES_MODIFIED:
                if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) {
                    ff1UpdateCount++;
                } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) {
                    ff2UpdateCount++;
                } else {
                    Assert.fail("Got ATTRIBUTES_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid());
                }
                break;
            default:
                Assert.fail("Unexpected event type: " + event);
        }
    }
    // Exactly one of each expected event (duplicate joinCount assertion removed).
    assertEquals(1, joinCount);
    assertEquals(1, ff1UpdateCount);
    assertEquals(1, ff2UpdateCount);
}
@Test
public void testForkOneToOneReported() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
@ -628,34 +702,34 @@ public class TestStandardProcessSession {
@Test
public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new ContentClaim() {
@Override
public int compareTo(ContentClaim arg0) {
return 0;
}
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new ContentClaim() {
@Override
public int compareTo(ContentClaim arg0) {
return 0;
}
@Override
public String getId() {
return "0";
}
@Override
public String getId() {
return "0";
}
@Override
public String getContainer() {
return "container";
}
@Override
public String getContainer() {
return "container";
}
@Override
public String getSection() {
return "section";
}
@Override
public String getSection() {
return "section";
}
@Override
public boolean isLossTolerant() {
return true;
}
}).build();
@Override
public boolean isLossTolerant() {
return true;
}
}).build();
flowFileQueue.put(flowFileRecord);
FlowFile ff1 = session.get();
@ -668,37 +742,35 @@ public class TestStandardProcessSession {
session.commit();
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new ContentClaim() {
@Override
public int compareTo(ContentClaim arg0) {
return 0;
}
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new ContentClaim() {
@Override
public int compareTo(ContentClaim arg0) {
return 0;
}
@Override
public String getId() {
return "0";
}
@Override
public String getId() {
return "0";
}
@Override
public String getContainer() {
return "container";
}
@Override
public String getContainer() {
return "container";
}
@Override
public String getSection() {
return "section";
}
@Override
public String getSection() {
return "section";
}
@Override
public boolean isLossTolerant() {
return true;
}
})
.contentClaimOffset(1000L)
.size(1L)
.build();
@Override
public boolean isLossTolerant() {
return true;
}
})
.contentClaimOffset(1000L).size(1L).build();
flowFileQueue.put(flowFileRecord2);
// attempt to read the data.
@ -759,6 +831,21 @@ public class TestStandardProcessSession {
}
}
@Test
public void testCreateEmitted() throws IOException {
    // Verifies that creating a FlowFile from scratch and transferring it causes the
    // session to auto-generate exactly one CREATE provenance event on commit.
    FlowFile newFlowFile = session.create();
    session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
    session.commit();
    final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
    // assertEquals(1, ...) alone pins the expectation; the prior assertFalse(isEmpty()) was redundant.
    assertEquals(1, events.size());
    final ProvenanceEventRecord event = events.get(0);
    assertNotNull(event);
    assertEquals(ProvenanceEventType.CREATE, event.getEventType());
}
private static class MockFlowFileRepository implements FlowFileRepository {
private final AtomicLong idGenerator = new AtomicLong(0L);