NIFI-1196 Providing handling of FETCH provenance events for their "unique" property, transit URI, within the framework and UI.

Reviewed by Tony Kurc (tkurc@apache.org)
This commit is contained in:
Aldrin Piri 2015-11-19 02:52:01 -05:00
parent 40dd8a0a84
commit 08d59e4374
6 changed files with 14 additions and 1 deletions

View File

@ -288,6 +288,7 @@ public class StandardLineageResult implements ComputeLineageResult {
} }
break; break;
case RECEIVE: case RECEIVE:
case FETCH:
case CREATE: { case CREATE: {
// for a receive event, we want to create a FlowFile Node that represents the FlowFile received // for a receive event, we want to create a FlowFile Node that represents the FlowFile received
// and create an edge from the Receive Event to the FlowFile Node // and create an edge from the Receive Event to the FlowFile Node

View File

@ -728,6 +728,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
} }
break; break;
case RECEIVE: case RECEIVE:
case FETCH:
case SEND: case SEND:
assertSet(transitUri, "Transit URI"); assertSet(transitUri, "Transit URI");
break; break;

View File

@ -457,6 +457,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
bytesSent += event.getFileSize(); bytesSent += event.getFileSize();
break; break;
case RECEIVE: case RECEIVE:
case FETCH:
flowFilesReceived++; flowFilesReceived++;
bytesReceived += event.getFileSize(); bytesReceived += event.getFileSize();
break; break;
@ -616,7 +617,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (registeredTypes.contains(ProvenanceEventType.CREATE) if (registeredTypes.contains(ProvenanceEventType.CREATE)
|| registeredTypes.contains(ProvenanceEventType.FORK) || registeredTypes.contains(ProvenanceEventType.FORK)
|| registeredTypes.contains(ProvenanceEventType.JOIN) || registeredTypes.contains(ProvenanceEventType.JOIN)
|| registeredTypes.contains(ProvenanceEventType.RECEIVE)) { || registeredTypes.contains(ProvenanceEventType.RECEIVE)
|| registeredTypes.contains(ProvenanceEventType.FETCH)) {
creationEventRegistered = true; creationEventRegistered = true;
} }
} }

View File

@ -1203,6 +1203,11 @@ nf.ProvenanceTable = (function () {
formatEventDetail('Relationship', event.relationship); formatEventDetail('Relationship', event.relationship);
} }
// conditionally show FETCH details
if (event.eventType === 'FETCH') {
formatEventDetail('Transit Uri', event.transitUri);
}
// conditionally show the cluster node identifier // conditionally show the cluster node identifier
if (nf.Common.isDefinedAndNotNull(event.clusterNodeId)) { if (nf.Common.isDefinedAndNotNull(event.clusterNodeId)) {
// save the cluster node id // save the cluster node id

View File

@ -347,6 +347,8 @@ public class StandardRecordReader implements RecordReader {
} else if (eventType == ProvenanceEventType.RECEIVE) { } else if (eventType == ProvenanceEventType.RECEIVE) {
builder.setTransitUri(readNullableString(dis)); builder.setTransitUri(readNullableString(dis));
builder.setSourceSystemFlowFileIdentifier(readNullableString(dis)); builder.setSourceSystemFlowFileIdentifier(readNullableString(dis));
} else if (eventType == ProvenanceEventType.FETCH) {
builder.setTransitUri(readNullableString(dis));
} else if (eventType == ProvenanceEventType.SEND) { } else if (eventType == ProvenanceEventType.SEND) {
builder.setTransitUri(readNullableString(dis)); builder.setTransitUri(readNullableString(dis));
} else if (eventType == ProvenanceEventType.ADDINFO) { } else if (eventType == ProvenanceEventType.ADDINFO) {

View File

@ -235,6 +235,8 @@ public class StandardRecordWriter implements RecordWriter {
} else if (recordType == ProvenanceEventType.RECEIVE) { } else if (recordType == ProvenanceEventType.RECEIVE) {
writeNullableString(out, record.getTransitUri()); writeNullableString(out, record.getTransitUri());
writeNullableString(out, record.getSourceSystemFlowFileIdentifier()); writeNullableString(out, record.getSourceSystemFlowFileIdentifier());
} else if (recordType == ProvenanceEventType.FETCH) {
writeNullableString(out, record.getTransitUri());
} else if (recordType == ProvenanceEventType.SEND) { } else if (recordType == ProvenanceEventType.SEND) {
writeNullableString(out, record.getTransitUri()); writeNullableString(out, record.getTransitUri());
} else if (recordType == ProvenanceEventType.ADDINFO) { } else if (recordType == ProvenanceEventType.ADDINFO) {