NIFI-13779 Improved Provenance Lineage for Python FlowFileTransform (#9292)

- Transform input FlowFile instead of cloned FlowFile

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Bob Paulin 2024-09-26 12:39:19 -05:00 committed by GitHub
parent ee29562517
commit 34aa764132
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 31 additions and 17 deletions

View File

@ -27,6 +27,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import py4j.Py4JNetworkException;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -40,22 +41,19 @@ public class FlowFileTransformProxy extends PythonProcessorProxy<FlowFileTransfo
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile original = session.get();
if (original == null) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
FlowFile transformed = session.clone(original);
final FlowFileTransformResult result;
try (final StandardInputFlowFile inputFlowFile = new StandardInputFlowFile(session, original)) {
try (final StandardInputFlowFile inputFlowFile = new StandardInputFlowFile(session, flowFile)) {
result = getTransform().transformFlowFile(inputFlowFile);
} catch (final Py4JNetworkException e) {
throw new ProcessException("Failed to communicate with Python Process", e);
} catch (final Exception e) {
getLogger().error("Failed to transform {}", original, e);
session.remove(transformed);
session.transfer(original, REL_FAILURE);
getLogger().error("Failed to transform {}", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
return;
}
@ -65,26 +63,37 @@ public class FlowFileTransformProxy extends PythonProcessorProxy<FlowFileTransfo
final Map<String, String> attributes = result.getAttributes();
if (REL_FAILURE.getName().equals(relationshipName)) {
session.remove(transformed);
if (attributes != null) {
original = session.putAllAttributes(original, attributes);
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(original, REL_FAILURE);
session.transfer(flowFile, REL_FAILURE);
return;
}
//Clone before making modifications if needed for original relationship
final Optional<FlowFile> clone;
if (context.isAutoTerminated(REL_ORIGINAL)) {
clone = Optional.empty();
} else {
clone = Optional.of(session.clone(flowFile));
}
if (attributes != null) {
transformed = session.putAllAttributes(transformed, attributes);
flowFile = session.putAllAttributes(flowFile, attributes);
}
final byte[] contents = result.getContents();
if (contents != null) {
transformed = session.write(transformed, out -> out.write(contents));
flowFile = session.write(flowFile, out -> out.write(contents));
}
session.transfer(flowFile, relationship);
if (clone.isPresent()) {
session.transfer(clone.get(), REL_ORIGINAL);
}
session.transfer(transformed, relationship);
session.transfer(original, REL_ORIGINAL);
} finally {
result.free();
}

View File

@ -21,6 +21,7 @@ import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.Relationship;
@ -513,14 +514,18 @@ public class PythonControllerInteractionIT {
assertTrue(relationships.stream().anyMatch(rel -> rel.getName().equals("original")));
assertTrue(relationships.stream().anyMatch(rel -> rel.getName().equals("failure")));
runner.enqueue(new byte[25]);
runner.enqueue(new byte[75 * 1024]);
final MockFlowFile smallInputFlowFile = runner.enqueue(new byte[25]);
final MockFlowFile largeInputFlowFile = runner.enqueue(new byte[75 * 1024]);
runner.run(2);
runner.assertTransferCount("original", 2);
runner.assertTransferCount("small", 1);
runner.assertTransferCount("large", 1);
runner.assertTransferCount("failure", 0);
final FlowFile largeOutputFlowFile = runner.getFlowFilesForRelationship("large").getFirst();
assertEquals(largeInputFlowFile.getId(), largeOutputFlowFile.getId(), "Large Transformed Flow File should be the same as inbound");
final FlowFile smallOutputFlowFile = runner.getFlowFilesForRelationship("small").getFirst();
assertEquals(smallInputFlowFile.getId(), smallOutputFlowFile.getId(), "Small Transformed Flow File should be the same as inbound");
}
@Test