From 9fbd88f034e6d5006b0f9cc7c4ffcd09c7c65a01 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 4 Jun 2015 12:42:56 -0400 Subject: [PATCH] NIFI-647: When FORK Event emitted by processor, Framework generates CREATE events for each child in addition to the FORK event; this commit fixes that bug --- .../repository/StandardProcessSession.java | 173 +++++++++--------- 1 file changed, 88 insertions(+), 85 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 2b05b477a1..a048d21b91 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -93,7 +93,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback public static final int VERBOSE_LOG_THRESHOLD = 10; private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize( - NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue(); + NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue(); private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim(); public static final String DEFAULT_FLOWFILE_PATH = "./"; @@ -154,28 +154,28 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE String description = connectable.toString(); switch (connectable.getConnectableType()) { - case PROCESSOR: - final ProcessorNode procNode = (ProcessorNode) connectable; - componentType = procNode.getProcessor().getClass().getSimpleName(); - description = procNode.getProcessor().toString(); - break; - case INPUT_PORT: - componentType = "Input Port"; - break; - case OUTPUT_PORT: - componentType = "Output Port"; - break; - case REMOTE_INPUT_PORT: - componentType = "Remote Input Port"; - break; - case REMOTE_OUTPUT_PORT: - componentType = "Remote Output Port"; - break; - case FUNNEL: - componentType = "Funnel"; - break; - default: - throw new AssertionError("Connectable type is " + connectable.getConnectableType()); + case PROCESSOR: + final ProcessorNode procNode = (ProcessorNode) connectable; + componentType = procNode.getProcessor().getClass().getSimpleName(); + description = procNode.getProcessor().toString(); + break; + case INPUT_PORT: + componentType = "Input Port"; + break; + case OUTPUT_PORT: + componentType = "Output Port"; + break; + case REMOTE_INPUT_PORT: + componentType = "Remote Input Port"; + break; + case REMOTE_OUTPUT_PORT: + componentType = "Remote Output Port"; + break; + case FUNNEL: + componentType = "Funnel"; + break; + default: + throw new AssertionError("Connectable type is " + connectable.getConnectableType()); } this.provenanceReporter = new StandardProvenanceReporter(connectable.getIdentifier(), componentType, context.getProvenanceRepository(), this); @@ -437,16 +437,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } switch (event.getEventType()) { - case SEND: - flowFilesSent++; - bytesSent += event.getFileSize(); - break; - case RECEIVE: - flowFilesReceived++; - bytesReceived += event.getFileSize(); - break; - default: - break; + case SEND: + flowFilesSent++; + bytesSent += event.getFileSize(); + break; + case RECEIVE: + flowFilesReceived++; + bytesReceived += event.getFileSize(); + break; + default: + break; } } @@ -519,9 +519,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile)); final ProvenanceEventRecord event = builder.build(); - if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) { - recordsToSubmit.add(event); + if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { + // If framework generated the event, add it to the 'recordsToSubmit' Set. + if (!processorGenerated.contains(event)) { + recordsToSubmit.add(event); + } + // Register the FORK event for each child and each parent. for (final String childUuid : event.getChildUuids()) { addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType()); } @@ -536,13 +540,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { continue; } - if (isSpuriousRouteEvent(event, checkpoint.records)) { - continue; - } // Check if the event indicates that the FlowFile was routed to the same // connection from which it was pulled (and only this connection). If so, discard the event. - isSpuriousRouteEvent(event, checkpoint.records); + if (isSpuriousRouteEvent(event, checkpoint.records)) { + continue; + } recordsToSubmit.add(event); addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); @@ -596,9 +599,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE boolean creationEventRegistered = false; if (registeredTypes != null) { if (registeredTypes.contains(ProvenanceEventType.CREATE) - || registeredTypes.contains(ProvenanceEventType.FORK) - || registeredTypes.contains(ProvenanceEventType.JOIN) - || registeredTypes.contains(ProvenanceEventType.RECEIVE)) { + || registeredTypes.contains(ProvenanceEventType.FORK) + || registeredTypes.contains(ProvenanceEventType.JOIN) + || registeredTypes.contains(ProvenanceEventType.RECEIVE)) { creationEventRegistered = true; } } @@ -687,11 +690,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE builder.setCurrentContentClaim(null, null, null, null, 0L); } else { builder.setCurrentContentClaim( - originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() - ); + originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() + ); builder.setPreviousContentClaim( - originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() - ); + originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() + ); } } @@ -727,7 +730,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private StandardProvenanceEventRecord enrich( - final ProvenanceEventRecord rawEvent, final Map flowFileRecordMap, final Map records, final boolean updateAttributes) { + final ProvenanceEventRecord rawEvent, final Map flowFileRecordMap, final Map records, final boolean updateAttributes) { final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); if (eventFlowFile != null) { @@ -996,7 +999,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StringBuilder sb = new StringBuilder(512); if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD - || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { + || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { if (numCreated > 0) { sb.append("created ").append(numCreated).append(" FlowFiles, "); } @@ -1244,8 +1247,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) - .addAttributes(attrs) - .build(); + .addAttributes(attrs) + .build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(fFile, attrs); records.put(fFile, record); @@ -1607,7 +1610,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(connectable.getIdentifier(), - processorType, context.getProvenanceRepository(), this); + processorType, context.getProvenanceRepository(), this); final Map recordIdMap = new HashMap<>(); for (final FlowFileRecord flowFile : flowFiles) { @@ -1734,9 +1737,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); - final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { + final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository @@ -1807,7 +1810,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { try (final OutputStream rawOut = contentRepo.write(newClaim); - final OutputStream out = new BufferedOutputStream(rawOut)) { + final OutputStream out = new BufferedOutputStream(rawOut)) { if (header != null && header.length > 0) { out.write(header); @@ -1914,7 +1917,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { + final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { recursionSet.add(source); @@ -1937,7 +1940,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); try (final OutputStream stream = context.getContentRepository().write(newClaim); - final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) { + final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) { recursionSet.add(source); writeRecursionLevel++; @@ -2173,11 +2176,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); - final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); - final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { + final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); + final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream); + final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { recursionSet.add(source); @@ -2216,10 +2219,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); - final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead); - final OutputStream os = context.getContentRepository().write(newClaim); - final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) { + final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); + final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead); + final OutputStream os = context.getContentRepository().write(newClaim); + final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) { recursionSet.add(source); @@ -2332,9 +2335,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE removeTemporaryClaim(record); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) - .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize) - .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) - .build(); + .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize) + .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) + .build(); record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName()); if (!keepSourceFile) { deleteOnCommit.add(source); @@ -2501,8 +2504,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final String key = entry.getKey(); final String value = entry.getValue(); if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key) - || CoreAttributes.DISCARD_REASON.key().equals(key) - || CoreAttributes.UUID.key().equals(key)) { + || CoreAttributes.DISCARD_REASON.key().equals(key) + || CoreAttributes.UUID.key().equals(key)) { continue; } newAttributes.put(key, value); @@ -2547,10 +2550,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) - .addAttributes(newAttributes) - .lineageIdentifiers(lineageIdentifiers) - .lineageStartDate(lineageStartDate) - .build(); + .addAttributes(newAttributes) + .lineageIdentifiers(lineageIdentifiers) + .lineageStartDate(lineageStartDate) + .build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(fFile, newAttributes); @@ -2584,18 +2587,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Map firstMap = flowFileList.iterator().next().getAttributes(); outer: - for (final Map.Entry mapEntry : firstMap.entrySet()) { - final String key = mapEntry.getKey(); - final String value = mapEntry.getValue(); - for (final FlowFile flowFile : flowFileList) { - final Map currMap = flowFile.getAttributes(); - final String curVal = currMap.get(key); - if (curVal == null || !curVal.equals(value)) { - continue outer; + for (final Map.Entry mapEntry : firstMap.entrySet()) { + final String key = mapEntry.getKey(); + final String value = mapEntry.getValue(); + for (final FlowFile flowFile : flowFileList) { + final Map currMap = flowFile.getAttributes(); + final String curVal = currMap.get(key); + if (curVal == null || !curVal.equals(value)) { + continue outer; + } } + result.put(key, value); } - result.put(key, value); - } return result; }