NIFI-12757: Issue GC commands to Python for FlowFileTransformResults and RecordTransformResults when no longer needed on Java side

This closes #8375

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-02-07 19:05:15 -05:00 committed by exceptionfactory
parent e90c42d9cd
commit b8d7b9c677
No known key found for this signature in database
12 changed files with 81 additions and 39 deletions

View File

@ -27,6 +27,7 @@ import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.nifi.python.PythonController; import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.PythonObjectProxy;
import org.apache.nifi.python.processor.PreserveJavaBinding; import org.apache.nifi.python.processor.PreserveJavaBinding;
import org.apache.nifi.python.processor.PythonProcessor; import org.apache.nifi.python.processor.PythonProcessor;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -142,9 +143,23 @@ public class NiFiPythonGateway extends Gateway {
protected PythonProxyHandler createPythonProxyHandler(final String id) { protected PythonProxyHandler createPythonProxyHandler(final String id) {
logger.debug("Creating Python Proxy Handler for ID {}", id); logger.debug("Creating Python Proxy Handler for ID {}", id);
final PythonProxyInvocationHandler createdHandler = new PythonProxyInvocationHandler(this, id); final PythonProxyInvocationHandler createdHandler = new PythonProxyInvocationHandler(this, id);
Method proxyFreeMethod;
try {
proxyFreeMethod = PythonObjectProxy.class.getMethod("free");
} catch (final NoSuchMethodException ignored) {
proxyFreeMethod = null;
}
final Method freeMethod = proxyFreeMethod;
return new PythonProxyHandler(id, this) { return new PythonProxyHandler(id, this) {
@Override @Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
if (Objects.equals(freeMethod, method)) {
createdHandler.free();
return null;
}
return createdHandler.invoke(proxy, method, args); return createdHandler.invoke(proxy, method, args);
} }

View File

@ -37,11 +37,20 @@ public class PythonProxyInvocationHandler implements InvocationHandler {
private final String objectId; private final String objectId;
private final NiFiPythonGateway gateway; private final NiFiPythonGateway gateway;
private final JavaObjectBindings bindings; private final JavaObjectBindings bindings;
private final String gcCommand;
public PythonProxyInvocationHandler(final NiFiPythonGateway gateway, final String objectId) { public PythonProxyInvocationHandler(final NiFiPythonGateway gateway, final String objectId) {
this.objectId = objectId; this.objectId = objectId;
this.gateway = gateway; this.gateway = gateway;
this.bindings = gateway.getObjectBindings(); this.bindings = gateway.getObjectBindings();
this.gcCommand = "g\n" + objectId + "\ne\n";
}
public void free() {
if (this.objectId != Protocol.ENTRY_POINT_OBJECT_ID) {
logger.debug("Issuing GC command to python for proxy id {}", this.objectId);
gateway.getCallbackClient().sendCommand(gcCommand, false);
}
} }
@Override @Override

View File

@ -72,26 +72,31 @@ public class FlowFileTransformProxy extends PythonProcessorProxy {
session.transfer(original, REL_FAILURE); session.transfer(original, REL_FAILURE);
return; return;
} }
final String relationshipName = result.getRelationship();
final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
if (REL_FAILURE.getName().equals(relationshipName)) {
session.remove(transformed);
session.transfer(original, REL_FAILURE);
return;
}
final Map<String, String> attributes = result.getAttributes(); try {
if (attributes != null) { final String relationshipName = result.getRelationship();
transformed = session.putAllAttributes(transformed, attributes); final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
} if (REL_FAILURE.getName().equals(relationshipName)) {
session.remove(transformed);
session.transfer(original, REL_FAILURE);
return;
}
final byte[] contents = result.getContents(); final Map<String, String> attributes = result.getAttributes();
if (contents != null) { if (attributes != null) {
transformed = session.write(transformed, out -> out.write(contents)); transformed = session.putAllAttributes(transformed, attributes);
} }
session.transfer(transformed, relationship); final byte[] contents = result.getContents();
session.transfer(original, REL_ORIGINAL); if (contents != null) {
transformed = session.write(transformed, out -> out.write(contents));
}
session.transfer(transformed, relationship);
session.transfer(original, REL_ORIGINAL);
} finally {
result.free();
}
} }
} }

View File

@ -17,9 +17,11 @@
package org.apache.nifi.python.processor; package org.apache.nifi.python.processor;
import org.apache.nifi.python.PythonObjectProxy;
import java.util.Map; import java.util.Map;
public interface FlowFileTransformResult { public interface FlowFileTransformResult extends PythonObjectProxy {
String getRelationship(); String getRelationship();
byte[] getContents(); byte[] getContents();

View File

@ -147,9 +147,13 @@ public class RecordTransformProxy extends PythonProcessorProxy {
baos.reset(); baos.reset();
final List<RecordTransformResult> results = transform.transformRecord(json, recordSchema, attributeMap); final List<RecordTransformResult> results = transform.transformRecord(json, recordSchema, attributeMap);
for (final RecordTransformResult result : results) { try {
writeResult(result, destinationTuples, writerFactory, session, flowFile); for (final RecordTransformResult result : results) {
recordsWritten++; writeResult(result, destinationTuples, writerFactory, session, flowFile);
recordsWritten++;
}
} finally {
results.forEach(RecordTransformResult::free);
} }
writtenSinceFlush = 0; writtenSinceFlush = 0;
@ -163,9 +167,13 @@ public class RecordTransformProxy extends PythonProcessorProxy {
baos.reset(); baos.reset();
final List<RecordTransformResult> results = transform.transformRecord(json, recordSchema, attributeMap); final List<RecordTransformResult> results = transform.transformRecord(json, recordSchema, attributeMap);
for (final RecordTransformResult result : results) { try {
writeResult(result, destinationTuples, writerFactory, session, flowFile); for (final RecordTransformResult result : results) {
recordsWritten++; writeResult(result, destinationTuples, writerFactory, session, flowFile);
recordsWritten++;
}
} finally {
results.forEach(RecordTransformResult::free);
} }
} }
} }

View File

@ -17,11 +17,12 @@
package org.apache.nifi.python.processor; package org.apache.nifi.python.processor;
import org.apache.nifi.python.PythonObjectProxy;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import java.util.Map; import java.util.Map;
public interface RecordTransformResult { public interface RecordTransformResult extends PythonObjectProxy {
String getRecordJson(); String getRecordJson();

View File

@ -15,14 +15,8 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.python.processor; package org.apache.nifi.python;
import org.apache.nifi.serialization.record.RecordSchema; public interface PythonObjectProxy {
void free();
public interface OutputRecord {
String getRelationship();
Object getRecord();
RecordSchema getSchema();
} }

View File

@ -23,7 +23,7 @@ import org.apache.nifi.python.processor.documentation.UseCaseDetails;
import java.util.List; import java.util.List;
public interface PythonProcessorDetails { public interface PythonProcessorDetails extends PythonObjectProxy {
/** /**
* @return the type of the Processor (i.e., the class name of the Processor class) * @return the type of the Processor (i.e., the class name of the Processor class)
*/ */

View File

@ -17,9 +17,11 @@
package org.apache.nifi.python.processor.documentation; package org.apache.nifi.python.processor.documentation;
import org.apache.nifi.python.PythonObjectProxy;
import java.util.List; import java.util.List;
public interface MultiProcessorUseCaseDetails { public interface MultiProcessorUseCaseDetails extends PythonObjectProxy {
String getDescription(); String getDescription();
String getNotes(); String getNotes();

View File

@ -17,7 +17,9 @@
package org.apache.nifi.python.processor.documentation; package org.apache.nifi.python.processor.documentation;
public interface ProcessorConfigurationDetails { import org.apache.nifi.python.PythonObjectProxy;
public interface ProcessorConfigurationDetails extends PythonObjectProxy {
String getProcessorType(); String getProcessorType();
String getConfiguration(); String getConfiguration();

View File

@ -17,7 +17,9 @@
package org.apache.nifi.python.processor.documentation; package org.apache.nifi.python.processor.documentation;
public interface PropertyDescription { import org.apache.nifi.python.PythonObjectProxy;
public interface PropertyDescription extends PythonObjectProxy {
String getName(); String getName();
String getDisplayName(); String getDisplayName();

View File

@ -17,9 +17,11 @@
package org.apache.nifi.python.processor.documentation; package org.apache.nifi.python.processor.documentation;
import org.apache.nifi.python.PythonObjectProxy;
import java.util.List; import java.util.List;
public interface UseCaseDetails { public interface UseCaseDetails extends PythonObjectProxy {
String getDescription(); String getDescription();
String getNotes(); String getNotes();