diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java index 442f1309c1..eeffc7fe93 100644 --- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java @@ -675,4 +675,34 @@ public interface ProvenanceReporter { * @param details any relevant details about the CREATE event */ void create(FlowFile flowFile, String details); + + /** + * @return the number of FlowFiles for which there was a RECEIVE event + */ + int getFlowFilesReceived(); + + /** + * @return the sum of the sizes of all FlowFiles for which there was a RECEIVE event + */ + long getBytesReceived(); + + /** + * @return the number of FlowFiles for which there was a FETCH event + */ + int getFlowFilesFetched(); + + /** + * @return the sum of the sizes of all FlowFiles for which there was a FETCH event + */ + long getBytesFetched(); + + /** + * @return the number of FlowFiles for which there was a SEND event + */ + int getFlowFilesSent(); + + /** + * @return the sum of the sizes of all FlowFiles for which there was a SEND event + */ + long getBytesSent(); } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java index 55e3a81457..ae808a4fe6 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java @@ -503,6 +503,36 @@ public class MockProvenanceReporter implements ProvenanceReporter { } } + @Override + public int getFlowFilesReceived() { + return 0; + } + + @Override + public long getBytesReceived() { + return 0; + } + + @Override + public int getFlowFilesFetched() { + return 0; + } + + @Override + public long getBytesFetched() { + return 0; + } + + @Override + public int getFlowFilesSent() { + return 0; + } + + @Override + public long getBytesSent() { + return 0; + } + ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) { final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); builder.setEventType(eventType); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index ba49b27dfb..7891fc1ba7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -485,31 +485,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private void updateEventRepository(final Checkpoint checkpoint) { - int flowFilesReceived = 0; - int flowFilesSent = 0; - long bytesReceived = 0L; - long bytesSent = 0L; - - for (final ProvenanceEventRecord event : checkpoint.reportedEvents) { - if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { - continue; - } - - switch (event.getEventType()) { - case SEND: - flowFilesSent++; - bytesSent += event.getFileSize(); - break; - case RECEIVE: - case FETCH: - flowFilesReceived++; - bytesReceived += event.getFileSize(); - break; - default: - break; - } - } - try { // update event repository final Connectable connectable = context.getConnectable(); @@ -522,10 +497,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE flowFileEvent.setFlowFilesIn(checkpoint.flowFilesIn); flowFileEvent.setFlowFilesOut(checkpoint.flowFilesOut); flowFileEvent.setFlowFilesRemoved(checkpoint.removedCount); - flowFileEvent.setFlowFilesReceived(flowFilesReceived); - flowFileEvent.setBytesReceived(bytesReceived); - flowFileEvent.setFlowFilesSent(flowFilesSent); - flowFileEvent.setBytesSent(bytesSent); + flowFileEvent.setFlowFilesReceived(checkpoint.flowFilesReceived); + flowFileEvent.setBytesReceived(checkpoint.bytesReceived); + flowFileEvent.setFlowFilesSent(checkpoint.flowFilesSent); + flowFileEvent.setBytesSent(checkpoint.bytesSent); final long now = System.currentTimeMillis(); long lineageMillis = 0L; @@ -3387,6 +3362,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private long bytesWritten = 0L; private int flowFilesIn = 0, flowFilesOut = 0; private long contentSizeIn = 0L, contentSizeOut = 0L; + private int flowFilesReceived = 0, flowFilesSent = 0; + private long bytesReceived = 0L, bytesSent = 0L; private void checkpoint(final StandardProcessSession session, final List autoTerminatedEvents) { this.processingTime += System.nanoTime() - session.processingStartTime; @@ -3416,6 +3393,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE this.flowFilesOut += session.flowFilesOut; this.contentSizeIn += session.contentSizeIn; this.contentSizeOut += session.contentSizeOut; + this.flowFilesReceived += session.provenanceReporter.getFlowFilesReceived() + session.provenanceReporter.getFlowFilesFetched(); + this.bytesReceived += session.provenanceReporter.getBytesReceived() + session.provenanceReporter.getBytesFetched(); + this.flowFilesSent += session.provenanceReporter.getFlowFilesSent(); + this.bytesSent += session.provenanceReporter.getBytesSent(); } private void mergeMaps(final Map destination, final Map toMerge, final BiFunction merger) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index e956d5062f..cbe1c660d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory; public class StandardProvenanceReporter implements ProvenanceReporter { - public static final String NIFI_NAMESPACE = "nifi"; private final Logger logger = LoggerFactory.getLogger(StandardProvenanceReporter.class); private final String processorId; private final String processorType; @@ -42,6 +41,12 @@ public class StandardProvenanceReporter implements ProvenanceReporter { private final ProvenanceEventRepository repository; private final ProvenanceEventEnricher eventEnricher; private final StandardProcessSession session; + private long bytesSent = 0L; + private long bytesReceived = 0L; + private int flowFilesSent = 0; + private int flowFilesReceived = 0; + private int flowFilesFetched = 0; + private long bytesFetched = 0L; public StandardProvenanceReporter(final StandardProcessSession session, final String processorId, final String processorType, final ProvenanceEventRepository repository, final ProvenanceEventEnricher enricher) { @@ -138,6 +143,9 @@ public class StandardProvenanceReporter implements ProvenanceReporter { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE) .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); events.add(record); + + bytesReceived += flowFile.getSize(); + flowFilesReceived++; } catch (final Exception e) { logger.error("Failed to generate Provenance Event due to " + e); if (logger.isDebugEnabled()) { @@ -166,7 +174,11 @@ public class StandardProvenanceReporter implements ProvenanceReporter { .setEventDuration(transmissionMillis) .setDetails(details) .build(); + events.add(record); + + bytesFetched += flowFile.getSize(); + flowFilesFetched++; } catch (final Exception e) { logger.error("Failed to generate Provenance Event due to " + e); if (logger.isDebugEnabled()) { @@ -218,6 +230,9 @@ public class StandardProvenanceReporter implements ProvenanceReporter { } else { events.add(enriched); } + + bytesSent += flowFile.getSize(); + flowFilesSent++; } catch (final Exception e) { logger.error("Failed to generate Provenance Event due to " + e); if (logger.isDebugEnabled()) { @@ -519,4 +534,34 @@ public class StandardProvenanceReporter implements ProvenanceReporter { builder.setComponentType(processorType); return builder; } + + @Override + public int getFlowFilesSent() { + return flowFilesSent; + } + + @Override + public long getBytesSent() { + return bytesSent; + } + + @Override + public int getFlowFilesReceived() { + return flowFilesReceived; + } + + @Override + public long getBytesReceived() { + return bytesReceived; + } + + @Override + public int getFlowFilesFetched() { + return flowFilesFetched; + } + + @Override + public long getBytesFetched() { + return bytesFetched; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/ProvenanceCollector.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/ProvenanceCollector.java index c946eceabc..ac8ee5d17a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/ProvenanceCollector.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/ProvenanceCollector.java @@ -37,6 +37,13 @@ public class ProvenanceCollector implements ProvenanceReporter { private final String processorId; private final String processorType; private final Collection events; + private long bytesSent = 0L; + private long bytesReceived = 0L; + private int flowFilesSent = 0; + private int flowFilesReceived = 0; + private int flowFilesFetched = 0; + private long bytesFetched = 0L; + public ProvenanceCollector(final StatelessProcessSession session, final Collection events, final String processorId, final String processorType) { this.session = session; @@ -134,6 +141,9 @@ public class ProvenanceCollector implements ProvenanceReporter { .setDetails(details) .build(); events.add(record); + + flowFilesReceived++; + bytesReceived += flowFile.getSize(); } catch (final Exception e) { logger.error("Failed to generate Provenance Event due to " + e); if (logger.isDebugEnabled()) { @@ -163,6 +173,9 @@ public class ProvenanceCollector implements ProvenanceReporter { .setDetails(details) .build(); events.add(record); + + flowFilesFetched++; + bytesFetched += flowFile.getSize(); } catch (final Exception e) { logger.error("Failed to generate Provenance Event due to " + e); if (logger.isDebugEnabled()) { @@ -205,12 +218,9 @@ public class ProvenanceCollector implements ProvenanceReporter { public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) { try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build(); - if (force) { - //sharedSessionState.addProvenanceEvents(Collections.singleton(record)); - events.add(record); - } else { - events.add(record); - } + events.add(record); + flowFilesSent++; + bytesSent += flowFile.getSize(); } catch (final Exception e) { logger.error("Failed to generate Provenance Event due to " + e); if (logger.isDebugEnabled()) { @@ -497,6 +507,36 @@ public class ProvenanceCollector implements ProvenanceReporter { } } + @Override + public int getFlowFilesReceived() { + return flowFilesReceived; + } + + @Override + public long getBytesReceived() { + return bytesReceived; + } + + @Override + public int getFlowFilesFetched() { + return flowFilesFetched; + } + + @Override + public long getBytesFetched() { + return bytesFetched; + } + + @Override + public int getFlowFilesSent() { + return flowFilesSent; + } + + @Override + public long getBytesSent() { + return bytesSent; + } + ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) { final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); builder.setEventType(eventType);