mirror of https://github.com/apache/nifi.git
NIFI-12740 Fixed Python to Java Object Binding
Fixed issue in NiFiPythonGateway that stems from the fact that the thread adding an object to the JavaObjectBindings was not necessarily the thread removing them. The algorithm that was in place assumed that the same thread would be used, in order to ensure that an object could be unbound before being accessed. The new algorithm binds each new object to all active method invocations and only unbinds the objects after all method invocations complete, regardless of thread. Additionally, found that many method calls could create new proxies on the Python side, just for getter methods whose values don't change. This is very expensive, so introduced a new @Idempotent annotation that can be added to interface methods such that we can cache the value and avoid the expensive overhead. This closes #8456 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
f9c1c3f042
commit
55738f8522
|
@ -712,7 +712,7 @@ class MyProcessor(FlowFileTransform):
|
|||
version = '0.0.1-SNAPSHOT'
|
||||
dependencies = ['debugpy']
|
||||
|
||||
def onScheduled(context):
|
||||
def onScheduled(self, context):
|
||||
try:
|
||||
import debugpy
|
||||
debugpy.connect(6688)
|
||||
|
|
|
@ -1,133 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.py4j;
|
||||
|
||||
import org.apache.nifi.python.PythonProcessorDetails;
|
||||
import org.apache.nifi.python.processor.documentation.MultiProcessorUseCaseDetails;
|
||||
import org.apache.nifi.python.processor.documentation.PropertyDescription;
|
||||
import org.apache.nifi.python.processor.documentation.UseCaseDetails;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A wrapper around a PythonProcessorDetails that caches the results of the delegate's methods.
|
||||
* Making calls to the Python side is relatively expensive, and we make many calls to {@link #getProcessorType()},
|
||||
* {@link #getProcessorVersion()}, etc. This simple wrapper allows us to make the calls only once.
|
||||
*/
|
||||
public class CachingPythonProcessorDetails implements PythonProcessorDetails {
|
||||
private final PythonProcessorDetails delegate;
|
||||
private volatile String processorType;
|
||||
private volatile String processorVersion;
|
||||
private volatile String capabilityDescription;
|
||||
private volatile String sourceLocation;
|
||||
private volatile List<String> tags;
|
||||
private volatile List<String> dependencies;
|
||||
private volatile String processorInterface;
|
||||
private volatile List<UseCaseDetails> useCases;
|
||||
private volatile List<MultiProcessorUseCaseDetails> multiProcessorUseCases;
|
||||
private volatile List<PropertyDescription> propertyDescriptions;
|
||||
|
||||
|
||||
public CachingPythonProcessorDetails(final PythonProcessorDetails delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProcessorType() {
|
||||
if (processorType == null) {
|
||||
processorType = delegate.getProcessorType();
|
||||
}
|
||||
return processorType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProcessorVersion() {
|
||||
if (processorVersion == null) {
|
||||
processorVersion = delegate.getProcessorVersion();
|
||||
}
|
||||
return processorVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSourceLocation() {
|
||||
if (sourceLocation == null) {
|
||||
sourceLocation = delegate.getSourceLocation();
|
||||
}
|
||||
return sourceLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getTags() {
|
||||
if (tags == null) {
|
||||
tags = delegate.getTags();
|
||||
}
|
||||
return tags;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getDependencies() {
|
||||
if (dependencies == null) {
|
||||
dependencies = delegate.getDependencies();
|
||||
}
|
||||
return dependencies;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCapabilityDescription() {
|
||||
if (capabilityDescription == null) {
|
||||
capabilityDescription = delegate.getCapabilityDescription();
|
||||
}
|
||||
return capabilityDescription;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInterface() {
|
||||
if (processorInterface == null) {
|
||||
processorInterface = delegate.getInterface();
|
||||
}
|
||||
return processorInterface;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<UseCaseDetails> getUseCases() {
|
||||
if (useCases == null) {
|
||||
useCases = delegate.getUseCases();
|
||||
}
|
||||
return useCases;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<MultiProcessorUseCaseDetails> getMultiProcessorUseCases() {
|
||||
if (multiProcessorUseCases == null) {
|
||||
multiProcessorUseCases = delegate.getMultiProcessorUseCases();
|
||||
}
|
||||
return multiProcessorUseCases;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PropertyDescription> getPropertyDescriptions() {
|
||||
if (propertyDescriptions == null) {
|
||||
propertyDescriptions = delegate.getPropertyDescriptions();
|
||||
}
|
||||
return propertyDescriptions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void free() {
|
||||
}
|
||||
}
|
|
@ -380,22 +380,27 @@ public class PythonProcess {
|
|||
};
|
||||
|
||||
// Create a PythonProcessorDetails and then call getProcessorType and getProcessorVersion to ensure that the details are cached
|
||||
final PythonProcessorDetails processorDetails = new CachingPythonProcessorDetails(controller.getProcessorDetails(type, version));
|
||||
processorDetails.getProcessorType();
|
||||
processorDetails.getProcessorVersion();
|
||||
final PythonProcessorDetails processorDetails = controller.getProcessorDetails(type, version);
|
||||
try {
|
||||
final String processorType = processorDetails.getProcessorType();
|
||||
final String processorVersion = processorDetails.getProcessorVersion();
|
||||
|
||||
final PythonProcessorBridge processorBridge = new StandardPythonProcessorBridge.Builder()
|
||||
.controller(controller)
|
||||
.creationWorkflow(creationWorkflow)
|
||||
.processorDetails(processorDetails)
|
||||
.workingDirectory(processConfig.getPythonWorkingDirectory())
|
||||
.moduleFile(new File(controller.getModuleFile(type, version)))
|
||||
.build();
|
||||
final PythonProcessorBridge processorBridge = new StandardPythonProcessorBridge.Builder()
|
||||
.controller(controller)
|
||||
.creationWorkflow(creationWorkflow)
|
||||
.processorType(processorType)
|
||||
.processorVersion(processorVersion)
|
||||
.workingDirectory(processConfig.getPythonWorkingDirectory())
|
||||
.moduleFile(new File(controller.getModuleFile(type, version)))
|
||||
.build();
|
||||
|
||||
final CreatedProcessor createdProcessor = new CreatedProcessor(identifier, type, processorBridge);
|
||||
createdProcessors.add(createdProcessor);
|
||||
final CreatedProcessor createdProcessor = new CreatedProcessor(identifier, type, processorBridge);
|
||||
createdProcessors.add(createdProcessor);
|
||||
|
||||
return processorBridge;
|
||||
return processorBridge;
|
||||
} finally {
|
||||
processorDetails.free();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.nifi.py4j;
|
||||
|
||||
import org.apache.nifi.python.PythonController;
|
||||
import org.apache.nifi.python.PythonProcessorDetails;
|
||||
import org.apache.nifi.python.processor.PythonProcessorAdapter;
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
|
||||
|
@ -36,7 +35,8 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
private static final Logger logger = LoggerFactory.getLogger(StandardPythonProcessorBridge.class);
|
||||
|
||||
private final ProcessorCreationWorkflow creationWorkflow;
|
||||
private final PythonProcessorDetails processorDetails;
|
||||
private final String processorType;
|
||||
private final String processorVersion;
|
||||
private volatile PythonProcessorAdapter adapter;
|
||||
private final File workingDir;
|
||||
private final File moduleFile;
|
||||
|
@ -51,7 +51,8 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
private StandardPythonProcessorBridge(final Builder builder) {
|
||||
this.controller = builder.controller;
|
||||
this.creationWorkflow = builder.creationWorkflow;
|
||||
this.processorDetails = builder.processorDetails;
|
||||
this.processorType = builder.processorType;
|
||||
this.processorVersion = builder.processorVersion;
|
||||
this.workingDir = builder.workDir;
|
||||
this.moduleFile = builder.moduleFile;
|
||||
this.lastModified = this.moduleFile.lastModified();
|
||||
|
@ -165,7 +166,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
|
||||
@Override
|
||||
public String getProcessorType() {
|
||||
return processorDetails.getProcessorType();
|
||||
return processorType;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -175,7 +176,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
return false;
|
||||
}
|
||||
|
||||
controller.reloadProcessor(getProcessorType(), processorDetails.getProcessorVersion(), workingDir.getAbsolutePath());
|
||||
controller.reloadProcessor(getProcessorType(), processorVersion, workingDir.getAbsolutePath());
|
||||
initializePythonSide(false, new CompletableFuture<>());
|
||||
lastModified = moduleFile.lastModified();
|
||||
|
||||
|
@ -188,7 +189,8 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
private ProcessorCreationWorkflow creationWorkflow;
|
||||
private File workDir;
|
||||
private File moduleFile;
|
||||
private PythonProcessorDetails processorDetails;
|
||||
private String processorType;
|
||||
private String processorVersion;
|
||||
|
||||
public Builder controller(final PythonController controller) {
|
||||
this.controller = controller;
|
||||
|
@ -200,8 +202,13 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder processorDetails(final PythonProcessorDetails details) {
|
||||
this.processorDetails = details;
|
||||
public Builder processorType(final String processorType) {
|
||||
this.processorType = processorType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder processorVersion(final String processorVersion) {
|
||||
this.processorVersion = processorVersion;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -222,8 +229,11 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
if (creationWorkflow == null) {
|
||||
throw new IllegalStateException("Must specify the Processor Creation Workflow");
|
||||
}
|
||||
if (processorDetails == null) {
|
||||
throw new IllegalStateException("Must specify the Processor Details");
|
||||
if (processorType == null) {
|
||||
throw new IllegalStateException("Must specify the Processor Type");
|
||||
}
|
||||
if (processorVersion == null) {
|
||||
throw new IllegalStateException("Must specify the Processor Version");
|
||||
}
|
||||
if (workDir == null) {
|
||||
throw new IllegalStateException("Must specify the Working Directory");
|
||||
|
|
|
@ -104,7 +104,7 @@ public class CommandBuilder {
|
|||
}
|
||||
|
||||
private String bind(final Object value) {
|
||||
final String objectId = bindings.bind(value);
|
||||
final String objectId = bindings.bind(value, 1);
|
||||
boundIds.add(objectId);
|
||||
return objectId;
|
||||
}
|
||||
|
|
|
@ -24,49 +24,90 @@ import py4j.Protocol;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class JavaObjectBindings {
|
||||
private static final Logger logger = LoggerFactory.getLogger(JavaObjectBindings.class);
|
||||
|
||||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
private final Map<String, Object> bindings = new ConcurrentHashMap<>();
|
||||
private final Map<String, Object> bindings = new HashMap<>();
|
||||
private final Map<String, Integer> bindingCounts = new HashMap<>();
|
||||
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
|
||||
private final Lock readLock = rwLock.readLock();
|
||||
private final Lock writeLock = rwLock.writeLock();
|
||||
|
||||
|
||||
public JavaObjectBindings() {
|
||||
bind(Protocol.DEFAULT_JVM_OBJECT_ID, new JVMView("default", Protocol.DEFAULT_JVM_OBJECT_ID));
|
||||
bind(Protocol.DEFAULT_JVM_OBJECT_ID, new JVMView("default", Protocol.DEFAULT_JVM_OBJECT_ID), 1);
|
||||
}
|
||||
|
||||
public String bind(final Object object) {
|
||||
public String bind(final Object object, final int count) {
|
||||
final String id = "o" + idGenerator.getAndIncrement();
|
||||
return bind(id, object);
|
||||
return bind(id, object, count);
|
||||
}
|
||||
|
||||
public String bind(final String objectId, final Object object) {
|
||||
bindings.put(objectId, object);
|
||||
public String bind(final String objectId, final Object object, final int count) {
|
||||
if (count == 0) {
|
||||
logger.debug("Will not bind {} to ID {} because count is 0", object, objectId);
|
||||
return objectId;
|
||||
}
|
||||
|
||||
logger.debug("Bound {} to ID {}", object, objectId);
|
||||
writeLock.lock();
|
||||
try {
|
||||
bindings.put(objectId, object);
|
||||
bindingCounts.put(objectId, count);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
||||
logger.debug("Bound {} to ID {} with count {}", object, objectId, count);
|
||||
return objectId;
|
||||
}
|
||||
|
||||
public Object getBoundObject(final String objectId) {
|
||||
return bindings.get(objectId);
|
||||
readLock.lock();
|
||||
try {
|
||||
return bindings.get(objectId);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Object unbind(final String objectId) {
|
||||
final Object unbound = bindings.remove(objectId);
|
||||
logger.debug("Unbound {} from ID {}", unbound, objectId);
|
||||
writeLock.lock();
|
||||
try {
|
||||
final Integer currentValue = bindingCounts.remove(objectId);
|
||||
final int updatedValue = (currentValue == null) ? 0 : currentValue - 1;
|
||||
if (updatedValue < 1) {
|
||||
final Object unbound = bindings.remove(objectId);
|
||||
logger.debug("Unbound {} from ID {}", unbound, objectId);
|
||||
return unbound;
|
||||
}
|
||||
|
||||
return unbound;
|
||||
bindingCounts.put(objectId, updatedValue);
|
||||
logger.debug("Decremented binding count for ID {} to {}", objectId, updatedValue);
|
||||
return bindings.get(objectId);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Map<String, Integer> getCountsPerClass() {
|
||||
final Map<String, Integer> counts = new HashMap<>();
|
||||
bindings.values().forEach(object -> {
|
||||
final String className = (object == null) ? "<null>" : object.getClass().getName();
|
||||
counts.merge(className, 1, Integer::sum);
|
||||
});
|
||||
|
||||
readLock.lock();
|
||||
try {
|
||||
bindings.values().forEach(object -> {
|
||||
final String className = (object == null) ? "<null>" : object.getClass().getName();
|
||||
counts.merge(className, 1, Integer::sum);
|
||||
});
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
return counts;
|
||||
}
|
||||
|
|
|
@ -17,15 +17,6 @@
|
|||
|
||||
package org.apache.nifi.py4j.client;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Stack;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.nifi.python.PythonController;
|
||||
import org.apache.nifi.python.PythonObjectProxy;
|
||||
import org.apache.nifi.python.processor.PreserveJavaBinding;
|
||||
|
@ -34,8 +25,17 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import py4j.CallbackClient;
|
||||
import py4j.Gateway;
|
||||
import py4j.ReturnObject;
|
||||
import py4j.reflection.PythonProxyHandler;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
@ -64,13 +64,29 @@ import py4j.reflection.PythonProxyHandler;
|
|||
public class NiFiPythonGateway extends Gateway {
|
||||
private static final Logger logger = LoggerFactory.getLogger(NiFiPythonGateway.class);
|
||||
private final JavaObjectBindings objectBindings;
|
||||
private final Map<Long, Stack<InvocationBindings>> invocationBindingsById = new ConcurrentHashMap<>();
|
||||
|
||||
// Guarded by synchronized methods
|
||||
private final List<InvocationBindings> activeInvocations = new ArrayList<>();
|
||||
|
||||
private final ReturnObject END_OF_ITERATOR_OBJECT = ReturnObject.getErrorReturnObject(new NoSuchElementException());
|
||||
private final Method freeMethod;
|
||||
private final Method pingMethod;
|
||||
|
||||
public NiFiPythonGateway(final JavaObjectBindings bindings, final Object entryPoint, final CallbackClient callbackClient) {
|
||||
super(entryPoint, callbackClient);
|
||||
this.objectBindings = bindings;
|
||||
|
||||
freeMethod = getMethod(PythonObjectProxy.class, "free");
|
||||
pingMethod = getMethod(PythonController.class, "ping");
|
||||
}
|
||||
|
||||
private Method getMethod(final Class<?> clazz, final String methodName) {
|
||||
try {
|
||||
return clazz.getMethod(methodName);
|
||||
} catch (final NoSuchMethodException ignored) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public JavaObjectBindings getObjectBindings() {
|
||||
return objectBindings;
|
||||
|
@ -82,20 +98,55 @@ public class NiFiPythonGateway extends Gateway {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String putNewObject(final Object object) {
|
||||
final String objectId = objectBindings.bind(object);
|
||||
|
||||
final InvocationBindings bindings = getInvocationBindings();
|
||||
if (bindings != null) {
|
||||
bindings.add(objectId);
|
||||
public ReturnObject invoke(final String methodName, final String targetObjectId, final List<Object> args) {
|
||||
// Py4J detects the end of an iterator on the Python side by catching a Py4JError. It makes the assumption that
|
||||
// if it encounters Py4JError then it was due to a NoSuchElementException being thrown on the Java side. In this case,
|
||||
// it throws a StopIteration exception on the Python side. This has a couple of problems:
|
||||
// 1. It's not clear that the exception was thrown because the iterator was exhausted. It could have been thrown for
|
||||
// any other reason, such as an IOException, etc.
|
||||
// 2. It relies on Exception handling for flow control, which is generally considered bad practice.
|
||||
// While we aren't going to go so far as to override the Python code, we can at least prevent Java from constantly
|
||||
// throwing the Exception, catching it, logging it, and re-throwing it.
|
||||
// Instead, we just create the ReturnObject once and return it.
|
||||
final boolean intervene = isNextOnExhaustedIterator(methodName, targetObjectId, args);
|
||||
if (!intervene) {
|
||||
return super.invoke(methodName, targetObjectId, args);
|
||||
}
|
||||
logger.debug("Binding {}: {} ({}) for {}", objectId, object, object == null ? "null" : object.getClass().getName(), bindings);
|
||||
|
||||
return END_OF_ITERATOR_OBJECT;
|
||||
}
|
||||
|
||||
private boolean isNextOnExhaustedIterator(final String methodName, final String targetObjectId, final List<Object> args) {
|
||||
if (!"next".equals(methodName)) {
|
||||
return false;
|
||||
}
|
||||
if (args != null && !args.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final Object targetObject = getObjectFromId(targetObjectId);
|
||||
if (!(targetObject instanceof final Iterator<?> itr)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return !itr.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String putNewObject(final Object object) {
|
||||
final String objectId = objectBindings.bind(object, activeInvocations.size() + 1);
|
||||
|
||||
for (final InvocationBindings activeInvocation : activeInvocations) {
|
||||
activeInvocation.add(objectId);
|
||||
}
|
||||
|
||||
logger.debug("Binding {}: {} ({})", objectId, object, object == null ? "null" : object.getClass().getName());
|
||||
return objectId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object putObject(final String id, final Object object) {
|
||||
objectBindings.bind(id, object);
|
||||
public synchronized Object putObject(final String id, final Object object) {
|
||||
objectBindings.bind(id, object, activeInvocations.size() + 1);
|
||||
logger.debug("Binding {}: {} ({})", id, object, object == null ? "null" : object.getClass().getName());
|
||||
|
||||
return super.putObject(id, object);
|
||||
|
@ -103,55 +154,16 @@ public class NiFiPythonGateway extends Gateway {
|
|||
|
||||
@Override
|
||||
public void deleteObject(final String objectId) {
|
||||
// When the python side no longer needs an object, its finalizer will notify the Java side that it's no longer needed and can be removed
|
||||
// from the accessible objects on the Java heap. However, if we are making a call to the Python side, it's possible that even though the Python
|
||||
// side no longer needs the object, the Java side still needs it bound. For instance, consider the following case:
|
||||
//
|
||||
// Java side calls PythonController.getProcessorTypes()
|
||||
// Python side needs to return an ArrayList, so it calls back to the Java side (in a separate thread) to create this ArrayList with ID o54
|
||||
// Python side adds several Processors to the ArrayList.
|
||||
// Python side returns the ArrayList to the Java side.
|
||||
// Python side no longer needs the ArrayList, so while the Java side is processing the response from the Python side, the Python side notifies the Java side that it's no longer needed.
|
||||
// Java side unbinds the ArrayList.
|
||||
// Java side parses the response from Python, indicating that the return value is the object with ID o54.
|
||||
// Java side cannot access object with ID o54 because it was already removed.
|
||||
//
|
||||
// To avoid this, we check if there is an Invocation Binding (indicating that we are in the middle of a method invocation) and if so,
|
||||
// we add the object to a list of objects to delete on completion.
|
||||
// If there is no Invocation Binding, we unbind the object immediately.
|
||||
final InvocationBindings invocationBindings = getInvocationBindings();
|
||||
if (invocationBindings == null) {
|
||||
final Object unbound = objectBindings.unbind(objectId);
|
||||
logger.debug("Unbound {}: {} because it was explicitly requested from Python side", objectId, unbound);
|
||||
} else {
|
||||
invocationBindings.deleteOnCompletion(objectId);
|
||||
logger.debug("Unbound {} because it was requested from Python side but in active method invocation so will not remove until invocation completed", objectId);
|
||||
}
|
||||
logger.debug("Unbinding {} because it was explicitly requested from Python side", objectId);
|
||||
objectBindings.unbind(objectId);
|
||||
}
|
||||
|
||||
private InvocationBindings getInvocationBindings() {
|
||||
final long threadId = Thread.currentThread().threadId();
|
||||
final Stack<InvocationBindings> stack = invocationBindingsById.get(threadId);
|
||||
if (stack == null || stack.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return stack.peek();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PythonProxyHandler createPythonProxyHandler(final String id) {
|
||||
logger.debug("Creating Python Proxy Handler for ID {}", id);
|
||||
final PythonProxyInvocationHandler createdHandler = new PythonProxyInvocationHandler(this, id);
|
||||
|
||||
Method proxyFreeMethod;
|
||||
try {
|
||||
proxyFreeMethod = PythonObjectProxy.class.getMethod("free");
|
||||
} catch (final NoSuchMethodException ignored) {
|
||||
proxyFreeMethod = null;
|
||||
}
|
||||
|
||||
final Method freeMethod = proxyFreeMethod;
|
||||
return new PythonProxyHandler(id, this) {
|
||||
@Override
|
||||
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
|
||||
|
@ -163,54 +175,41 @@ public class NiFiPythonGateway extends Gateway {
|
|||
return createdHandler.invoke(proxy, method, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() {
|
||||
// Do nothing. Prevent super.finalize() from being called.
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void beginInvocation(final String objectId, final Method method, final Object[] args) {
|
||||
final long threadId = Thread.currentThread().threadId();
|
||||
final InvocationBindings bindings = new InvocationBindings(objectId, method, args);
|
||||
final Stack<InvocationBindings> stack = invocationBindingsById.computeIfAbsent(threadId, id -> new Stack<>());
|
||||
stack.push(bindings);
|
||||
|
||||
logger.debug("Beginning method invocation {} on {} with args {}", method, objectId, Arrays.toString(args));
|
||||
}
|
||||
|
||||
public void endInvocation(final String objectId, final Method method, final Object[] args) {
|
||||
public synchronized InvocationBindings beginInvocation(final String objectId, final Method method, final Object[] args) {
|
||||
final boolean unbind = isUnbind(method);
|
||||
|
||||
final long threadId = Thread.currentThread().threadId();
|
||||
final Stack<InvocationBindings> stack = invocationBindingsById.get(threadId);
|
||||
if (stack == null) {
|
||||
return;
|
||||
final InvocationBindings bindings = new InvocationBindings(objectId, method, args, unbind);
|
||||
|
||||
// We don't want to keep track of invocations of the Ping method.
|
||||
// The Ping method has no arguments or bound objects to free, and can occasionally
|
||||
// even hang, if the timing is right because of the startup sequence of the Python side and how the
|
||||
// communications are established.
|
||||
if (!pingMethod.equals(method)) {
|
||||
activeInvocations.add(bindings);
|
||||
}
|
||||
|
||||
while (!stack.isEmpty()) {
|
||||
final InvocationBindings invocationBindings = stack.pop();
|
||||
final String methodName = invocationBindings.getMethod().getName();
|
||||
logger.debug("Beginning method invocation {}", bindings);
|
||||
|
||||
invocationBindings.getObjectIds().forEach(id -> {
|
||||
if (unbind) {
|
||||
final Object unbound = objectBindings.unbind(id);
|
||||
logger.debug("Unbinding {}: {} because invocation of {} on {} with args {} has completed", id, unbound, methodName, objectId, Arrays.toString(args));
|
||||
} else {
|
||||
logger.debug("Will not unbind {} even though invocation of {} on {} with args {} has completed because of the method being completed",
|
||||
id, methodName, objectId, Arrays.toString(args));
|
||||
}
|
||||
});
|
||||
return bindings;
|
||||
}
|
||||
|
||||
invocationBindings.getObjectsToDeleteOnCompletion().forEach(id -> {
|
||||
final Object unbound = objectBindings.unbind(id);
|
||||
logger.debug("Unbinding {}: {} because invocation of {} on {} with args {} has completed", id, unbound, methodName, objectId, Arrays.toString(args));
|
||||
});
|
||||
public synchronized void endInvocation(final InvocationBindings invocationBindings) {
|
||||
final String methodName = invocationBindings.getMethod().getName();
|
||||
logger.debug("Ending method invocation {}", invocationBindings);
|
||||
|
||||
if (Objects.equals(invocationBindings.getTargetObjectId(), objectId) && Objects.equals(invocationBindings.getMethod(), method) && Arrays.equals(invocationBindings.getArgs(), args)) {
|
||||
break;
|
||||
final String objectId = invocationBindings.getTargetObjectId();
|
||||
|
||||
activeInvocations.remove(invocationBindings);
|
||||
invocationBindings.getObjectIds().forEach(id -> {
|
||||
final Object unbound = objectBindings.unbind(id);
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Unbinding {}: {} because invocation of {} on {} with args {} has completed", id, unbound, methodName, objectId, Arrays.toString(invocationBindings.getArgs()));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected boolean isUnbind(final Method method) {
|
||||
|
@ -222,42 +221,52 @@ public class NiFiPythonGateway extends Gateway {
|
|||
// that it is no longer needed. We can do this by annotating the method with the PreserveJavaBinding annotation.
|
||||
final boolean relevantClass = PythonController.class.isAssignableFrom(declaringClass) || PythonProcessor.class.isAssignableFrom(declaringClass);
|
||||
if (relevantClass && method.getAnnotation(PreserveJavaBinding.class) == null) {
|
||||
return true;
|
||||
// No need for binding / unbinding if types are primitives, because the values themselves are sent across and not references to objects.
|
||||
boolean bindNecessary = isBindNecessary(method.getReturnType());
|
||||
for (final Class<?> parameterType : method.getParameterTypes()) {
|
||||
if (isBindNecessary(parameterType)) {
|
||||
bindNecessary = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return bindNecessary;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isBindNecessary(final Class<?> type) {
|
||||
return !type.isPrimitive() && type != String.class && type != byte[].class;
|
||||
}
|
||||
|
||||
private static class InvocationBindings {
|
||||
|
||||
public static class InvocationBindings {
|
||||
private final String targetObjectId;
|
||||
private final Method method;
|
||||
private final Object[] args;
|
||||
private final boolean unbind;
|
||||
private final List<String> objectIds = new ArrayList<>();
|
||||
private final List<String> deleteOnCompletion = new ArrayList<>();
|
||||
|
||||
public InvocationBindings(final String targetObjectId, final Method method, final Object[] args) {
|
||||
public InvocationBindings(final String targetObjectId, final Method method, final Object[] args, final boolean unbind) {
|
||||
this.targetObjectId = targetObjectId;
|
||||
this.method = method;
|
||||
this.args = args;
|
||||
this.unbind = unbind;
|
||||
}
|
||||
|
||||
public boolean isUnbind() {
|
||||
return unbind;
|
||||
}
|
||||
|
||||
public void add(final String objectId) {
|
||||
objectIds.add(objectId);
|
||||
}
|
||||
|
||||
public void deleteOnCompletion(final String objectId) {
|
||||
deleteOnCompletion.add(objectId);
|
||||
}
|
||||
|
||||
public List<String> getObjectIds() {
|
||||
return objectIds;
|
||||
}
|
||||
|
||||
public List<String> getObjectsToDeleteOnCompletion() {
|
||||
return deleteOnCompletion;
|
||||
}
|
||||
|
||||
public String getTargetObjectId() {
|
||||
return targetObjectId;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.nifi.py4j.client;
|
||||
|
||||
import org.apache.nifi.py4j.client.NiFiPythonGateway.InvocationBindings;
|
||||
import org.apache.nifi.python.processor.Idempotent;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import py4j.Protocol;
|
||||
|
@ -30,6 +32,8 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
public class PythonProxyInvocationHandler implements InvocationHandler {
|
||||
private static final Logger logger = LoggerFactory.getLogger(PythonProxyInvocationHandler.class);
|
||||
|
@ -38,6 +42,7 @@ public class PythonProxyInvocationHandler implements InvocationHandler {
|
|||
private final NiFiPythonGateway gateway;
|
||||
private final JavaObjectBindings bindings;
|
||||
private final String gcCommand;
|
||||
private final ConcurrentMap<Method, Object> cachedValues = new ConcurrentHashMap<>();
|
||||
|
||||
public PythonProxyInvocationHandler(final NiFiPythonGateway gateway, final String objectId) {
|
||||
this.objectId = objectId;
|
||||
|
@ -59,6 +64,15 @@ public class PythonProxyInvocationHandler implements InvocationHandler {
|
|||
return "PythonProxy[targetObjectId=" + objectId + "]";
|
||||
}
|
||||
|
||||
// Only support caching for 0-arg methods currently
|
||||
final boolean idempotent = (args == null || args.length == 0) && method.getAnnotation(Idempotent.class) != null;
|
||||
if (idempotent) {
|
||||
final Object cachedValue = cachedValues.get(method);
|
||||
if (cachedValue != null) {
|
||||
return cachedValue;
|
||||
}
|
||||
}
|
||||
|
||||
final CommandBuilder commandBuilder = new CommandBuilder(bindings, objectId, method.getName());
|
||||
final String command = commandBuilder.buildCommand(args);
|
||||
|
||||
|
@ -67,23 +81,27 @@ public class PythonProxyInvocationHandler implements InvocationHandler {
|
|||
logger.debug("Invoking {} on {} with args {} using command {}", method, proxy, argList, command);
|
||||
}
|
||||
|
||||
gateway.beginInvocation(this.objectId, method, args);
|
||||
final InvocationBindings invocationBindings = gateway.beginInvocation(this.objectId, method, args);
|
||||
try {
|
||||
final String response = gateway.getCallbackClient().sendCommand(command);
|
||||
final Object output = Protocol.getReturnValue(response, gateway);
|
||||
final Object result = convertOutput(method, output);
|
||||
if (idempotent) {
|
||||
cachedValues.putIfAbsent(method, result);
|
||||
}
|
||||
|
||||
final String response = gateway.getCallbackClient().sendCommand(command);
|
||||
final Object output = Protocol.getReturnValue(response, gateway);
|
||||
final Object convertedOutput = convertOutput(method, output);
|
||||
return result;
|
||||
} finally {
|
||||
if (invocationBindings.isUnbind()) {
|
||||
commandBuilder.getBoundIds().forEach(bindings::unbind);
|
||||
commandBuilder.getBoundIds().forEach(i -> logger.debug("For method invocation {} unbound {} (from command builder)", method.getName(), i));
|
||||
} else {
|
||||
commandBuilder.getBoundIds().forEach(i -> logger.debug("For method invocation {} will not unbind {} (from command builder) because arguments of this method are not to be unbound",
|
||||
method.getName(), i));
|
||||
}
|
||||
|
||||
if (gateway.isUnbind(method)) {
|
||||
commandBuilder.getBoundIds().forEach(bindings::unbind);
|
||||
commandBuilder.getBoundIds().forEach(i -> logger.debug("For method invocation {} unbound {} (from command builder)", method.getName(), i));
|
||||
} else {
|
||||
commandBuilder.getBoundIds().forEach(i -> logger.debug("For method invocation {} will not unbind {} (from command builder) because arguments of this method are not to be unbound",
|
||||
method.getName(), i));
|
||||
gateway.endInvocation(invocationBindings);
|
||||
}
|
||||
|
||||
gateway.endInvocation(this.objectId, method, args);
|
||||
|
||||
return convertedOutput;
|
||||
}
|
||||
|
||||
|
||||
|
@ -107,7 +125,7 @@ public class PythonProxyInvocationHandler implements InvocationHandler {
|
|||
throw new Py4JException("Incompatible output type. Expected: " + returnType.getName() + " Actual: " + outputType.getName());
|
||||
}
|
||||
|
||||
return converters.get(0).convert(output);
|
||||
return converters.getFirst().convert(output);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,17 +17,13 @@
|
|||
|
||||
package org.apache.nifi.py4j.server;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import py4j.Gateway;
|
||||
import py4j.GatewayConnection;
|
||||
import py4j.GatewayServerListener;
|
||||
import py4j.commands.Command;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -66,10 +62,8 @@ import java.util.List;
|
|||
* -
|
||||
*/
|
||||
public class NiFiGatewayConnection extends GatewayConnection {
|
||||
private static final Logger logger = LoggerFactory.getLogger(NiFiGatewayConnection.class);
|
||||
|
||||
private final NiFiGatewayServer gatewayServer;
|
||||
private volatile boolean poisoned = false;
|
||||
private final ClassLoader contextClassLoader;
|
||||
|
||||
public NiFiGatewayConnection(final NiFiGatewayServer gatewayServer,
|
||||
|
@ -83,10 +77,6 @@ public class NiFiGatewayConnection extends GatewayConnection {
|
|||
this.contextClassLoader = getClass().getClassLoader();
|
||||
}
|
||||
|
||||
private boolean isContinue() {
|
||||
return !poisoned && !gatewayServer.isShutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
|
@ -94,9 +84,7 @@ public class NiFiGatewayConnection extends GatewayConnection {
|
|||
Thread.currentThread().setContextClassLoader(this.contextClassLoader);
|
||||
|
||||
Thread.currentThread().setName(String.format("NiFiGatewayConnection Thread for %s %s", gatewayServer.getComponentType(), gatewayServer.getComponentId()));
|
||||
while (isContinue()) {
|
||||
super.run();
|
||||
}
|
||||
super.run();
|
||||
|
||||
shutdown(false);
|
||||
} finally {
|
||||
|
@ -105,36 +93,4 @@ public class NiFiGatewayConnection extends GatewayConnection {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void quietSendFatalError(final BufferedWriter writer, final Throwable exception) {
|
||||
if (gatewayServer.isShutdown()) {
|
||||
super.quietSendFatalError(writer, exception);
|
||||
}
|
||||
|
||||
if (exception instanceof SocketTimeoutException) {
|
||||
logger.debug("{} received call to quietSendFatalError with Exception {} but will ignore because it's a SocketTimeoutException", this, exception.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
poisoned = true;
|
||||
super.quietSendFatalError(writer, exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown(final boolean reset) {
|
||||
if (poisoned) {
|
||||
logger.debug("Connection {} shutdown and is poisoned so will truly shutdown connection, reset={}", this, reset);
|
||||
super.shutdown(reset);
|
||||
return;
|
||||
}
|
||||
|
||||
if (gatewayServer.isShutdown()) {
|
||||
logger.debug("Connection {} shutdown and Gateway Server is shutdown so will truly shutdown connection", this);
|
||||
super.shutdown(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// do nothing.
|
||||
logger.debug("Connection {} shutdown but is not poisoned. Will not shutdown the connection", this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ public abstract class PythonProcessorProxy<T extends PythonProcessor> extends Ab
|
|||
private volatile Boolean supportsDynamicProperties;
|
||||
|
||||
private volatile T currentTransform;
|
||||
private volatile PythonProcessorAdapter currentAdapter;
|
||||
private volatile ProcessContext currentProcessContext;
|
||||
|
||||
|
||||
|
@ -112,25 +113,23 @@ public abstract class PythonProcessorProxy<T extends PythonProcessor> extends Ab
|
|||
}
|
||||
|
||||
|
||||
@OnScheduled
|
||||
public void setContext(final ProcessContext context) {
|
||||
this.currentProcessContext = context;
|
||||
}
|
||||
|
||||
protected T getTransform() {
|
||||
protected synchronized T getTransform() {
|
||||
final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing"));
|
||||
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
|
||||
if (optionalAdapter.isEmpty()) {
|
||||
throw new IllegalStateException(this + " is not finished initializing");
|
||||
}
|
||||
|
||||
final T transform = (T) optionalAdapter.get().getProcessor();
|
||||
if (transform != currentTransform) {
|
||||
final PythonProcessorAdapter adapter = optionalAdapter.get();
|
||||
if (adapter != currentAdapter) {
|
||||
final T transform = (T) adapter.getProcessor();
|
||||
transform.setContext(currentProcessContext);
|
||||
|
||||
currentTransform = transform;
|
||||
currentAdapter = adapter;
|
||||
}
|
||||
|
||||
return transform;
|
||||
return currentTransform;
|
||||
}
|
||||
|
||||
protected Optional<PythonProcessorBridge> getBridge() {
|
||||
|
@ -257,8 +256,7 @@ public abstract class PythonProcessorProxy<T extends PythonProcessor> extends Ab
|
|||
return supported;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void cacheRelationships() {
|
||||
private void cacheRelationships() {
|
||||
// Get the Relationships from the Python side. Then make a defensive copy and make that copy immutable.
|
||||
// We cache this to avoid having to call into the Python side while the Processor is running. However, once
|
||||
// it is stopped, its relationships may change due to properties, etc.
|
||||
|
@ -266,8 +264,7 @@ public abstract class PythonProcessorProxy<T extends PythonProcessor> extends Ab
|
|||
this.cachedRelationships = Set.copyOf(relationships);
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void cacheDynamicPropertyDescriptors(final ProcessContext context) {
|
||||
private void cacheDynamicPropertyDescriptors(final ProcessContext context) {
|
||||
final Map<String, PropertyDescriptor> dynamicDescriptors = new HashMap<>();
|
||||
|
||||
final Set<PropertyDescriptor> descriptors = context.getProperties().keySet();
|
||||
|
@ -299,8 +296,8 @@ public abstract class PythonProcessorProxy<T extends PythonProcessor> extends Ab
|
|||
Set<Relationship> processorRelationships;
|
||||
try {
|
||||
processorRelationships = bridge.getProcessorAdapter()
|
||||
.map(PythonProcessorAdapter::getRelationships)
|
||||
.orElseGet(HashSet::new);
|
||||
.map(PythonProcessorAdapter::getRelationships)
|
||||
.orElseGet(HashSet::new);
|
||||
} catch (final Exception e) {
|
||||
getLogger().warn("Failed to obtain list of Relationships from Python Processor {}; assuming no explicit relationships", this, e);
|
||||
processorRelationships = new HashSet<>();
|
||||
|
@ -313,14 +310,23 @@ public abstract class PythonProcessorProxy<T extends PythonProcessor> extends Ab
|
|||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
this.currentProcessContext = context;
|
||||
|
||||
if (bridge == null) {
|
||||
throw new IllegalStateException("Processor is not yet initialized");
|
||||
}
|
||||
|
||||
reload();
|
||||
bridge.getProcessorAdapter()
|
||||
.orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"))
|
||||
.onScheduled(context);
|
||||
|
||||
final PythonProcessorAdapter adapter = bridge.getProcessorAdapter()
|
||||
.orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"));
|
||||
|
||||
adapter.onScheduled(context);
|
||||
|
||||
adapter.getProcessor().setContext(context);
|
||||
|
||||
cacheRelationships();
|
||||
cacheDynamicPropertyDescriptors(context);
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
|
|
|
@ -0,0 +1,153 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.py4j.client;
|
||||
|
||||
import org.apache.nifi.py4j.client.NiFiPythonGateway.InvocationBindings;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import py4j.CallbackClient;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TestNiFiPythonGateway {
|
||||
|
||||
private static final Method NOP_METHOD;
|
||||
|
||||
static {
|
||||
try {
|
||||
NOP_METHOD = TestNiFiPythonGateway.class.getMethod("nop", Object[].class);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private JavaObjectBindings bindings;
|
||||
private NiFiPythonGateway gateway;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
bindings = new JavaObjectBindings();
|
||||
gateway = new NiFiPythonGateway(bindings, this, mock(CallbackClient.class)) {
|
||||
@Override
|
||||
protected boolean isUnbind(final Method method) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObjectBoundUnboundWithSingleInvocation() {
|
||||
final Object[] args = new Object[] { new Object() };
|
||||
final InvocationBindings invocationBindings = gateway.beginInvocation("o123", NOP_METHOD, args);
|
||||
final String objectId = gateway.putNewObject(args[0]);
|
||||
final List<String> objectIds = invocationBindings.getObjectIds();
|
||||
assertEquals(List.of(objectId), objectIds);
|
||||
assertEquals(args[0], gateway.getObject(objectId));
|
||||
|
||||
gateway.endInvocation(invocationBindings);
|
||||
gateway.deleteObject(objectId);
|
||||
assertNull(gateway.getObject(objectId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObjectBoundNotUnboundWhileInvocationActive() {
|
||||
final Object[] args = new Object[] { new Object() };
|
||||
final InvocationBindings invocationBindings = gateway.beginInvocation("o123", NOP_METHOD, args);
|
||||
final String objectId = gateway.putNewObject(args[0]);
|
||||
final List<String> objectIds = invocationBindings.getObjectIds();
|
||||
assertEquals(List.of(objectId), objectIds);
|
||||
assertEquals(args[0], gateway.getObject(objectId));
|
||||
|
||||
gateway.deleteObject(objectId);
|
||||
|
||||
// gateway.deleteObject should not remove the value because the invocation is still active
|
||||
assertEquals(args[0], gateway.getObject(objectId));
|
||||
|
||||
// After calling endInvocation, the object should be cleaned up.
|
||||
gateway.endInvocation(invocationBindings);
|
||||
assertNull(gateway.getObject(objectId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndInvocationDifferentThread() throws InterruptedException {
|
||||
final Object[] args = new Object[] { new Object() };
|
||||
final InvocationBindings invocationBindings = gateway.beginInvocation("o123", NOP_METHOD, args);
|
||||
final String objectId = gateway.putNewObject(args[0]);
|
||||
final List<String> objectIds = invocationBindings.getObjectIds();
|
||||
assertEquals(List.of(objectId), objectIds);
|
||||
assertEquals(args[0], gateway.getObject(objectId));
|
||||
|
||||
gateway.deleteObject(objectId);
|
||||
|
||||
// gateway.deleteObject should not remove the value because the invocation is still active
|
||||
assertEquals(args[0], gateway.getObject(objectId));
|
||||
|
||||
Thread.ofVirtual().start(() -> {
|
||||
gateway.endInvocation(invocationBindings);
|
||||
}).join();
|
||||
|
||||
assertNull(gateway.getObject(objectId));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMultipleInvocationsActive() {
|
||||
final Object[] args = new Object[] { new Object() };
|
||||
|
||||
// Simulate 5 different threads making invocations into the Python process
|
||||
final List<InvocationBindings> bindings = new ArrayList<>();
|
||||
for (int i=0; i < 5; i++) {
|
||||
final InvocationBindings invocationBindings = gateway.beginInvocation("o123", NOP_METHOD, args);
|
||||
bindings.add(invocationBindings);
|
||||
}
|
||||
|
||||
// Create an object while there are 5 invocations active. We don't know which invocation caused
|
||||
// the object to be created, so we can't unbind it until all invocations are complete.
|
||||
final String objectId = gateway.putNewObject(args[0]);
|
||||
|
||||
// Simulate Python process garbage collecting the object after 2 of the invocations are complete
|
||||
gateway.endInvocation(bindings.removeFirst());
|
||||
gateway.endInvocation(bindings.removeFirst());
|
||||
gateway.deleteObject(objectId);
|
||||
|
||||
// We should now be able to add additional invocations, and they should not prevent the already-bound
|
||||
// object from being cleaned up.
|
||||
for (int i=0; i < 3; i++) {
|
||||
gateway.beginInvocation("o123", NOP_METHOD, args);
|
||||
}
|
||||
|
||||
// As each of the invocations complete, we should find the object is still bound
|
||||
for (final InvocationBindings invocationBindings : bindings) {
|
||||
assertNotNull(gateway.getObject(objectId));
|
||||
gateway.endInvocation(invocationBindings);
|
||||
}
|
||||
|
||||
// When the final invocation completes, the object should be unbound
|
||||
assertNull(gateway.getObject(objectId));
|
||||
}
|
||||
|
||||
public void nop(Object... args) {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.python.processor;
|
||||
|
||||
import java.lang.annotation.Documented;
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Inherited;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
* A marker annotation that can be added to an interface's method.
|
||||
* When this annotation is present, the proxy that is responsible for invoking the method on the Python side
|
||||
* may opt to cache the value and return the same value in subsequent calls.
|
||||
*/
|
||||
@Documented
|
||||
@Target({ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Inherited
|
||||
public @interface Idempotent {
|
||||
}
|
|
@ -19,13 +19,14 @@ package org.apache.nifi.python.processor;
|
|||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.python.PythonObjectProxy;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Base interface for any Python based processor
|
||||
*/
|
||||
public interface PythonProcessor {
|
||||
public interface PythonProcessor extends PythonObjectProxy {
|
||||
|
||||
List<PropertyDescriptor> getSupportedPropertyDescriptors();
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Collection;
|
|||
import java.util.Set;
|
||||
|
||||
public interface PythonProcessorAdapter extends PythonProcessor {
|
||||
@Idempotent
|
||||
PythonProcessor getProcessor();
|
||||
|
||||
@PreserveJavaBinding
|
||||
|
@ -40,9 +41,11 @@ public interface PythonProcessorAdapter extends PythonProcessor {
|
|||
|
||||
void onStopped(ProcessContext context);
|
||||
|
||||
@Idempotent
|
||||
String getCapabilityDescription();
|
||||
|
||||
PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyName);
|
||||
|
||||
@Idempotent
|
||||
boolean isDynamicPropertySupported();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue