mirror of https://github.com/apache/nifi.git
NIFI-13808: Record attributes and content for clone, upload and send.
This commit is contained in:
parent
aacbd514ce
commit
cb6a8415e1
|
@ -1007,8 +1007,9 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
|
||||||
// the representation of the FlowFile as it is committed, as this is the only way in which it really
|
// the representation of the FlowFile as it is committed, as this is the only way in which it really
|
||||||
// exists in our system -- all other representations are volatile representations that have not been
|
// exists in our system -- all other representations are volatile representations that have not been
|
||||||
// exposed.
|
// exposed.
|
||||||
final boolean isUpdateAttributes = rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType() != ProvenanceEventType.UPLOAD;
|
final boolean isUpdateAttributesAndContent = rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType() != ProvenanceEventType.UPLOAD
|
||||||
return enrich(rawEvent, flowFileRecordMap, checkpoint.records, isUpdateAttributes, commitNanos);
|
&& rawEvent.getEventType() != ProvenanceEventType.CLONE;
|
||||||
|
return enrich(rawEvent, flowFileRecordMap, checkpoint.records, isUpdateAttributesAndContent, commitNanos);
|
||||||
} else if (autoTermIterator != null && autoTermIterator.hasNext()) {
|
} else if (autoTermIterator != null && autoTermIterator.hasNext()) {
|
||||||
return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos);
|
return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos);
|
||||||
}
|
}
|
||||||
|
@ -1085,13 +1086,13 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
|
||||||
|
|
||||||
private ProvenanceEventRecord enrich(
|
private ProvenanceEventRecord enrich(
|
||||||
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<Long, StandardRepositoryRecord> records,
|
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<Long, StandardRepositoryRecord> records,
|
||||||
final boolean updateAttributes, final long commitNanos) {
|
final boolean updateAttributesAndContent, final long commitNanos) {
|
||||||
final ProvenanceEventBuilder recordBuilder = context.createProvenanceEventBuilder().fromEvent(rawEvent);
|
final ProvenanceEventBuilder recordBuilder = context.createProvenanceEventBuilder().fromEvent(rawEvent);
|
||||||
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
|
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
|
||||||
if (eventFlowFile != null) {
|
if (eventFlowFile != null) {
|
||||||
final StandardRepositoryRecord repoRecord = records.get(eventFlowFile.getId());
|
final StandardRepositoryRecord repoRecord = records.get(eventFlowFile.getId());
|
||||||
|
|
||||||
if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
|
if (updateAttributesAndContent && repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
|
||||||
final ContentClaim currentClaim = repoRecord.getCurrentClaim();
|
final ContentClaim currentClaim = repoRecord.getCurrentClaim();
|
||||||
final long currentOffset = repoRecord.getCurrentClaimOffset();
|
final long currentOffset = repoRecord.getCurrentClaimOffset();
|
||||||
final long size = eventFlowFile.getSize();
|
final long size = eventFlowFile.getSize();
|
||||||
|
@ -1100,7 +1101,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
|
||||||
recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
|
recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
|
if (updateAttributesAndContent && repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
|
||||||
final ContentClaim originalClaim = repoRecord.getOriginalClaim();
|
final ContentClaim originalClaim = repoRecord.getOriginalClaim();
|
||||||
final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
|
final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
|
||||||
final long originalSize = repoRecord.getOriginal().getSize();
|
final long originalSize = repoRecord.getOriginal().getSize();
|
||||||
|
@ -1114,7 +1115,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
|
||||||
recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
|
recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (updateAttributes) {
|
if (updateAttributesAndContent) {
|
||||||
recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
|
recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -457,7 +457,10 @@ public class StandardProvenanceReporter implements InternalProvenanceReporter {
|
||||||
final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE);
|
final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE);
|
||||||
eventBuilder.addChildFlowFile(child);
|
eventBuilder.addChildFlowFile(child);
|
||||||
eventBuilder.addParentFlowFile(parent);
|
eventBuilder.addParentFlowFile(parent);
|
||||||
events.add(eventBuilder.build());
|
final ProvenanceEventRecord eventRecord = eventBuilder.build();
|
||||||
|
final ProvenanceEventRecord enriched = eventEnricher == null ? eventRecord : eventEnricher.enrich(eventRecord, parent, System.nanoTime());
|
||||||
|
|
||||||
|
events.add(enriched);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.error("Failed to generate Provenance Event", e);
|
logger.error("Failed to generate Provenance Event", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,11 @@ import java.util.Set;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
class StandardProvenanceReporterTest {
|
class StandardProvenanceReporterTest {
|
||||||
|
|
||||||
|
@ -46,4 +51,19 @@ class StandardProvenanceReporterTest {
|
||||||
assertNotNull(record);
|
assertNotNull(record);
|
||||||
assertEquals("These are details", record.getDetails());
|
assertEquals("These are details", record.getDetails());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@Test
|
||||||
|
public void testEnrichEvents() {
|
||||||
|
final ProvenanceEventRepository mockRepo = Mockito.mock(ProvenanceEventRepository.class);
|
||||||
|
final ProvenanceEventEnricher enricher = Mockito.mock(ProvenanceEventEnricher.class);
|
||||||
|
final StandardProvenanceReporter reporter = new StandardProvenanceReporter(null, "1234", "TestProc", mockRepo, enricher);
|
||||||
|
Mockito.when(mockRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
|
||||||
|
|
||||||
|
final FlowFile flowFile = new StandardFlowFileRecord.Builder().id(10L).addAttribute("uuid", "10").build();
|
||||||
|
final FlowFile childFlowFile = new StandardFlowFileRecord.Builder().id(11L).addAttribute("uuid", "11").build();
|
||||||
|
reporter.send(flowFile, "test://noop");
|
||||||
|
reporter.upload(flowFile, 0, "test://noop");
|
||||||
|
reporter.clone(flowFile, childFlowFile);
|
||||||
|
verify(enricher, times(3)).enrich(any(), eq(flowFile), anyLong());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue