From b8d7b9c67722e465e9ac43facb148a4462051373 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 7 Feb 2024 19:05:15 -0500 Subject: [PATCH] NIFI-12757: Issue GC commands to Python for FlowFileTransformResults and RecordTransformResults when no longer needed on Java side This closes #8375 Signed-off-by: David Handermann --- .../nifi/py4j/client/NiFiPythonGateway.java | 15 +++++++ .../client/PythonProxyInvocationHandler.java | 9 +++++ .../processor/FlowFileTransformProxy.java | 39 +++++++++++-------- .../processor/FlowFileTransformResult.java | 4 +- .../processor/RecordTransformProxy.java | 20 +++++++--- .../processor/RecordTransformResult.java | 3 +- .../nifi/python/PythonObjectProxy.java} | 12 ++---- .../nifi/python/PythonProcessorDetails.java | 2 +- .../MultiProcessorUseCaseDetails.java | 4 +- .../ProcessorConfigurationDetails.java | 4 +- .../documentation/PropertyDescription.java | 4 +- .../documentation/UseCaseDetails.java | 4 +- 12 files changed, 81 insertions(+), 39 deletions(-) rename nifi-nar-bundles/nifi-py4j-bundle/{nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/OutputRecord.java => nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonObjectProxy.java} (78%) diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/NiFiPythonGateway.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/NiFiPythonGateway.java index e2c5da9b85..4051fac853 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/NiFiPythonGateway.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/NiFiPythonGateway.java @@ -27,6 +27,7 @@ import java.util.Stack; import java.util.concurrent.ConcurrentHashMap; import org.apache.nifi.python.PythonController; +import org.apache.nifi.python.PythonObjectProxy; import org.apache.nifi.python.processor.PreserveJavaBinding; import org.apache.nifi.python.processor.PythonProcessor; import org.slf4j.Logger; @@ -142,9 +143,23 @@ public class NiFiPythonGateway extends Gateway { protected PythonProxyHandler createPythonProxyHandler(final String id) { logger.debug("Creating Python Proxy Handler for ID {}", id); final PythonProxyInvocationHandler createdHandler = new PythonProxyInvocationHandler(this, id); + + Method proxyFreeMethod; + try { + proxyFreeMethod = PythonObjectProxy.class.getMethod("free"); + } catch (final NoSuchMethodException ignored) { + proxyFreeMethod = null; + } + + final Method freeMethod = proxyFreeMethod; return new PythonProxyHandler(id, this) { @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + if (Objects.equals(freeMethod, method)) { + createdHandler.free(); + return null; + } + return createdHandler.invoke(proxy, method, args); } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/PythonProxyInvocationHandler.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/PythonProxyInvocationHandler.java index 3a13d0fcad..edba8c60ca 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/PythonProxyInvocationHandler.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/PythonProxyInvocationHandler.java @@ -37,11 +37,20 @@ public class PythonProxyInvocationHandler implements InvocationHandler { private final String objectId; private final NiFiPythonGateway gateway; private final JavaObjectBindings bindings; + private final String gcCommand; public PythonProxyInvocationHandler(final NiFiPythonGateway gateway, final String objectId) { this.objectId = objectId; this.gateway = gateway; this.bindings = gateway.getObjectBindings(); + this.gcCommand = "g\n" + objectId + "\ne\n"; + } + + public void free() { + if (this.objectId != Protocol.ENTRY_POINT_OBJECT_ID) { + logger.debug("Issuing GC command to python for proxy id {}", this.objectId); + gateway.getCallbackClient().sendCommand(gcCommand, false); + } } @Override diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java index 06f54472a0..2c692cf5e5 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java @@ -72,26 +72,31 @@ public class FlowFileTransformProxy extends PythonProcessorProxy { session.transfer(original, REL_FAILURE); return; } - final String relationshipName = result.getRelationship(); - final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); - if (REL_FAILURE.getName().equals(relationshipName)) { - session.remove(transformed); - session.transfer(original, REL_FAILURE); - return; - } - final Map attributes = result.getAttributes(); - if (attributes != null) { - transformed = session.putAllAttributes(transformed, attributes); - } + try { + final String relationshipName = result.getRelationship(); + final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + if (REL_FAILURE.getName().equals(relationshipName)) { + session.remove(transformed); + session.transfer(original, REL_FAILURE); + return; + } - final byte[] contents = result.getContents(); - if (contents != null) { - transformed = session.write(transformed, out -> out.write(contents)); - } + final Map attributes = result.getAttributes(); + if (attributes != null) { + transformed = session.putAllAttributes(transformed, attributes); + } - session.transfer(transformed, relationship); - session.transfer(original, REL_ORIGINAL); + final byte[] contents = result.getContents(); + if (contents != null) { + transformed = session.write(transformed, out -> out.write(contents)); + } + + session.transfer(transformed, relationship); + session.transfer(original, REL_ORIGINAL); + } finally { + result.free(); + } } } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformResult.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformResult.java index 7e57f1b845..b1d29992fe 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformResult.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformResult.java @@ -17,9 +17,11 @@ package org.apache.nifi.python.processor; +import org.apache.nifi.python.PythonObjectProxy; + import java.util.Map; -public interface FlowFileTransformResult { +public interface FlowFileTransformResult extends PythonObjectProxy { String getRelationship(); byte[] getContents(); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java index 4f20c56850..1b95ae3326 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java @@ -147,9 +147,13 @@ public class RecordTransformProxy extends PythonProcessorProxy { baos.reset(); final List results = transform.transformRecord(json, recordSchema, attributeMap); - for (final RecordTransformResult result : results) { - writeResult(result, destinationTuples, writerFactory, session, flowFile); - recordsWritten++; + try { + for (final RecordTransformResult result : results) { + writeResult(result, destinationTuples, writerFactory, session, flowFile); + recordsWritten++; + } + } finally { + results.forEach(RecordTransformResult::free); } writtenSinceFlush = 0; @@ -163,9 +167,13 @@ public class RecordTransformProxy extends PythonProcessorProxy { baos.reset(); final List results = transform.transformRecord(json, recordSchema, attributeMap); - for (final RecordTransformResult result : results) { - writeResult(result, destinationTuples, writerFactory, session, flowFile); - recordsWritten++; + try { + for (final RecordTransformResult result : results) { + writeResult(result, destinationTuples, writerFactory, session, flowFile); + recordsWritten++; + } + } finally { + results.forEach(RecordTransformResult::free); } } } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformResult.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformResult.java index c479cf2437..5f212aad71 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformResult.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformResult.java @@ -17,11 +17,12 @@ package org.apache.nifi.python.processor; +import org.apache.nifi.python.PythonObjectProxy; import org.apache.nifi.serialization.record.RecordSchema; import java.util.Map; -public interface RecordTransformResult { +public interface RecordTransformResult extends PythonObjectProxy { String getRecordJson(); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/OutputRecord.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonObjectProxy.java similarity index 78% rename from nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/OutputRecord.java rename to nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonObjectProxy.java index d90ea081f7..b1ffd64894 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/OutputRecord.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonObjectProxy.java @@ -15,14 +15,8 @@ * limitations under the License. */ -package org.apache.nifi.python.processor; +package org.apache.nifi.python; -import org.apache.nifi.serialization.record.RecordSchema; - -public interface OutputRecord { - String getRelationship(); - - Object getRecord(); - - RecordSchema getSchema(); +public interface PythonObjectProxy { + void free(); } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java index cb8237ceed..9831f9ddd3 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java @@ -23,7 +23,7 @@ import org.apache.nifi.python.processor.documentation.UseCaseDetails; import java.util.List; -public interface PythonProcessorDetails { +public interface PythonProcessorDetails extends PythonObjectProxy { /** * @return the type of the Processor (i.e., the class name of the Processor class) */ diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/MultiProcessorUseCaseDetails.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/MultiProcessorUseCaseDetails.java index b772ca992d..9be26efbaf 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/MultiProcessorUseCaseDetails.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/MultiProcessorUseCaseDetails.java @@ -17,9 +17,11 @@ package org.apache.nifi.python.processor.documentation; +import org.apache.nifi.python.PythonObjectProxy; + import java.util.List; -public interface MultiProcessorUseCaseDetails { +public interface MultiProcessorUseCaseDetails extends PythonObjectProxy { String getDescription(); String getNotes(); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/ProcessorConfigurationDetails.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/ProcessorConfigurationDetails.java index d32b3f01ab..38927e32ca 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/ProcessorConfigurationDetails.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/ProcessorConfigurationDetails.java @@ -17,7 +17,9 @@ package org.apache.nifi.python.processor.documentation; -public interface ProcessorConfigurationDetails { +import org.apache.nifi.python.PythonObjectProxy; + +public interface ProcessorConfigurationDetails extends PythonObjectProxy { String getProcessorType(); String getConfiguration(); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/PropertyDescription.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/PropertyDescription.java index f843f85642..ca71317cae 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/PropertyDescription.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/PropertyDescription.java @@ -17,7 +17,9 @@ package org.apache.nifi.python.processor.documentation; -public interface PropertyDescription { +import org.apache.nifi.python.PythonObjectProxy; + +public interface PropertyDescription extends PythonObjectProxy { String getName(); String getDisplayName(); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/UseCaseDetails.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/UseCaseDetails.java index 307f63aa49..8110d27fdd 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/UseCaseDetails.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/documentation/UseCaseDetails.java @@ -17,9 +17,11 @@ package org.apache.nifi.python.processor.documentation; +import org.apache.nifi.python.PythonObjectProxy; + import java.util.List; -public interface UseCaseDetails { +public interface UseCaseDetails extends PythonObjectProxy { String getDescription(); String getNotes();