From 34aa7641321c2cb3c23c8bf5123ddf7824069950 Mon Sep 17 00:00:00 2001 From: Bob Paulin Date: Thu, 26 Sep 2024 12:39:19 -0500 Subject: [PATCH] NIFI-13779 Improved Provenance Lineage for Python FlowFileTransform (#9292) - Transform input FlowFile instead of cloned FlowFile Signed-off-by: David Handermann --- .../processor/FlowFileTransformProxy.java | 39 ++++++++++++------- .../PythonControllerInteractionIT.java | 9 ++++- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java index 4d654d7c4e..f553bfafa4 100644 --- a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java +++ b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java @@ -27,6 +27,7 @@ import org.apache.nifi.processor.exception.ProcessException; import py4j.Py4JNetworkException; import java.util.Map; +import java.util.Optional; import java.util.function.Supplier; @InputRequirement(Requirement.INPUT_REQUIRED) @@ -40,22 +41,19 @@ public class FlowFileTransformProxy extends PythonProcessorProxy attributes = result.getAttributes(); if (REL_FAILURE.getName().equals(relationshipName)) { - session.remove(transformed); if (attributes != null) { - original = session.putAllAttributes(original, attributes); + flowFile = session.putAllAttributes(flowFile, attributes); } - session.transfer(original, REL_FAILURE); + session.transfer(flowFile, REL_FAILURE); return; } + //Clone before making modifications if needed for original relationship + final Optional<FlowFile> clone; + if (context.isAutoTerminated(REL_ORIGINAL)) { + clone = Optional.empty(); + } else { + clone = 
Optional.of(session.clone(flowFile)); + } + if (attributes != null) { - transformed = session.putAllAttributes(transformed, attributes); + flowFile = session.putAllAttributes(flowFile, attributes); } final byte[] contents = result.getContents(); if (contents != null) { - transformed = session.write(transformed, out -> out.write(contents)); + flowFile = session.write(flowFile, out -> out.write(contents)); + } + + session.transfer(flowFile, relationship); + + if (clone.isPresent()) { + session.transfer(clone.get(), REL_ORIGINAL); } - session.transfer(transformed, relationship); - session.transfer(original, REL_ORIGINAL); } finally { result.free(); } diff --git a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java index 39a1bb91eb..fd121397ba 100644 --- a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java +++ b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java @@ -21,6 +21,7 @@ import org.apache.nifi.components.AsyncLoadedProcessor; import org.apache.nifi.components.AsyncLoadedProcessor.LoadState; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.Relationship; @@ -513,14 +514,18 @@ public class PythonControllerInteractionIT { assertTrue(relationships.stream().anyMatch(rel -> rel.getName().equals("original"))); assertTrue(relationships.stream().anyMatch(rel -> rel.getName().equals("failure"))); - 
runner.enqueue(new byte[25]); - runner.enqueue(new byte[75 * 1024]); + final MockFlowFile smallInputFlowFile = runner.enqueue(new byte[25]); + final MockFlowFile largeInputFlowFile = runner.enqueue(new byte[75 * 1024]); runner.run(2); runner.assertTransferCount("original", 2); runner.assertTransferCount("small", 1); runner.assertTransferCount("large", 1); runner.assertTransferCount("failure", 0); + final FlowFile largeOutputFlowFile = runner.getFlowFilesForRelationship("large").getFirst(); + assertEquals(largeInputFlowFile.getId(), largeOutputFlowFile.getId(), "Large Transformed Flow File should be the same as inbound"); + final FlowFile smallOutputFlowFile = runner.getFlowFilesForRelationship("small").getFirst(); + assertEquals(smallInputFlowFile.getId(), smallOutputFlowFile.getId(), "Small Transformed Flow File should be the same as inbound"); } @Test