mirror of https://github.com/apache/nifi.git
NIFI-12308: Create Python Environment in background thread instead of during Processor creation
This closes #7971 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
f1a34a58fa
commit
754baf0a37
|
@ -17,7 +17,9 @@
|
|||
|
||||
package org.apache.nifi.components;
|
||||
|
||||
public interface AsyncLoadedProcessor {
|
||||
import org.apache.nifi.processor.Processor;
|
||||
|
||||
public interface AsyncLoadedProcessor extends Processor {
|
||||
default boolean isLoaded() {
|
||||
return getState() == LoadState.FINISHED_LOADING;
|
||||
}
|
||||
|
@ -25,6 +27,8 @@ public interface AsyncLoadedProcessor {
|
|||
LoadState getState();
|
||||
|
||||
enum LoadState {
|
||||
INITIALIZING_ENVIRONMENT,
|
||||
|
||||
DOWNLOADING_DEPENDENCIES,
|
||||
|
||||
LOADING_PROCESSOR_CODE,
|
||||
|
|
|
@ -878,6 +878,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
if (component instanceof final AsyncLoadedProcessor asyncLoadedProcessor) {
|
||||
if (!asyncLoadedProcessor.isLoaded()) {
|
||||
final String explanation = switch (asyncLoadedProcessor.getState()) {
|
||||
case INITIALIZING_ENVIRONMENT -> "Initializing runtime environment for the Processor.";
|
||||
case DEPENDENCY_DOWNLOAD_FAILED -> "Failed to download one or more Processor dependencies. See logs for additional details.";
|
||||
case DOWNLOADING_DEPENDENCIES -> "In the process of downloading third-party dependencies required by the Processor.";
|
||||
case LOADING_PROCESSOR_CODE -> "In the process of loading Processor code";
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.nifi.python.BoundObjectCounts;
|
|||
import org.apache.nifi.python.PythonBridge;
|
||||
import org.apache.nifi.python.PythonBridgeInitializationContext;
|
||||
import org.apache.nifi.python.PythonProcessorDetails;
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -97,7 +96,7 @@ public class ClassLoaderAwarePythonBridge implements PythonBridge {
|
|||
}
|
||||
|
||||
@Override
|
||||
public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
|
||||
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
|
||||
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(classLoader)) {
|
||||
return delegate.createProcessor(identifier, type, version, preferIsolatedProcess);
|
||||
}
|
||||
|
|
|
@ -16,16 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.controller;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.net.URL;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import org.apache.commons.lang3.ClassUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
|
||||
|
@ -77,8 +67,6 @@ import org.apache.nifi.processor.SimpleProcessLogger;
|
|||
import org.apache.nifi.processor.StandardProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.StandardValidationContextFactory;
|
||||
import org.apache.nifi.python.PythonBridge;
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
|
||||
import org.apache.nifi.registry.flow.FlowRegistryClient;
|
||||
import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
|
||||
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
|
||||
|
@ -94,6 +82,17 @@ import org.apache.nifi.validation.RuleViolationsManager;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.net.URL;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ExtensionBuilder {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ExtensionBuilder.class);
|
||||
|
||||
|
@ -750,9 +749,9 @@ public class ExtensionBuilder {
|
|||
|
||||
final Processor processor = processorComponent.getComponent();
|
||||
|
||||
final ProcessorInitializationContext initiContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(),
|
||||
final ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(),
|
||||
serviceProvider, nodeTypeProvider, kerberosConfig);
|
||||
processor.initialize(initiContext);
|
||||
processor.initialize(initContext);
|
||||
|
||||
final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
|
||||
verifyControllerServiceReferences(processor, bundle.getClassLoader());
|
||||
|
@ -867,24 +866,14 @@ public class ExtensionBuilder {
|
|||
|
||||
// TODO: This is a hack because there's a bug in the UI that causes it to not load extensions that don't have a `.` in the type.
|
||||
final String processorType = type.startsWith("python.") ? type.substring("python.".length()) : type;
|
||||
final PythonProcessorBridge processorBridge = pythonBridge.createProcessor(identifier, processorType, bundleCoordinate.getVersion(), true);
|
||||
final Processor processor = processorBridge.getProcessorProxy();
|
||||
final Processor processor = pythonBridge.createProcessor(identifier, processorType, bundleCoordinate.getVersion(), true);
|
||||
|
||||
final ComponentLog componentLog = new SimpleProcessLogger(identifier, processor, new StandardLoggingContext(null));
|
||||
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog);
|
||||
|
||||
final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() {
|
||||
@Override
|
||||
public String getIdentifier() {
|
||||
return identifier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ComponentLog getLogger() {
|
||||
return terminationAwareLogger;
|
||||
}
|
||||
};
|
||||
processorBridge.initialize(initContext);
|
||||
final ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(identifier, terminationAwareLogger,
|
||||
serviceProvider, nodeTypeProvider, kerberosConfig);
|
||||
processor.initialize(initContext);
|
||||
|
||||
return new LoggableComponent<>(processor, bundleCoordinate, terminationAwareLogger);
|
||||
} finally {
|
||||
|
|
|
@ -16,29 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.nar;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.Reader;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
|
||||
import org.apache.nifi.authentication.LoginIdentityProvider;
|
||||
import org.apache.nifi.authorization.AccessPolicyProvider;
|
||||
|
@ -63,16 +40,12 @@ import org.apache.nifi.flowanalysis.FlowAnalysisRule;
|
|||
import org.apache.nifi.flowfile.FlowFilePrioritizer;
|
||||
import org.apache.nifi.init.ConfigurableComponentInitializer;
|
||||
import org.apache.nifi.init.ConfigurableComponentInitializerFactory;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.mock.MockComponentLogger;
|
||||
import org.apache.nifi.nar.ExtensionDefinition.ExtensionRuntime;
|
||||
import org.apache.nifi.parameter.ParameterProvider;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.provenance.ProvenanceRepository;
|
||||
import org.apache.nifi.python.PythonBridge;
|
||||
import org.apache.nifi.python.PythonProcessorDetails;
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
|
||||
import org.apache.nifi.registry.flow.FlowRegistryClient;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.reporting.ReportingTask;
|
||||
|
@ -80,6 +53,30 @@ import org.apache.nifi.util.StringUtils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.Reader;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs).
|
||||
*
|
||||
|
@ -711,23 +708,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
|
|||
final String type = classType.startsWith(PYTHON_TYPE_PREFIX) ? classType.substring(PYTHON_TYPE_PREFIX.length()) : classType;
|
||||
|
||||
final String procId = "temp-component-" + type;
|
||||
final PythonProcessorBridge processorBridge = pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(), false);
|
||||
tempComponent = processorBridge.getProcessorProxy();
|
||||
|
||||
final ComponentLog componentLog = new MockComponentLogger();
|
||||
final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() {
|
||||
@Override
|
||||
public String getIdentifier() {
|
||||
return procId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ComponentLog getLogger() {
|
||||
return componentLog;
|
||||
}
|
||||
};
|
||||
|
||||
processorBridge.initialize(initContext);
|
||||
tempComponent = pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(), false);
|
||||
} else {
|
||||
final Class<?> componentClass = Class.forName(classType, true, bundleClassLoader);
|
||||
tempComponent = (ConfigurableComponent) componentClass.getDeclaredConstructor().newInstance();
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.py4j;
|
||||
|
||||
import org.apache.nifi.components.AsyncLoadedProcessor;
|
||||
import org.apache.nifi.python.BoundObjectCounts;
|
||||
import org.apache.nifi.python.ControllerServiceTypeLookup;
|
||||
import org.apache.nifi.python.PythonBridge;
|
||||
|
@ -24,8 +25,12 @@ import org.apache.nifi.python.PythonBridgeInitializationContext;
|
|||
import org.apache.nifi.python.PythonController;
|
||||
import org.apache.nifi.python.PythonProcessConfig;
|
||||
import org.apache.nifi.python.PythonProcessorDetails;
|
||||
import org.apache.nifi.python.processor.FlowFileTransform;
|
||||
import org.apache.nifi.python.processor.FlowFileTransformProxy;
|
||||
import org.apache.nifi.python.processor.PythonProcessorAdapter;
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
import org.apache.nifi.python.processor.RecordTransform;
|
||||
import org.apache.nifi.python.processor.RecordTransformProxy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -38,6 +43,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class StandardPythonBridge implements PythonBridge {
|
||||
|
@ -89,8 +95,7 @@ public class StandardPythonBridge implements PythonBridge {
|
|||
controllerProcess.getController().discoverExtensions(extensionsDirs, workDirPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
|
||||
private PythonProcessorBridge createProcessorBridge(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
|
||||
ensureStarted();
|
||||
|
||||
logger.debug("Creating Python Processor of type {}", type);
|
||||
|
@ -127,6 +132,25 @@ public class StandardPythonBridge implements PythonBridge {
|
|||
return processorBridge;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
|
||||
final PythonProcessorDetails processorDetails = getProcessorTypes().stream()
|
||||
.filter(details -> details.getProcessorType().equals(type))
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new IllegalArgumentException("Unknown Python Processor type: " + type));
|
||||
|
||||
final String implementedInterface = processorDetails.getInterface();
|
||||
final Supplier<PythonProcessorBridge> processorBridgeFactory = () -> createProcessorBridge(identifier, type, version, preferIsolatedProcess);
|
||||
|
||||
if (FlowFileTransform.class.getName().equals(implementedInterface)) {
|
||||
return new FlowFileTransformProxy(type, processorBridgeFactory);
|
||||
}
|
||||
if (RecordTransform.class.getName().equals(implementedInterface)) {
|
||||
return new RecordTransformProxy(type, processorBridgeFactory);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onProcessorRemoved(final String identifier, final String type, final String version) {
|
||||
final ExtensionId extensionId = new ExtensionId(type, version);
|
||||
|
|
|
@ -17,24 +17,16 @@
|
|||
|
||||
package org.apache.nifi.py4j;
|
||||
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.python.PythonController;
|
||||
import org.apache.nifi.python.PythonProcessorDetails;
|
||||
import org.apache.nifi.python.processor.FlowFileTransform;
|
||||
import org.apache.nifi.python.processor.FlowFileTransformProxy;
|
||||
import org.apache.nifi.python.processor.PythonProcessorAdapter;
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
|
||||
import org.apache.nifi.python.processor.PythonProcessorProxy;
|
||||
import org.apache.nifi.python.processor.RecordTransform;
|
||||
import org.apache.nifi.python.processor.RecordTransformProxy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
|
||||
|
||||
|
@ -45,7 +37,6 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
private final ProcessorCreationWorkflow creationWorkflow;
|
||||
private final PythonProcessorDetails processorDetails;
|
||||
private volatile PythonProcessorAdapter adapter;
|
||||
private final PythonProcessorProxy proxy;
|
||||
private final File workingDir;
|
||||
private final File moduleFile;
|
||||
private volatile long lastModified;
|
||||
|
@ -60,8 +51,6 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
this.workingDir = builder.workDir;
|
||||
this.moduleFile = builder.moduleFile;
|
||||
this.lastModified = this.moduleFile.lastModified();
|
||||
|
||||
this.proxy = createProxy();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -70,46 +59,35 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> initialize(final PythonProcessorInitializationContext context) {
|
||||
public void initialize(final PythonProcessorInitializationContext context) {
|
||||
this.initializationContext = context;
|
||||
|
||||
final String threadName = "Initialize Python Processor %s (%s)".formatted(initializationContext.getIdentifier(), getProcessorType());
|
||||
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
|
||||
Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(future));
|
||||
return future;
|
||||
Thread.ofVirtual().name(threadName).start(this::initializePythonSide);
|
||||
}
|
||||
|
||||
public LoadState getLoadState() {
|
||||
return loadState;
|
||||
}
|
||||
|
||||
private void initializePythonSide(final CompletableFuture<Void> future) {
|
||||
private void initializePythonSide() {
|
||||
try {
|
||||
try {
|
||||
creationWorkflow.downloadDependencies();
|
||||
loadState = LoadState.LOADING_PROCESSOR_CODE;
|
||||
} catch (final Exception e) {
|
||||
loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
|
||||
throw e;
|
||||
}
|
||||
creationWorkflow.downloadDependencies();
|
||||
loadState = LoadState.LOADING_PROCESSOR_CODE;
|
||||
} catch (final Exception e) {
|
||||
loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
|
||||
throw e;
|
||||
}
|
||||
|
||||
final PythonProcessorAdapter pythonProcessorAdapter;
|
||||
try {
|
||||
pythonProcessorAdapter = creationWorkflow.createProcessor();
|
||||
pythonProcessorAdapter.initialize(initializationContext);
|
||||
this.adapter = pythonProcessorAdapter;
|
||||
this.proxy.onPythonSideInitialized(pythonProcessorAdapter);
|
||||
|
||||
loadState = LoadState.FINISHED_LOADING;
|
||||
} catch (final Exception e) {
|
||||
loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
|
||||
throw e;
|
||||
}
|
||||
|
||||
future.complete(null);
|
||||
} catch (final Throwable t) {
|
||||
future.completeExceptionally(t);
|
||||
final PythonProcessorAdapter pythonProcessorAdapter;
|
||||
try {
|
||||
pythonProcessorAdapter = creationWorkflow.createProcessor();
|
||||
pythonProcessorAdapter.initialize(initializationContext);
|
||||
this.adapter = pythonProcessorAdapter;
|
||||
loadState = LoadState.FINISHED_LOADING;
|
||||
} catch (final Exception e) {
|
||||
loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -118,11 +96,6 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
return processorDetails.getProcessorType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Processor getProcessorProxy() {
|
||||
return proxy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean reload() {
|
||||
if (moduleFile.lastModified() <= lastModified) {
|
||||
|
@ -131,24 +104,12 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
}
|
||||
|
||||
controller.reloadProcessor(getProcessorType(), processorDetails.getProcessorVersion(), workingDir.getAbsolutePath());
|
||||
initializePythonSide(new CompletableFuture<>());
|
||||
initializePythonSide();
|
||||
lastModified = moduleFile.lastModified();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private PythonProcessorProxy createProxy() {
|
||||
final String implementedInterface = processorDetails.getInterface();
|
||||
if (FlowFileTransform.class.getName().equals(implementedInterface)) {
|
||||
return new FlowFileTransformProxy(this);
|
||||
}
|
||||
if (RecordTransform.class.getName().equals(implementedInterface)) {
|
||||
return new RecordTransformProxy(this);
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Python Processor does not implement any of the valid interfaces. Interface implemented: " + implementedInterface);
|
||||
}
|
||||
|
||||
|
||||
public static class Builder {
|
||||
private PythonController controller;
|
||||
|
|
|
@ -29,21 +29,20 @@ import py4j.Py4JNetworkException;
|
|||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
public class FlowFileTransformProxy extends PythonProcessorProxy {
|
||||
|
||||
private final PythonProcessorBridge bridge;
|
||||
private volatile FlowFileTransform transform;
|
||||
|
||||
public FlowFileTransformProxy(final PythonProcessorBridge bridge) {
|
||||
super(bridge);
|
||||
this.bridge = bridge;
|
||||
public FlowFileTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory) {
|
||||
super(processorType, bridgeFactory);
|
||||
}
|
||||
|
||||
|
||||
@OnScheduled
|
||||
public void setContext(final ProcessContext context) {
|
||||
final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing"));
|
||||
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
|
||||
if (optionalAdapter.isEmpty()) {
|
||||
throw new IllegalStateException(this + " is not finished initializing");
|
||||
|
|
|
@ -26,8 +26,10 @@ import org.apache.nifi.components.AsyncLoadedProcessor;
|
|||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
|
||||
import java.util.Collection;
|
||||
|
@ -38,15 +40,18 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
@SupportsSensitiveDynamicProperties
|
||||
public abstract class PythonProcessorProxy extends AbstractProcessor implements AsyncLoadedProcessor {
|
||||
private final PythonProcessorBridge bridge;
|
||||
private final String processorType;
|
||||
private volatile PythonProcessorInitializationContext initContext;
|
||||
private volatile PythonProcessorBridge bridge;
|
||||
private volatile Set<Relationship> cachedRelationships = null;
|
||||
private volatile List<PropertyDescriptor> cachedPropertyDescriptors = null;
|
||||
private volatile Map<String, PropertyDescriptor> cachedDynamicDescriptors = null;
|
||||
private volatile boolean supportsDynamicProperties;
|
||||
private volatile Boolean supportsDynamicProperties;
|
||||
|
||||
protected static final Relationship REL_ORIGINAL = new Relationship.Builder()
|
||||
.name("original")
|
||||
|
@ -61,16 +66,55 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
|
|||
REL_ORIGINAL,
|
||||
REL_FAILURE);
|
||||
|
||||
public PythonProcessorProxy(final PythonProcessorBridge bridge) {
|
||||
this.bridge = bridge;
|
||||
public PythonProcessorProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory) {
|
||||
this.processorType = processorType;
|
||||
|
||||
Thread.ofVirtual().name("Initialize " + processorType).start(() -> {
|
||||
this.bridge = bridgeFactory.get();
|
||||
|
||||
// If initialization context has already been set, initialize bridge.
|
||||
final PythonProcessorInitializationContext pythonInitContext = initContext;
|
||||
if (pythonInitContext != null) {
|
||||
this.bridge.initialize(pythonInitContext);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void onPythonSideInitialized(final PythonProcessorAdapter adapter) {
|
||||
supportsDynamicProperties = adapter.isDynamicPropertySupported();
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
super.init(context);
|
||||
|
||||
final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() {
|
||||
@Override
|
||||
public String getIdentifier() {
|
||||
return context.getIdentifier();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ComponentLog getLogger() {
|
||||
return context.getLogger();
|
||||
}
|
||||
};
|
||||
|
||||
this.initContext = initContext;
|
||||
|
||||
// If Bridge has already been set, initialize it.
|
||||
final PythonProcessorBridge bridge = this.bridge;
|
||||
if (bridge != null) {
|
||||
bridge.initialize(initContext);
|
||||
}
|
||||
}
|
||||
|
||||
protected Optional<PythonProcessorBridge> getBridge() {
|
||||
return Optional.ofNullable(this.bridge);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LoadState getState() {
|
||||
if (bridge == null) {
|
||||
return LoadState.INITIALIZING_ENVIRONMENT;
|
||||
}
|
||||
|
||||
return bridge.getLoadState();
|
||||
}
|
||||
|
||||
|
@ -80,6 +124,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
|
|||
return this.cachedPropertyDescriptors;
|
||||
}
|
||||
|
||||
if (bridge == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
|
||||
if (optionalAdapter.isEmpty()) {
|
||||
// If we don't have the adapter yet, use whatever is cached, even if it's old, or an empty List if we have nothing cached.
|
||||
|
@ -99,6 +147,14 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
|
|||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
if (bridge == null) {
|
||||
return List.of(new ValidationResult.Builder()
|
||||
.subject("Processor")
|
||||
.explanation("Python environment is not yet initialized")
|
||||
.valid(false)
|
||||
.build());
|
||||
}
|
||||
|
||||
final LoadState loadState = bridge.getLoadState();
|
||||
if (loadState == LoadState.LOADING_PROCESSOR_CODE || loadState == LoadState.DOWNLOADING_DEPENDENCIES) {
|
||||
return List.of(new ValidationResult.Builder()
|
||||
|
@ -144,6 +200,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
|
|||
return cachedDynamicDescriptors.get(propertyDescriptorName);
|
||||
}
|
||||
|
||||
if (bridge == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
|
||||
return optionalAdapter.map(adapter -> adapter.getSupportedDynamicPropertyDescriptor(propertyDescriptorName))
|
||||
|
@ -155,7 +215,18 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
|
|||
}
|
||||
|
||||
protected boolean isSupportsDynamicPropertyDescriptor() {
|
||||
return supportsDynamicProperties;
|
||||
if (this.supportsDynamicProperties != null) {
|
||||
return supportsDynamicProperties;
|
||||
}
|
||||
|
||||
if (bridge == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final Optional<PythonProcessorAdapter> adapter = bridge.getProcessorAdapter();
|
||||
final boolean supported = adapter.map(PythonProcessorAdapter::isDynamicPropertySupported).orElse(false);
|
||||
supportsDynamicProperties = supported;
|
||||
return supported;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
|
@ -193,6 +264,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
|
|||
}
|
||||
|
||||
private Set<Relationship> fetchRelationshipsFromPythonProcessor() {
|
||||
if (bridge == null) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
Set<Relationship> processorRelationships;
|
||||
try {
|
||||
processorRelationships = bridge.getProcessorAdapter()
|
||||
|
@ -210,6 +285,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
|
|||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
if (bridge == null) {
|
||||
throw new IllegalStateException("Processor is not yet initialized");
|
||||
}
|
||||
|
||||
reload();
|
||||
bridge.getProcessorAdapter()
|
||||
.orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"))
|
||||
|
@ -218,6 +297,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
|
|||
|
||||
@OnStopped
|
||||
public void onStopped(final ProcessContext context) {
|
||||
if (bridge == null) {
|
||||
throw new IllegalStateException("Processor is not yet initialized");
|
||||
}
|
||||
|
||||
bridge.getProcessorAdapter()
|
||||
.orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"))
|
||||
.onStopped(context);
|
||||
|
@ -225,10 +308,14 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PythonProcessor[type=" + bridge.getProcessorType() + ", id=" + getIdentifier() + "]";
|
||||
return "PythonProcessor[type=" + processorType + ", id=" + getIdentifier() + "]";
|
||||
}
|
||||
|
||||
private void reload() {
|
||||
if (bridge == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final boolean reloaded = bridge.reload();
|
||||
if (reloaded) {
|
||||
getLogger().info("Successfully reloaded Processor");
|
||||
|
|
|
@ -56,10 +56,10 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
public class RecordTransformProxy extends PythonProcessorProxy {
|
||||
private final PythonProcessorBridge bridge;
|
||||
private volatile RecordTransform transform;
|
||||
|
||||
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
|
||||
|
@ -78,9 +78,8 @@ public class RecordTransformProxy extends PythonProcessorProxy {
|
|||
.build();
|
||||
|
||||
|
||||
public RecordTransformProxy(final PythonProcessorBridge bridge) {
|
||||
super(bridge);
|
||||
this.bridge = bridge;
|
||||
public RecordTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory) {
|
||||
super(processorType, bridgeFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,6 +93,8 @@ public class RecordTransformProxy extends PythonProcessorProxy {
|
|||
|
||||
@OnScheduled
|
||||
public void setProcessContext(final ProcessContext context) {
|
||||
final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing"));
|
||||
|
||||
final Optional<PythonProcessorAdapter> adapterOptional = bridge.getProcessorAdapter();
|
||||
if (adapterOptional.isEmpty()) {
|
||||
throw new IllegalStateException(this + " is not finished initializing");
|
||||
|
|
|
@ -82,7 +82,17 @@
|
|||
<artifactId>nifi-record-serialization-service-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-serialization-services</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-schema-registry-service-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -17,34 +17,28 @@
|
|||
|
||||
package org.apache.nifi.py4j;
|
||||
|
||||
import org.apache.nifi.components.AsyncLoadedProcessor;
|
||||
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.mock.MockComponentLogger;
|
||||
import org.apache.nifi.json.JsonRecordSetWriter;
|
||||
import org.apache.nifi.json.JsonTreeReader;
|
||||
import org.apache.nifi.mock.MockProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.python.ControllerServiceTypeLookup;
|
||||
import org.apache.nifi.python.PythonBridge;
|
||||
import org.apache.nifi.python.PythonBridgeInitializationContext;
|
||||
import org.apache.nifi.python.PythonProcessConfig;
|
||||
import org.apache.nifi.python.PythonProcessorDetails;
|
||||
import org.apache.nifi.python.processor.EmptyAttributeMap;
|
||||
import org.apache.nifi.python.processor.FlowFileTransformProxy;
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
|
||||
import org.apache.nifi.python.processor.RecordTransform;
|
||||
import org.apache.nifi.python.processor.RecordTransformResult;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.MockPropertyValue;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
|
@ -54,7 +48,6 @@ import org.junit.jupiter.api.AfterEach;
|
|||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -74,13 +67,10 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -166,7 +156,7 @@ public class PythonControllerInteractionIT {
|
|||
final List<PythonProcessorDetails> extensionDetails = bridge.getProcessorTypes();
|
||||
final List<String> types = extensionDetails.stream()
|
||||
.map(PythonProcessorDetails::getProcessorType)
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
|
||||
assertTrue(types.contains(PRETTY_PRINT_JSON));
|
||||
assertTrue(types.contains("ConvertCsvToExcel"));
|
||||
|
@ -186,10 +176,7 @@ public class PythonControllerInteractionIT {
|
|||
// Create a PrettyPrintJson Processor
|
||||
final byte[] jsonContent = Files.readAllBytes(Paths.get("src/test/resources/json/input/simple-person.json"));
|
||||
for (int i=0; i < 3; i++) {
|
||||
final PythonProcessorBridge prettyPrintJson = createProcessor(PRETTY_PRINT_JSON);
|
||||
assertNotNull(prettyPrintJson);
|
||||
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(prettyPrintJson);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON);
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
|
||||
runner.enqueue(jsonContent);
|
||||
|
@ -203,11 +190,7 @@ public class PythonControllerInteractionIT {
|
|||
@Disabled("Just for manual testing...")
|
||||
public void runPrettyPrintJsonManyThreads() throws IOException {
|
||||
// Create a PrettyPrintJson Processor
|
||||
final PythonProcessorBridge prettyPrintJson = createProcessor(PRETTY_PRINT_JSON);
|
||||
assertNotNull(prettyPrintJson);
|
||||
|
||||
// Setup
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(prettyPrintJson);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON);
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
|
||||
final int flowFileCount = 100_000;
|
||||
|
@ -226,12 +209,8 @@ public class PythonControllerInteractionIT {
|
|||
|
||||
@Test
|
||||
public void testSimplePrettyPrint() throws IOException {
|
||||
// Discover extensions so that they can be created
|
||||
final PythonProcessorBridge prettyPrintJson = createProcessor(PRETTY_PRINT_JSON);
|
||||
assertNotNull(prettyPrintJson);
|
||||
|
||||
// Setup
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(prettyPrintJson);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON);
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
runner.enqueue(Paths.get("src/test/resources/json/input/simple-person.json"));
|
||||
runner.setProperty("Indentation", "2");
|
||||
|
@ -250,8 +229,7 @@ public class PythonControllerInteractionIT {
|
|||
|
||||
@Test
|
||||
public void testValidator() {
|
||||
final PythonProcessorBridge prettyPrintJson = createProcessor(PRETTY_PRINT_JSON);
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(prettyPrintJson);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON);
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
|
||||
runner.setProperty("Indentation", "-1");
|
||||
|
@ -273,11 +251,7 @@ public class PythonControllerInteractionIT {
|
|||
@Test
|
||||
public void testCsvToExcel() {
|
||||
// Create a PrettyPrintJson Processor
|
||||
final PythonProcessorBridge csvToExcel = createProcessor("ConvertCsvToExcel");
|
||||
assertNotNull(csvToExcel);
|
||||
|
||||
// Setup
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(csvToExcel);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform("ConvertCsvToExcel");
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
runner.enqueue("name, number\nJohn Doe, 500");
|
||||
|
||||
|
@ -289,11 +263,8 @@ public class PythonControllerInteractionIT {
|
|||
|
||||
@Test
|
||||
public void testExpressionLanguageWithAttributes() {
|
||||
final PythonProcessorBridge writeProperty = createProcessor("WritePropertyToFlowFile");
|
||||
assertNotNull(writeProperty);
|
||||
|
||||
// Setup
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(writeProperty);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform("WritePropertyToFlowFile");
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
runner.setProperty("Message", "Hola Mundo");
|
||||
runner.enqueue("Hello World");
|
||||
|
@ -308,11 +279,7 @@ public class PythonControllerInteractionIT {
|
|||
@Test
|
||||
public void testPythonPackage() {
|
||||
// Create a WriteNumber Processor
|
||||
final PythonProcessorBridge procBridge = createProcessor("WriteNumber");
|
||||
assertNotNull(procBridge);
|
||||
|
||||
// Setup
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(procBridge);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteNumber");
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
runner.enqueue("");
|
||||
|
||||
|
@ -326,6 +293,14 @@ public class PythonControllerInteractionIT {
|
|||
assertTrue(resultNum <= 1000);
|
||||
}
|
||||
|
||||
private FlowFileTransformProxy createFlowFileTransform(final String type) {
|
||||
final Processor processor = createProcessor(type);
|
||||
assertNotNull(processor);
|
||||
|
||||
processor.initialize(new MockProcessorInitializationContext());
|
||||
return (FlowFileTransformProxy) processor;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testImportRequirements() {
|
||||
// Discover extensions so that they can be created
|
||||
|
@ -340,12 +315,8 @@ public class PythonControllerInteractionIT {
|
|||
assertEquals(1, dependencies.size());
|
||||
assertEquals("numpy==1.25.0", dependencies.get(0));
|
||||
|
||||
// Create a PrettyPrintJson Processor
|
||||
final PythonProcessorBridge writeNumPyVersion = createProcessor("WriteNumpyVersion");
|
||||
assertNotNull(writeNumPyVersion);
|
||||
|
||||
// Setup
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(writeNumPyVersion);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteNumpyVersion");
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
runner.enqueue("Hello World");
|
||||
|
||||
|
@ -359,13 +330,9 @@ public class PythonControllerInteractionIT {
|
|||
|
||||
@Test
|
||||
public void testControllerService() throws InitializationException {
|
||||
final PythonProcessorBridge processor = createProcessor("LookupAddress");
|
||||
assertNotNull(processor);
|
||||
|
||||
controllerServiceMap.put("StringLookupService", TestLookupService.class);
|
||||
|
||||
// Setup
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(processor);
|
||||
controllerServiceMap.put("StringLookupService", TestLookupService.class);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform("LookupAddress");
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
final StringLookupService lookupService = new TestLookupService((Collections.singletonMap("John Doe", "123 My Street")));
|
||||
runner.addControllerService("lookup", lookupService);
|
||||
|
@ -392,13 +359,8 @@ public class PythonControllerInteractionIT {
|
|||
// Ensure that we started with "Hello, World" because if the test is run multiple times, we may already be starting with the modified version
|
||||
replaceFileText(sourceFile, replacement, originalMessage);
|
||||
|
||||
// Create a PrettyPrintJson Processor
|
||||
final PythonProcessorBridge processor = createProcessor("WriteMessage");
|
||||
processor.initialize(createInitContext());
|
||||
assertNotNull(processor);
|
||||
|
||||
// Setup
|
||||
final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(processor);
|
||||
final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteMessage");
|
||||
final TestRunner runner = TestRunners.newTestRunner(wrapper);
|
||||
runner.enqueue("");
|
||||
|
||||
|
@ -465,8 +427,7 @@ public class PythonControllerInteractionIT {
|
|||
assertEquals(1, v2Count);
|
||||
|
||||
// Create a WriteMessage Processor, version 0.0.1-SNAPSHOT
|
||||
final PythonProcessorBridge procV1 = createProcessor("WriteMessage");
|
||||
final FlowFileTransformProxy wrapperV1 = new FlowFileTransformProxy(procV1);
|
||||
final FlowFileTransformProxy wrapperV1 = createFlowFileTransform("WriteMessage");
|
||||
final TestRunner runnerV1 = TestRunners.newTestRunner(wrapperV1);
|
||||
runnerV1.enqueue("");
|
||||
|
||||
|
@ -477,9 +438,8 @@ public class PythonControllerInteractionIT {
|
|||
runnerV1.getFlowFilesForRelationship("success").get(0).assertContentEquals("Hello, World");
|
||||
|
||||
// Create an instance of WriteMessage V2
|
||||
final PythonProcessorBridge procV2 = createProcessor("WriteMessage", "0.0.2-SNAPSHOT");
|
||||
final FlowFileTransformProxy wrapperV2 = new FlowFileTransformProxy(procV2);
|
||||
final TestRunner runnerV2 = TestRunners.newTestRunner(wrapperV2);
|
||||
final Processor procV2 = createProcessor("WriteMessage", "0.0.2-SNAPSHOT");
|
||||
final TestRunner runnerV2 = TestRunners.newTestRunner(procV2);
|
||||
runnerV2.enqueue("");
|
||||
|
||||
// Trigger the processor
|
||||
|
@ -490,40 +450,23 @@ public class PythonControllerInteractionIT {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testRecordTransformWithDynamicProperties() {
|
||||
// Create a PrettyPrintJson Processor
|
||||
final PythonProcessorBridge processor = createProcessor("SetRecordField");
|
||||
assertNotNull(processor);
|
||||
|
||||
// Mock out ProcessContext to reflect that the processor should set the 'name' field to 'Jane Doe'
|
||||
final PropertyDescriptor nameDescriptor = new PropertyDescriptor.Builder()
|
||||
.name("name")
|
||||
.dynamic(true)
|
||||
.addValidator(Validator.VALID)
|
||||
.build();
|
||||
final PropertyDescriptor numberDescriptor = new PropertyDescriptor.Builder()
|
||||
.name("number")
|
||||
.dynamic(true)
|
||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||
.build();
|
||||
|
||||
final Map<PropertyDescriptor, String> propertyMap = new HashMap<>();
|
||||
propertyMap.put(nameDescriptor, "Jane Doe");
|
||||
propertyMap.put(numberDescriptor, "8");
|
||||
|
||||
final ProcessContext context = createContext(propertyMap);
|
||||
public void testRecordTransformWithDynamicProperties() throws InitializationException {
|
||||
// Create a SetRecordField Processor
|
||||
final TestRunner runner = createRecordTransformRunner("SetRecordField");
|
||||
runner.setProperty("name", "Jane Doe");
|
||||
runner.setProperty("number", "8");
|
||||
|
||||
// Create a Record to transform and transform it
|
||||
final String json = "[{ \"name\": \"John Doe\" }]";
|
||||
final RecordSchema schema = createSimpleRecordSchema("name");
|
||||
final RecordTransform recordTransform = (RecordTransform) processor.getProcessorAdapter().get().getProcessor();
|
||||
recordTransform.setContext(context);
|
||||
final RecordTransformResult result = recordTransform.transformRecord(json, schema, new EmptyAttributeMap()).get(0);
|
||||
runner.enqueue(json);
|
||||
runner.run();
|
||||
runner.assertTransferCount("original", 1);
|
||||
runner.assertTransferCount("success", 1);
|
||||
|
||||
// Verify the results
|
||||
assertEquals("success", result.getRelationship());
|
||||
assertNull(result.getSchema());
|
||||
assertEquals("{\"name\": \"Jane Doe\", \"number\": \"8\"}", result.getRecordJson());
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
|
||||
out.assertContentEquals("""
|
||||
[{"name":"Jane Doe","number":"8"}]""");
|
||||
}
|
||||
|
||||
private ProcessContext createContext(final Map<PropertyDescriptor, String> propertyValues) {
|
||||
|
@ -543,72 +486,44 @@ public class PythonControllerInteractionIT {
|
|||
return context;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecordTransformWithInnerRecord() {
|
||||
// Create a PrettyPrintJson Processor
|
||||
final PythonProcessorBridge processor = createProcessor("SetRecordField");
|
||||
private TestRunner createRecordTransformRunner(final String type) throws InitializationException {
|
||||
final Processor processor = createProcessor("SetRecordField");
|
||||
assertNotNull(processor);
|
||||
|
||||
// Mock out ProcessContext to reflect that the processor should set the 'name' field to 'Jane Doe'
|
||||
final PropertyDescriptor nameDescriptor = new PropertyDescriptor.Builder()
|
||||
.name("name")
|
||||
.dynamic(true)
|
||||
.addValidator(Validator.VALID)
|
||||
.build();
|
||||
final JsonTreeReader reader = new JsonTreeReader();
|
||||
final JsonRecordSetWriter writer = new JsonRecordSetWriter();
|
||||
|
||||
final Map<PropertyDescriptor, String> propertyMap = new HashMap<>();
|
||||
propertyMap.put(nameDescriptor, "Jane Doe");
|
||||
final ProcessContext context = createContext(propertyMap);
|
||||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
runner.addControllerService("reader", reader);
|
||||
runner.addControllerService("writer", writer);
|
||||
runner.enableControllerService(reader);
|
||||
runner.enableControllerService(writer);
|
||||
runner.setProperty("Record Reader", "reader");
|
||||
runner.setProperty("Record Writer", "writer");
|
||||
|
||||
return runner;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecordTransformWithInnerRecord() throws InitializationException {
|
||||
// Create a SetRecordField Processor
|
||||
final TestRunner runner = createRecordTransformRunner("SetRecordField");
|
||||
runner.setProperty("name", "Jane Doe");
|
||||
|
||||
// Create a Record to transform and transform it
|
||||
final String json = "[{\"name\": \"Jake Doe\", \"father\": { \"name\": \"John Doe\" }}]";
|
||||
final RecordSchema recordSchema = createTwoLevelRecord().getSchema();
|
||||
final RecordTransform recordTransform = (RecordTransform) processor.getProcessorAdapter().get().getProcessor();
|
||||
recordTransform.setContext(context);
|
||||
final RecordTransformResult result = recordTransform.transformRecord(json, recordSchema, new EmptyAttributeMap()).get(0);
|
||||
|
||||
// Verify the results
|
||||
assertEquals("success", result.getRelationship());
|
||||
|
||||
assertEquals("{\"name\": \"Jane Doe\", \"father\": {\"name\": \"John Doe\"}}", result.getRecordJson());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testLogger() throws ExecutionException, InterruptedException {
|
||||
bridge.discoverExtensions();
|
||||
|
||||
final String procId = createId();
|
||||
// Create the Processor, but we do not use this.createProcessor() because we want to explicitly have access to the
|
||||
// initialization context to inject in the logger that we want.
|
||||
final PythonProcessorBridge logContentsBridge = bridge.createProcessor(procId, "LogContents", VERSION, true);
|
||||
|
||||
final ComponentLog logger = Mockito.mock(ComponentLog.class);
|
||||
final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() {
|
||||
@Override
|
||||
public String getIdentifier() {
|
||||
return procId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ComponentLog getLogger() {
|
||||
return logger;
|
||||
}
|
||||
};
|
||||
|
||||
logContentsBridge.initialize(initContext).get();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(logContentsBridge.getProcessorProxy());
|
||||
runner.enqueue("Hello World");
|
||||
runner.enqueue(json);
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("original", 1);
|
||||
// Verify the results
|
||||
runner.assertTransferCount("success", 1);
|
||||
final ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
|
||||
Mockito.verify(logger).info(argumentCaptor.capture());
|
||||
assertEquals("Hello World", argumentCaptor.getValue());
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
|
||||
out.assertContentEquals("""
|
||||
[{"name":"Jane Doe","father":{"name":"John Doe"}}]""");
|
||||
}
|
||||
|
||||
|
||||
private RecordSchema createSimpleRecordSchema(final String... fieldNames) {
|
||||
return createSimpleRecordSchema(Arrays.asList(fieldNames));
|
||||
}
|
||||
|
@ -623,30 +538,6 @@ public class PythonControllerInteractionIT {
|
|||
return schema;
|
||||
}
|
||||
|
||||
private Record createSimpleRecord(final Map<String, Object> values) {
|
||||
final List<RecordField> recordFields = new ArrayList<>();
|
||||
for (final Map.Entry<String, Object> entry : values.entrySet()) {
|
||||
final DataType dataType = DataTypeUtils.inferDataType(entry.getValue(), RecordFieldType.STRING.getDataType());
|
||||
recordFields.add(new RecordField(entry.getKey(), dataType, true));
|
||||
}
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(recordFields);
|
||||
return new MapRecord(schema, values);
|
||||
}
|
||||
|
||||
private Record createTwoLevelRecord() {
|
||||
final Map<String, Object> innerPersonValues = new HashMap<>();
|
||||
innerPersonValues.put("name", "Jake Doe");
|
||||
final Record innerPersonRecord = createSimpleRecord(innerPersonValues);
|
||||
|
||||
final Map<String, Object> outerPersonValues = new HashMap<>();
|
||||
outerPersonValues.put("name", "John Doe");
|
||||
outerPersonValues.put("father", innerPersonRecord);
|
||||
final Record outerPersonRecord = createSimpleRecord(outerPersonValues);
|
||||
|
||||
return outerPersonRecord;
|
||||
}
|
||||
|
||||
|
||||
public interface StringLookupService extends ControllerService {
|
||||
Optional<String> lookup(Map<String, String> coordinates);
|
||||
|
@ -670,37 +561,40 @@ public class PythonControllerInteractionIT {
|
|||
return UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
private PythonProcessorBridge createProcessor(final String type) {
|
||||
private Processor createProcessor(final String type) {
|
||||
return createProcessor(type, VERSION);
|
||||
}
|
||||
|
||||
private PythonProcessorBridge createProcessor(final String type, final String version) {
|
||||
private Processor createProcessor(final String type, final String version) {
|
||||
bridge.discoverExtensions();
|
||||
final PythonProcessorBridge processor = bridge.createProcessor(createId(), type, version, true);
|
||||
final Future<Void> future = processor.initialize(createInitContext());
|
||||
final AsyncLoadedProcessor processor = bridge.createProcessor(createId(), type, version, true);
|
||||
|
||||
try {
|
||||
future.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException(e.getCause());
|
||||
final ProcessorInitializationContext initContext = new MockProcessorInitializationContext();
|
||||
processor.initialize(initContext);
|
||||
|
||||
final long maxInitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);
|
||||
while (true) {
|
||||
final LoadState state = processor.getState();
|
||||
if (state == LoadState.FINISHED_LOADING) {
|
||||
break;
|
||||
}
|
||||
if (state == LoadState.DEPENDENCY_DOWNLOAD_FAILED || state == LoadState.LOADING_PROCESSOR_CODE_FAILED) {
|
||||
throw new RuntimeException("Failed to initialize processor of type %s version %s".formatted(type, version));
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > maxInitTime) {
|
||||
throw new RuntimeException("Timed out waiting for processor of type %s version %s to initialize".formatted(type, version));
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(10L);
|
||||
} catch (final InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException("Interrupted while initializing processor of type %s version %s".formatted(type, version));
|
||||
}
|
||||
}
|
||||
|
||||
processor.initialize(new MockProcessorInitializationContext());
|
||||
return processor;
|
||||
}
|
||||
|
||||
private PythonProcessorInitializationContext createInitContext() {
|
||||
return new PythonProcessorInitializationContext() {
|
||||
@Override
|
||||
public String getIdentifier() {
|
||||
return "unit-test-id";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ComponentLog getLogger() {
|
||||
return new MockComponentLogger();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.nifi.python;
|
||||
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
import org.apache.nifi.components.AsyncLoadedProcessor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
@ -64,7 +64,7 @@ public class DisabledPythonBridge implements PythonBridge {
|
|||
}
|
||||
|
||||
@Override
|
||||
public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
|
||||
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
|
||||
throw new UnsupportedOperationException("Cannot create Processor of type " + type + " because Python extensions are disabled");
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.nifi.python;
|
||||
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
import org.apache.nifi.components.AsyncLoadedProcessor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -88,8 +88,7 @@ public interface PythonBridge {
|
|||
void discoverExtensions();
|
||||
|
||||
/**
|
||||
* Creates a Processor with the given identifier, type, and version. Then returns a PythonProcessorBridge that provides access to all
|
||||
* necessary information and objects for interacting with this Processor from the Java side.
|
||||
* Creates a Processor with the given identifier, type, and version.
|
||||
*
|
||||
* @param identifier the Processor's identifier
|
||||
* @param type the Processor's type
|
||||
|
@ -97,7 +96,7 @@ public interface PythonBridge {
|
|||
* @param preferIsolatedProcess whether or not to prefer launching a Python Process that is isolated for just this one instance of the Processor
|
||||
* @return a PythonProcessorBridge that can be used for interacting with the Processor
|
||||
*/
|
||||
PythonProcessorBridge createProcessor(String identifier, String type, String version, boolean preferIsolatedProcess);
|
||||
AsyncLoadedProcessor createProcessor(String identifier, String type, String version, boolean preferIsolatedProcess);
|
||||
|
||||
/**
|
||||
* A notification that the Processor with the given identifier, type, and version was removed from the flow. This triggers the bridge
|
||||
|
|
|
@ -18,10 +18,8 @@
|
|||
package org.apache.nifi.python.processor;
|
||||
|
||||
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* A model object that is used to bridge the gap between what is necessary for the Framework to interact
|
||||
|
@ -34,12 +32,6 @@ public interface PythonProcessorBridge {
|
|||
*/
|
||||
Optional<PythonProcessorAdapter> getProcessorAdapter();
|
||||
|
||||
/**
|
||||
* @return a proxy for the actual Processor implementation that will trigger the appropriate method on the Python side, or an empty Optional
|
||||
* if the processor/adapter have not yet been initialized
|
||||
*/
|
||||
Processor getProcessorProxy();
|
||||
|
||||
/**
|
||||
* @return the name of the Processor implementation. This will not contain a 'python.' prefix.
|
||||
*/
|
||||
|
@ -57,7 +49,7 @@ public interface PythonProcessorBridge {
|
|||
* Initializes the Processor
|
||||
* @param context the initialization context
|
||||
*/
|
||||
Future<Void> initialize(PythonProcessorInitializationContext context);
|
||||
void initialize(PythonProcessorInitializationContext context);
|
||||
|
||||
/**
|
||||
* @return the current state of the Processor loading
|
||||
|
|
Loading…
Reference in New Issue