NIFI-647: When FORK Event emitted by processor, Framework generates CREATE events for each child in addition to the FORK event; this commit fixes that bug

This commit is contained in:
Mark Payne 2015-06-04 12:42:56 -04:00
parent 310ae3ebcd
commit 9fbd88f034
1 changed files with 88 additions and 85 deletions

View File

@ -93,7 +93,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback // determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
public static final int VERBOSE_LOG_THRESHOLD = 10; public static final int VERBOSE_LOG_THRESHOLD = 10;
private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize( private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize(
NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue(); NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim(); private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
public static final String DEFAULT_FLOWFILE_PATH = "./"; public static final String DEFAULT_FLOWFILE_PATH = "./";
@ -154,28 +154,28 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
String description = connectable.toString(); String description = connectable.toString();
switch (connectable.getConnectableType()) { switch (connectable.getConnectableType()) {
case PROCESSOR: case PROCESSOR:
final ProcessorNode procNode = (ProcessorNode) connectable; final ProcessorNode procNode = (ProcessorNode) connectable;
componentType = procNode.getProcessor().getClass().getSimpleName(); componentType = procNode.getProcessor().getClass().getSimpleName();
description = procNode.getProcessor().toString(); description = procNode.getProcessor().toString();
break; break;
case INPUT_PORT: case INPUT_PORT:
componentType = "Input Port"; componentType = "Input Port";
break; break;
case OUTPUT_PORT: case OUTPUT_PORT:
componentType = "Output Port"; componentType = "Output Port";
break; break;
case REMOTE_INPUT_PORT: case REMOTE_INPUT_PORT:
componentType = "Remote Input Port"; componentType = "Remote Input Port";
break; break;
case REMOTE_OUTPUT_PORT: case REMOTE_OUTPUT_PORT:
componentType = "Remote Output Port"; componentType = "Remote Output Port";
break; break;
case FUNNEL: case FUNNEL:
componentType = "Funnel"; componentType = "Funnel";
break; break;
default: default:
throw new AssertionError("Connectable type is " + connectable.getConnectableType()); throw new AssertionError("Connectable type is " + connectable.getConnectableType());
} }
this.provenanceReporter = new StandardProvenanceReporter(connectable.getIdentifier(), componentType, context.getProvenanceRepository(), this); this.provenanceReporter = new StandardProvenanceReporter(connectable.getIdentifier(), componentType, context.getProvenanceRepository(), this);
@ -437,16 +437,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
switch (event.getEventType()) { switch (event.getEventType()) {
case SEND: case SEND:
flowFilesSent++; flowFilesSent++;
bytesSent += event.getFileSize(); bytesSent += event.getFileSize();
break; break;
case RECEIVE: case RECEIVE:
flowFilesReceived++; flowFilesReceived++;
bytesReceived += event.getFileSize(); bytesReceived += event.getFileSize();
break; break;
default: default:
break; break;
} }
} }
@ -519,9 +519,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile)); updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile));
final ProvenanceEventRecord event = builder.build(); final ProvenanceEventRecord event = builder.build();
if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) { if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
recordsToSubmit.add(event); // If framework generated the event, add it to the 'recordsToSubmit' Set.
if (!processorGenerated.contains(event)) {
recordsToSubmit.add(event);
}
// Register the FORK event for each child and each parent.
for (final String childUuid : event.getChildUuids()) { for (final String childUuid : event.getChildUuids()) {
addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType()); addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
} }
@ -536,13 +540,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
continue; continue;
} }
if (isSpuriousRouteEvent(event, checkpoint.records)) {
continue;
}
// Check if the event indicates that the FlowFile was routed to the same // Check if the event indicates that the FlowFile was routed to the same
// connection from which it was pulled (and only this connection). If so, discard the event. // connection from which it was pulled (and only this connection). If so, discard the event.
isSpuriousRouteEvent(event, checkpoint.records); if (isSpuriousRouteEvent(event, checkpoint.records)) {
continue;
}
recordsToSubmit.add(event); recordsToSubmit.add(event);
addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
@ -596,9 +599,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
boolean creationEventRegistered = false; boolean creationEventRegistered = false;
if (registeredTypes != null) { if (registeredTypes != null) {
if (registeredTypes.contains(ProvenanceEventType.CREATE) if (registeredTypes.contains(ProvenanceEventType.CREATE)
|| registeredTypes.contains(ProvenanceEventType.FORK) || registeredTypes.contains(ProvenanceEventType.FORK)
|| registeredTypes.contains(ProvenanceEventType.JOIN) || registeredTypes.contains(ProvenanceEventType.JOIN)
|| registeredTypes.contains(ProvenanceEventType.RECEIVE)) { || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
creationEventRegistered = true; creationEventRegistered = true;
} }
} }
@ -687,11 +690,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
builder.setCurrentContentClaim(null, null, null, null, 0L); builder.setCurrentContentClaim(null, null, null, null, 0L);
} else { } else {
builder.setCurrentContentClaim( builder.setCurrentContentClaim(
originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
); );
builder.setPreviousContentClaim( builder.setPreviousContentClaim(
originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
); );
} }
} }
@ -727,7 +730,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
private StandardProvenanceEventRecord enrich( private StandardProvenanceEventRecord enrich(
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) { final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (eventFlowFile != null) { if (eventFlowFile != null) {
@ -996,7 +999,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StringBuilder sb = new StringBuilder(512); final StringBuilder sb = new StringBuilder(512);
if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD
|| numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
if (numCreated > 0) { if (numCreated > 0) {
sb.append("created ").append(numCreated).append(" FlowFiles, "); sb.append("created ").append(numCreated).append(" FlowFiles, ");
} }
@ -1244,8 +1247,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(attrs) .addAttributes(attrs)
.build(); .build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null); final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, attrs); record.setWorking(fFile, attrs);
records.put(fFile, record); records.put(fFile, record);
@ -1607,7 +1610,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(connectable.getIdentifier(), final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(connectable.getIdentifier(),
processorType, context.getProvenanceRepository(), this); processorType, context.getProvenanceRepository(), this);
final Map<String, FlowFileRecord> recordIdMap = new HashMap<>(); final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
for (final FlowFileRecord flowFile : flowFiles) { for (final FlowFileRecord flowFile : flowFiles) {
@ -1734,9 +1737,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@ -1807,7 +1810,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try { try {
try (final OutputStream rawOut = contentRepo.write(newClaim); try (final OutputStream rawOut = contentRepo.write(newClaim);
final OutputStream out = new BufferedOutputStream(rawOut)) { final OutputStream out = new BufferedOutputStream(rawOut)) {
if (header != null && header.length > 0) { if (header != null && header.length > 0) {
out.write(header); out.write(header);
@ -1914,7 +1917,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(newClaim); ensureNotAppending(newClaim);
try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream); try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream);
final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
recursionSet.add(source); recursionSet.add(source);
@ -1937,7 +1940,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(newClaim); ensureNotAppending(newClaim);
try (final OutputStream stream = context.getContentRepository().write(newClaim); try (final OutputStream stream = context.getContentRepository().write(newClaim);
final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) { final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) {
recursionSet.add(source); recursionSet.add(source);
writeRecursionLevel++; writeRecursionLevel++;
@ -2173,11 +2176,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(newClaim); ensureNotAppending(newClaim);
try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset()); try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream); final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
recursionSet.add(source); recursionSet.add(source);
@ -2216,10 +2219,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(newClaim); ensureNotAppending(newClaim);
try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset()); try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead); final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
final OutputStream os = context.getContentRepository().write(newClaim); final OutputStream os = context.getContentRepository().write(newClaim);
final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) { final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) {
recursionSet.add(source); recursionSet.add(source);
@ -2332,9 +2335,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
removeTemporaryClaim(record); removeTemporaryClaim(record);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
.contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize) .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
.addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
.build(); .build();
record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName()); record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
if (!keepSourceFile) { if (!keepSourceFile) {
deleteOnCommit.add(source); deleteOnCommit.add(source);
@ -2501,8 +2504,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final String key = entry.getKey(); final String key = entry.getKey();
final String value = entry.getValue(); final String value = entry.getValue();
if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key) if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
|| CoreAttributes.DISCARD_REASON.key().equals(key) || CoreAttributes.DISCARD_REASON.key().equals(key)
|| CoreAttributes.UUID.key().equals(key)) { || CoreAttributes.UUID.key().equals(key)) {
continue; continue;
} }
newAttributes.put(key, value); newAttributes.put(key, value);
@ -2547,10 +2550,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(newAttributes) .addAttributes(newAttributes)
.lineageIdentifiers(lineageIdentifiers) .lineageIdentifiers(lineageIdentifiers)
.lineageStartDate(lineageStartDate) .lineageStartDate(lineageStartDate)
.build(); .build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null); final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, newAttributes); record.setWorking(fFile, newAttributes);
@ -2584,18 +2587,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes(); final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
outer: outer:
for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
final String key = mapEntry.getKey(); final String key = mapEntry.getKey();
final String value = mapEntry.getValue(); final String value = mapEntry.getValue();
for (final FlowFile flowFile : flowFileList) { for (final FlowFile flowFile : flowFileList) {
final Map<String, String> currMap = flowFile.getAttributes(); final Map<String, String> currMap = flowFile.getAttributes();
final String curVal = currMap.get(key); final String curVal = currMap.get(key);
if (curVal == null || !curVal.equals(value)) { if (curVal == null || !curVal.equals(value)) {
continue outer; continue outer;
}
} }
result.put(key, value);
} }
result.put(key, value);
}
return result; return result;
} }