mirror of https://github.com/apache/nifi.git
NIFI-12205: Moved loading of Python dependencies into background thread when processor created instead of during startup. Some code cleanup.
This closes #7863 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
61c856a54c
commit
cbdf32ab79
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.components;
|
||||
|
||||
public interface AsyncLoadedProcessor {
|
||||
default boolean isLoaded() {
|
||||
return getState() == LoadState.FINISHED_LOADING;
|
||||
}
|
||||
|
||||
LoadState getState();
|
||||
|
||||
enum LoadState {
|
||||
DOWNLOADING_DEPENDENCIES,
|
||||
|
||||
LOADING_PROCESSOR_CODE,
|
||||
|
||||
DEPENDENCY_DOWNLOAD_FAILED,
|
||||
|
||||
LOADING_PROCESSOR_CODE_FAILED,
|
||||
|
||||
FINISHED_LOADING
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import org.apache.nifi.attribute.expression.language.VariableImpact;
|
|||
import org.apache.nifi.bundle.Bundle;
|
||||
import org.apache.nifi.bundle.BundleCoordinate;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.AsyncLoadedProcessor;
|
||||
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
|
||||
import org.apache.nifi.components.ConfigVerificationResult;
|
||||
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
|
||||
|
@ -397,16 +398,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
try {
|
||||
final long startNanos = System.nanoTime();
|
||||
|
||||
final Map<PropertyDescriptor, PropertyConfiguration> descriptorToConfigMap = new LinkedHashMap<>();
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : propertyValues.entrySet()) {
|
||||
final PropertyDescriptor descriptor = entry.getKey();
|
||||
final String rawValue = entry.getValue();
|
||||
final String propertyValue = rawValue == null ? descriptor.getDefaultValue() : rawValue;
|
||||
|
||||
final PropertyConfiguration propertyConfiguration = new PropertyConfiguration(propertyValue, null, Collections.emptyList(), VariableImpact.NEVER_IMPACTED);
|
||||
descriptorToConfigMap.put(descriptor, propertyConfiguration);
|
||||
}
|
||||
|
||||
final Map<PropertyDescriptor, PropertyConfiguration> descriptorToConfigMap = createDescriptorToConfigMap(propertyValues);
|
||||
final ValidationContext validationContext = getValidationContextFactory().newValidationContext(descriptorToConfigMap, annotationData,
|
||||
getProcessGroupIdentifier(), getIdentifier(), parameterContext, false);
|
||||
|
||||
|
@ -457,6 +449,19 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
return results;
|
||||
}
|
||||
|
||||
private static Map<PropertyDescriptor, PropertyConfiguration> createDescriptorToConfigMap(final Map<PropertyDescriptor, String> propertyValues) {
|
||||
final Map<PropertyDescriptor, PropertyConfiguration> descriptorToConfigMap = new LinkedHashMap<>();
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : propertyValues.entrySet()) {
|
||||
final PropertyDescriptor descriptor = entry.getKey();
|
||||
final String rawValue = entry.getValue();
|
||||
final String propertyValue = rawValue == null ? descriptor.getDefaultValue() : rawValue;
|
||||
|
||||
final PropertyConfiguration propertyConfiguration = new PropertyConfiguration(propertyValue, null, Collections.emptyList(), VariableImpact.NEVER_IMPACTED);
|
||||
descriptorToConfigMap.put(descriptor, propertyConfiguration);
|
||||
}
|
||||
return descriptorToConfigMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getReferencedParameterNames() {
|
||||
return Collections.unmodifiableSet(parameterReferenceCounts.keySet());
|
||||
|
@ -842,6 +847,25 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
);
|
||||
}
|
||||
|
||||
final ConfigurableComponent component = getComponent();
|
||||
if (component instanceof final AsyncLoadedProcessor asyncLoadedProcessor) {
|
||||
if (!asyncLoadedProcessor.isLoaded()) {
|
||||
final String explanation = switch (asyncLoadedProcessor.getState()) {
|
||||
case DEPENDENCY_DOWNLOAD_FAILED -> "Failed to download one or more Processor dependencies. See logs for additional details.";
|
||||
case DOWNLOADING_DEPENDENCIES -> "In the process of downloading third-party dependencies required by the Processor.";
|
||||
case LOADING_PROCESSOR_CODE -> "In the process of loading Processor code";
|
||||
case LOADING_PROCESSOR_CODE_FAILED -> "Failed to parse or load Processor code. See logs for additional details.";
|
||||
default -> null;
|
||||
};
|
||||
|
||||
return Collections.singletonList(new ValidationResult.Builder()
|
||||
.subject("Processor")
|
||||
.explanation(explanation)
|
||||
.valid(false)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
final List<ValidationResult> invalidParameterResults = validateParameterReferences(validationContext);
|
||||
invalidParameterResults.addAll(validateConfig());
|
||||
|
||||
|
@ -851,9 +875,8 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
return invalidParameterResults;
|
||||
}
|
||||
|
||||
final List<ValidationResult> validationResults = new ArrayList<>();
|
||||
final Collection<ValidationResult> results = getComponent().validate(validationContext);
|
||||
validationResults.addAll(results);
|
||||
final List<ValidationResult> validationResults = new ArrayList<>(results);
|
||||
|
||||
// validate selected controller services implement the API required by the processor
|
||||
final Collection<ValidationResult> referencedServiceValidationResults = validateReferencedControllerServices(validationContext);
|
||||
|
@ -1423,10 +1446,18 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
return context;
|
||||
}
|
||||
|
||||
boolean cacheValidationContext = true;
|
||||
if (getComponent() instanceof final AsyncLoadedProcessor asyncLoadedProcessor) {
|
||||
cacheValidationContext = asyncLoadedProcessor.isLoaded();
|
||||
}
|
||||
|
||||
context = getValidationContextFactory().newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier(), getParameterContext(), true);
|
||||
|
||||
this.validationContext = context;
|
||||
logger.debug("Updating validation context to {}", context);
|
||||
if (cacheValidationContext) {
|
||||
this.validationContext = context;
|
||||
logger.debug("Updating validation context to {}", context);
|
||||
}
|
||||
|
||||
return context;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
|
|
|
@ -35,6 +35,10 @@
|
|||
<artifactId>nifi-python-framework-api</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-framework-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.sf.py4j</groupId>
|
||||
<artifactId>py4j</artifactId>
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.py4j;
|
||||
|
||||
import org.apache.nifi.python.processor.PythonProcessorAdapter;
|
||||
|
||||
public interface ProcessorCreationWorkflow {
|
||||
|
||||
void downloadDependencies();
|
||||
|
||||
PythonProcessorAdapter createProcessor();
|
||||
|
||||
}
|
|
@ -99,12 +99,24 @@ public class StandardPythonBridge implements PythonBridge {
|
|||
final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
|
||||
|
||||
final PythonController controller = pythonProcess.getController();
|
||||
final PythonProcessorAdapter processorAdapter = controller.createProcessor(type, version, workDirPath);
|
||||
|
||||
final ProcessorCreationWorkflow creationWorkflow = new ProcessorCreationWorkflow() {
|
||||
@Override
|
||||
public void downloadDependencies() {
|
||||
controller.downloadDependencies(type, version, workDirPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PythonProcessorAdapter createProcessor() {
|
||||
return controller.createProcessor(type, version, workDirPath);
|
||||
}
|
||||
};
|
||||
|
||||
final PythonProcessorDetails processorDetails = controller.getProcessorDetails(type, version);
|
||||
final PythonProcessorBridge processorBridge = new StandardPythonProcessorBridge.Builder()
|
||||
.controller(controller)
|
||||
.processorAdapter(processorAdapter)
|
||||
.processorType(type)
|
||||
.processorVersion(version)
|
||||
.creationWorkflow(creationWorkflow)
|
||||
.processorDetails(processorDetails)
|
||||
.workingDirectory(processConfig.getPythonWorkingDirectory())
|
||||
.moduleFile(new File(controller.getModuleFile(type, version)))
|
||||
.build();
|
||||
|
|
|
@ -19,37 +19,44 @@ package org.apache.nifi.py4j;
|
|||
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.python.PythonController;
|
||||
import org.apache.nifi.python.PythonProcessorDetails;
|
||||
import org.apache.nifi.python.processor.FlowFileTransform;
|
||||
import org.apache.nifi.python.processor.FlowFileTransformProxy;
|
||||
import org.apache.nifi.python.processor.PythonProcessor;
|
||||
import org.apache.nifi.python.processor.PythonProcessorAdapter;
|
||||
import org.apache.nifi.python.processor.PythonProcessorBridge;
|
||||
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
|
||||
import org.apache.nifi.python.processor.PythonProcessorProxy;
|
||||
import org.apache.nifi.python.processor.RecordTransform;
|
||||
import org.apache.nifi.python.processor.RecordTransformProxy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
|
||||
|
||||
public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
||||
private static final Logger logger = LoggerFactory.getLogger(StandardPythonProcessorBridge.class);
|
||||
|
||||
private final PythonController controller;
|
||||
private final ProcessorCreationWorkflow creationWorkflow;
|
||||
private final PythonProcessorDetails processorDetails;
|
||||
private volatile PythonProcessorAdapter adapter;
|
||||
private volatile Processor proxy;
|
||||
private final String processorType;
|
||||
private final String version;
|
||||
private final PythonProcessorProxy proxy;
|
||||
private final File workingDir;
|
||||
private final File moduleFile;
|
||||
private volatile long lastModified;
|
||||
private volatile LoadState loadState = LoadState.DOWNLOADING_DEPENDENCIES;
|
||||
private volatile PythonProcessorInitializationContext initializationContext;
|
||||
|
||||
|
||||
private StandardPythonProcessorBridge(final Builder builder) {
|
||||
this.controller = builder.controller;
|
||||
this.adapter = builder.adapter;
|
||||
this.processorType = builder.processorType;
|
||||
this.version = builder.version;
|
||||
this.creationWorkflow = builder.creationWorkflow;
|
||||
this.processorDetails = builder.processorDetails;
|
||||
this.workingDir = builder.workDir;
|
||||
this.moduleFile = builder.moduleFile;
|
||||
this.lastModified = this.moduleFile.lastModified();
|
||||
|
@ -58,19 +65,57 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
}
|
||||
|
||||
@Override
|
||||
public PythonProcessorAdapter getProcessorAdapter() {
|
||||
return adapter;
|
||||
public Optional<PythonProcessorAdapter> getProcessorAdapter() {
|
||||
return Optional.ofNullable(adapter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(final PythonProcessorInitializationContext context) {
|
||||
public Future<Void> initialize(final PythonProcessorInitializationContext context) {
|
||||
this.initializationContext = context;
|
||||
this.adapter.initialize(context);
|
||||
|
||||
final String threadName = "Initialize Python Processor %s (%s)".formatted(initializationContext.getIdentifier(), getProcessorType());
|
||||
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
|
||||
Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(future));
|
||||
return future;
|
||||
}
|
||||
|
||||
public LoadState getLoadState() {
|
||||
return loadState;
|
||||
}
|
||||
|
||||
private void initializePythonSide(final CompletableFuture<Void> future) {
|
||||
try {
|
||||
try {
|
||||
creationWorkflow.downloadDependencies();
|
||||
loadState = LoadState.LOADING_PROCESSOR_CODE;
|
||||
} catch (final Exception e) {
|
||||
loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
|
||||
throw e;
|
||||
}
|
||||
|
||||
final PythonProcessorAdapter pythonProcessorAdapter;
|
||||
try {
|
||||
pythonProcessorAdapter = creationWorkflow.createProcessor();
|
||||
pythonProcessorAdapter.initialize(initializationContext);
|
||||
this.adapter = pythonProcessorAdapter;
|
||||
this.proxy.onPythonSideInitialized(pythonProcessorAdapter);
|
||||
|
||||
loadState = LoadState.FINISHED_LOADING;
|
||||
} catch (final Exception e) {
|
||||
loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
|
||||
throw e;
|
||||
}
|
||||
|
||||
future.complete(null);
|
||||
} catch (final Throwable t) {
|
||||
future.completeExceptionally(t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProcessorType() {
|
||||
return processorType;
|
||||
return processorDetails.getProcessorType();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -81,56 +126,49 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
@Override
|
||||
public boolean reload() {
|
||||
if (moduleFile.lastModified() <= lastModified) {
|
||||
logger.debug("Processor {} has not been modified since it was last loaded so will not reload", processorType);
|
||||
logger.debug("Processor {} has not been modified since it was last loaded so will not reload", getProcessorType());
|
||||
return false;
|
||||
}
|
||||
|
||||
controller.reloadProcessor(processorType, version, workingDir.getAbsolutePath());
|
||||
adapter = controller.createProcessor(processorType, version, workingDir.getAbsolutePath());
|
||||
adapter.initialize(initializationContext);
|
||||
proxy = createProxy();
|
||||
controller.reloadProcessor(getProcessorType(), processorDetails.getProcessorVersion(), workingDir.getAbsolutePath());
|
||||
initializePythonSide(new CompletableFuture<>());
|
||||
lastModified = moduleFile.lastModified();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private PythonProcessorProxy createProxy() {
|
||||
final PythonProcessor pythonProcessor = adapter.getProcessor();
|
||||
|
||||
// Instantiate the appropriate Processor Proxy based on the type of Python Processor
|
||||
if (pythonProcessor instanceof FlowFileTransform) {
|
||||
final String implementedInterface = processorDetails.getInterface();
|
||||
if (FlowFileTransform.class.getName().equals(implementedInterface)) {
|
||||
return new FlowFileTransformProxy(this);
|
||||
} else {
|
||||
}
|
||||
if (RecordTransform.class.getName().equals(implementedInterface)) {
|
||||
return new RecordTransformProxy(this);
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Python Processor does not implement any of the valid interfaces. Interface implemented: " + implementedInterface);
|
||||
}
|
||||
|
||||
|
||||
public static class Builder {
|
||||
private PythonController controller;
|
||||
private PythonProcessorAdapter adapter;
|
||||
private String processorType;
|
||||
private String version;
|
||||
private ProcessorCreationWorkflow creationWorkflow;
|
||||
private File workDir;
|
||||
private File moduleFile;
|
||||
private PythonProcessorDetails processorDetails;
|
||||
|
||||
public Builder controller(final PythonController controller) {
|
||||
this.controller = controller;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder processorAdapter(final PythonProcessorAdapter adapter) {
|
||||
this.adapter = adapter;
|
||||
public Builder creationWorkflow(final ProcessorCreationWorkflow creationWorkflow) {
|
||||
this.creationWorkflow = creationWorkflow;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder processorType(final String type) {
|
||||
this.processorType = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder processorVersion(final String version) {
|
||||
this.version = version;
|
||||
public Builder processorDetails(final PythonProcessorDetails details) {
|
||||
this.processorDetails = details;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -148,11 +186,11 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
|
|||
if (controller == null) {
|
||||
throw new IllegalStateException("Must specify the PythonController");
|
||||
}
|
||||
if (adapter == null) {
|
||||
throw new IllegalStateException("Must specify the Processor Adapter");
|
||||
if (creationWorkflow == null) {
|
||||
throw new IllegalStateException("Must specify the Processor Creation Workflow");
|
||||
}
|
||||
if (processorType == null) {
|
||||
throw new IllegalStateException("Must specify the Processor Type");
|
||||
if (processorDetails == null) {
|
||||
throw new IllegalStateException("Must specify the Processor Details");
|
||||
}
|
||||
if (workDir == null) {
|
||||
throw new IllegalStateException("Must specify the Working Directory");
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.nifi.python.processor;
|
||||
|
||||
import java.util.Map;
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
@ -28,31 +27,30 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import py4j.Py4JNetworkException;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
public class FlowFileTransformProxy extends PythonProcessorProxy {
|
||||
|
||||
private final PythonProcessorBridge bridge;
|
||||
private volatile FlowFileTransform transform;
|
||||
|
||||
|
||||
public FlowFileTransformProxy(final PythonProcessorBridge bridge) {
|
||||
super(bridge);
|
||||
this.bridge = bridge;
|
||||
this.transform = (FlowFileTransform) bridge.getProcessorAdapter().getProcessor();
|
||||
}
|
||||
|
||||
|
||||
protected void reloadProcessor() {
|
||||
final boolean reloaded = bridge.reload();
|
||||
if (reloaded) {
|
||||
transform = (FlowFileTransform) bridge.getProcessorAdapter().getProcessor();
|
||||
getLogger().info("Successfully reloaded Processor");
|
||||
}
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void setContext(final ProcessContext context) {
|
||||
transform.setContext(context);
|
||||
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
|
||||
if (optionalAdapter.isEmpty()) {
|
||||
throw new IllegalStateException(this + " is not finished initializing");
|
||||
}
|
||||
|
||||
this.transform = (FlowFileTransform) optionalAdapter.get().getProcessor();
|
||||
this.transform.setContext(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,16 +17,9 @@
|
|||
|
||||
package org.apache.nifi.python.processor;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.AsyncLoadedProcessor;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
|
@ -34,13 +27,21 @@ import org.apache.nifi.processor.AbstractProcessor;
|
|||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
|
||||
public abstract class PythonProcessorProxy extends AbstractProcessor {
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public abstract class PythonProcessorProxy extends AbstractProcessor implements AsyncLoadedProcessor {
|
||||
private final PythonProcessorBridge bridge;
|
||||
private volatile Set<Relationship> cachedRelationships = null;
|
||||
private volatile List<PropertyDescriptor> cachedPropertyDescriptors = null;
|
||||
private volatile Map<String, PropertyDescriptor> cachedDynamicDescriptors = null;
|
||||
private boolean supportsDynamicProperties;
|
||||
private volatile boolean running = false;
|
||||
private volatile boolean supportsDynamicProperties;
|
||||
|
||||
protected static final Relationship REL_ORIGINAL = new Relationship.Builder()
|
||||
.name("original")
|
||||
|
@ -51,22 +52,37 @@ public abstract class PythonProcessorProxy extends AbstractProcessor {
|
|||
.name("failure")
|
||||
.description("The original FlowFile will be routed to this relationship if it unable to be transformed for some reason")
|
||||
.build();
|
||||
private static final Set<Relationship> implicitRelationships = Collections.unmodifiableSet(
|
||||
new HashSet<>(Arrays.asList(REL_ORIGINAL, REL_FAILURE)));
|
||||
private static final Set<Relationship> implicitRelationships = Set.of(
|
||||
REL_ORIGINAL,
|
||||
REL_FAILURE);
|
||||
|
||||
public PythonProcessorProxy(final PythonProcessorBridge bridge) {
|
||||
this.bridge = bridge;
|
||||
supportsDynamicProperties = bridge.getProcessorAdapter().isDynamicPropertySupported();
|
||||
}
|
||||
|
||||
public void onPythonSideInitialized(final PythonProcessorAdapter adapter) {
|
||||
supportsDynamicProperties = adapter.isDynamicPropertySupported();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LoadState getState() {
|
||||
return bridge.getLoadState();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
if (running && cachedPropertyDescriptors != null) {
|
||||
if (cachedPropertyDescriptors != null) {
|
||||
return this.cachedPropertyDescriptors;
|
||||
}
|
||||
|
||||
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
|
||||
if (optionalAdapter.isEmpty()) {
|
||||
// If we don't have the adapter yet, use whatever is cached, even if it's old, or an empty List if we have nothing cached.
|
||||
return this.cachedPropertyDescriptors == null ? Collections.emptyList() : cachedPropertyDescriptors;
|
||||
}
|
||||
|
||||
try {
|
||||
final List<PropertyDescriptor> properties = bridge.getProcessorAdapter().getSupportedPropertyDescriptors();
|
||||
final List<PropertyDescriptor> properties = optionalAdapter.get().getSupportedPropertyDescriptors();
|
||||
this.cachedPropertyDescriptors = properties; // cache descriptors in case the processor is updated and the properties can no longer be properly accessed
|
||||
return properties;
|
||||
} catch (final Exception e) {
|
||||
|
@ -76,19 +92,27 @@ public abstract class PythonProcessorProxy extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
|
||||
if (optionalAdapter.isEmpty()) {
|
||||
return List.of(new ValidationResult.Builder()
|
||||
.subject("Processor")
|
||||
.explanation("Processor has not yet completed initialization")
|
||||
.valid(false)
|
||||
.build());
|
||||
}
|
||||
|
||||
try {
|
||||
reload();
|
||||
return bridge.getProcessorAdapter().customValidate(validationContext);
|
||||
return optionalAdapter.get().customValidate(validationContext);
|
||||
} catch (final Exception e) {
|
||||
getLogger().warn("Failed to perform validation for Python Processor {}; assuming invalid", this, e);
|
||||
|
||||
return Collections.singleton(new ValidationResult.Builder()
|
||||
.subject("Perform Validation")
|
||||
.valid(false)
|
||||
.explanation("Unable to communicate with Python Processor")
|
||||
.explanation("Failed to trigger Python Processor to perform validation: " + e)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
@ -106,7 +130,9 @@ public abstract class PythonProcessorProxy extends AbstractProcessor {
|
|||
}
|
||||
|
||||
try {
|
||||
return bridge.getProcessorAdapter().getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
|
||||
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
|
||||
return optionalAdapter.map(adapter -> adapter.getSupportedDynamicPropertyDescriptor(propertyDescriptorName))
|
||||
.orElse(null);
|
||||
} catch (final Exception e) {
|
||||
getLogger().warn("Failed to obtain Dynamic Property Descriptor with name {} from Python Processor {}; assuming property is not valid", propertyDescriptorName, this, e);
|
||||
return null;
|
||||
|
@ -138,12 +164,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor {
|
|||
}
|
||||
|
||||
this.cachedDynamicDescriptors = dynamicDescriptors;
|
||||
this.running = true;
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void destroyCachedElements() {
|
||||
this.running = false;
|
||||
this.cachedRelationships = null;
|
||||
this.cachedDynamicDescriptors = null;
|
||||
}
|
||||
|
@ -161,11 +185,14 @@ public abstract class PythonProcessorProxy extends AbstractProcessor {
|
|||
private Set<Relationship> fetchRelationshipsFromPythonProcessor() {
|
||||
Set<Relationship> processorRelationships;
|
||||
try {
|
||||
processorRelationships = bridge.getProcessorAdapter().getRelationships();
|
||||
processorRelationships = bridge.getProcessorAdapter()
|
||||
.map(PythonProcessorAdapter::getRelationships)
|
||||
.orElseGet(HashSet::new);
|
||||
} catch (final Exception e) {
|
||||
getLogger().warn("Failed to obtain list of Relationships from Python Processor {}; assuming no explicit relationships", this, e);
|
||||
processorRelationships = new HashSet<>();
|
||||
}
|
||||
|
||||
processorRelationships.addAll(getImplicitRelationships());
|
||||
return processorRelationships;
|
||||
}
|
||||
|
@ -174,12 +201,16 @@ public abstract class PythonProcessorProxy extends AbstractProcessor {
|
|||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
reload();
|
||||
bridge.getProcessorAdapter().onScheduled(context);
|
||||
bridge.getProcessorAdapter()
|
||||
.orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"))
|
||||
.onScheduled(context);
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void onStopped(final ProcessContext context) {
|
||||
bridge.getProcessorAdapter().onStopped(context);
|
||||
bridge.getProcessorAdapter()
|
||||
.orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"))
|
||||
.onStopped(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -188,11 +219,15 @@ public abstract class PythonProcessorProxy extends AbstractProcessor {
|
|||
}
|
||||
|
||||
private void reload() {
|
||||
reloadProcessor();
|
||||
supportsDynamicProperties = bridge.getProcessorAdapter().isDynamicPropertySupported();
|
||||
}
|
||||
final boolean reloaded = bridge.reload();
|
||||
if (reloaded) {
|
||||
getLogger().info("Successfully reloaded Processor");
|
||||
}
|
||||
|
||||
protected abstract void reloadProcessor();
|
||||
supportsDynamicProperties = bridge.getProcessorAdapter()
|
||||
.orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"))
|
||||
.isDynamicPropertySupported();
|
||||
}
|
||||
|
||||
protected Set<Relationship> getImplicitRelationships() {
|
||||
return implicitRelationships;
|
||||
|
|
|
@ -17,17 +17,6 @@
|
|||
|
||||
package org.apache.nifi.python.processor;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import org.apache.nifi.NullSuppression;
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
|
@ -55,12 +44,24 @@ import org.apache.nifi.serialization.WriteResult;
|
|||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
public class RecordTransformProxy extends PythonProcessorProxy {
|
||||
private final PythonProcessorBridge bridge;
|
||||
private volatile RecordTransform transform;
|
||||
|
||||
|
||||
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
|
||||
.name("Record Reader")
|
||||
.displayName("Record Reader")
|
||||
|
@ -80,7 +81,6 @@ public class RecordTransformProxy extends PythonProcessorProxy {
|
|||
public RecordTransformProxy(final PythonProcessorBridge bridge) {
|
||||
super(bridge);
|
||||
this.bridge = bridge;
|
||||
this.transform = (RecordTransform) bridge.getProcessorAdapter().getProcessor();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -92,18 +92,15 @@ public class RecordTransformProxy extends PythonProcessorProxy {
|
|||
return properties;
|
||||
}
|
||||
|
||||
|
||||
public void reloadProcessor() {
|
||||
final boolean reloaded = bridge.reload();
|
||||
if (reloaded) {
|
||||
transform = (RecordTransform) bridge.getProcessorAdapter().getProcessor();
|
||||
getLogger().info("Successfully reloaded Processor");
|
||||
}
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void setProcessContext(final ProcessContext context) {
|
||||
transform.setContext(context);
|
||||
final Optional<PythonProcessorAdapter> adapterOptional = bridge.getProcessorAdapter();
|
||||
if (adapterOptional.isEmpty()) {
|
||||
throw new IllegalStateException(this + " is not finished initializing");
|
||||
}
|
||||
|
||||
this.transform = (RecordTransform) adapterOptional.get().getProcessor();
|
||||
this.transform.setContext(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -182,10 +179,10 @@ public class RecordTransformProxy extends PythonProcessorProxy {
|
|||
session.transfer(flowFile, REL_FAILURE);
|
||||
|
||||
destinationTuples.values().forEach(tuple -> {
|
||||
session.remove(tuple.getFlowFile());
|
||||
session.remove(tuple.flowFile());
|
||||
|
||||
try {
|
||||
tuple.getWriter().close();
|
||||
tuple.writer().close();
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().warn("Failed to close Record Writer for FlowFile created in this session", ioe);
|
||||
}
|
||||
|
@ -217,26 +214,25 @@ public class RecordTransformProxy extends PythonProcessorProxy {
|
|||
final Map<Relationship, List<FlowFile>> flowFilesPerRelationship = new HashMap<>();
|
||||
for (final Map.Entry<RecordGroupingKey, DestinationTuple> entry : destinationTuples.entrySet()) {
|
||||
final DestinationTuple destinationTuple = entry.getValue();
|
||||
final RecordSetWriter writer = destinationTuple.getWriter();
|
||||
final RecordSetWriter writer = destinationTuple.writer();
|
||||
|
||||
final WriteResult writeResult = writer.finishRecordSet();
|
||||
writer.close();
|
||||
|
||||
// Create attribute map
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.putAll(writeResult.getAttributes());
|
||||
final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
|
||||
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
|
||||
attributes.put("mime.type", writer.getMimeType());
|
||||
|
||||
final RecordGroupingKey groupingKey = entry.getKey();
|
||||
final Map<String, Object> partition = groupingKey.getPartition();
|
||||
final Map<String, Object> partition = groupingKey.partition();
|
||||
if (partition != null) {
|
||||
partition.forEach((key, value) -> attributes.put(key, Objects.toString(value)));
|
||||
}
|
||||
|
||||
// Update the FlowFile and add to the appropriate Relationship and grouping
|
||||
final FlowFile outputFlowFile = session.putAllAttributes(destinationTuple.getFlowFile(), attributes);
|
||||
final Relationship destinationRelationship = new Relationship.Builder().name(groupingKey.getRelationship()).build();
|
||||
final FlowFile outputFlowFile = session.putAllAttributes(destinationTuple.flowFile(), attributes);
|
||||
final Relationship destinationRelationship = new Relationship.Builder().name(groupingKey.relationship()).build();
|
||||
final List<FlowFile> flowFiles = flowFilesPerRelationship.computeIfAbsent(destinationRelationship, key -> new ArrayList<>());
|
||||
flowFiles.add(outputFlowFile);
|
||||
}
|
||||
|
@ -289,7 +285,7 @@ public class RecordTransformProxy extends PythonProcessorProxy {
|
|||
}
|
||||
|
||||
// Transform the result into a Record and write it out
|
||||
destinationTuple.getWriter().write(transformed);
|
||||
destinationTuple.writer().write(transformed);
|
||||
}
|
||||
|
||||
|
||||
|
@ -326,59 +322,13 @@ public class RecordTransformProxy extends PythonProcessorProxy {
|
|||
* A tuple representing the name of a Relationship to which a Record should be transferred and an optional Partition that may distinguish
|
||||
* a Record from other Records going to the same Relationship
|
||||
*/
|
||||
private static class RecordGroupingKey {
|
||||
private final String relationship;
|
||||
private final Map<String, Object> partition;
|
||||
private record RecordGroupingKey(String relationship, Map<String, Object> partition) {
|
||||
|
||||
public RecordGroupingKey(final String relationship, final Map<String, Object> partition) {
|
||||
this.relationship = relationship;
|
||||
this.partition = partition;
|
||||
}
|
||||
|
||||
public String getRelationship() {
|
||||
return relationship;
|
||||
}
|
||||
|
||||
public Map<String, Object> getPartition() {
|
||||
return partition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
final RecordGroupingKey that = (RecordGroupingKey) o;
|
||||
return Objects.equals(relationship, that.relationship) && Objects.equals(partition, that.partition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(relationship, partition);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A tuple of a FlowFile and the RecordSetWriter to use for writing to that FlowFile
|
||||
*/
|
||||
private static class DestinationTuple {
|
||||
private final FlowFile flowFile;
|
||||
private final RecordSetWriter writer;
|
||||
|
||||
public DestinationTuple(final FlowFile flowFile, final RecordSetWriter writer) {
|
||||
this.flowFile = flowFile;
|
||||
this.writer = writer;
|
||||
}
|
||||
|
||||
public FlowFile getFlowFile() {
|
||||
return flowFile;
|
||||
}
|
||||
|
||||
public RecordSetWriter getWriter() {
|
||||
return writer;
|
||||
}
|
||||
private record DestinationTuple(FlowFile flowFile, RecordSetWriter writer) {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,6 +74,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
@ -476,7 +478,7 @@ public class PythonControllerInteractionIT {
|
|||
runnerV1.getFlowFilesForRelationship("success").get(0).assertContentEquals("Hello, World");
|
||||
|
||||
// Create an instance of WriteMessage V2
|
||||
final PythonProcessorBridge procV2 = bridge.createProcessor(createId(), "WriteMessage", "0.0.2-SNAPSHOT", true);
|
||||
final PythonProcessorBridge procV2 = createProcessor("WriteMessage", "0.0.2-SNAPSHOT");
|
||||
final FlowFileTransformProxy wrapperV2 = new FlowFileTransformProxy(procV2);
|
||||
final TestRunner runnerV2 = TestRunners.newTestRunner(wrapperV2);
|
||||
runnerV2.enqueue("");
|
||||
|
@ -515,7 +517,7 @@ public class PythonControllerInteractionIT {
|
|||
// Create a Record to transform and transform it
|
||||
final String json = "[{ \"name\": \"John Doe\" }]";
|
||||
final RecordSchema schema = createSimpleRecordSchema("name");
|
||||
final RecordTransform recordTransform = (RecordTransform) processor.getProcessorAdapter().getProcessor();
|
||||
final RecordTransform recordTransform = (RecordTransform) processor.getProcessorAdapter().get().getProcessor();
|
||||
recordTransform.setContext(context);
|
||||
final RecordTransformResult result = recordTransform.transformRecord(json, schema, new EmptyAttributeMap()).get(0);
|
||||
|
||||
|
@ -562,7 +564,7 @@ public class PythonControllerInteractionIT {
|
|||
// Create a Record to transform and transform it
|
||||
final String json = "[{\"name\": \"Jake Doe\", \"father\": { \"name\": \"John Doe\" }}]";
|
||||
final RecordSchema recordSchema = createTwoLevelRecord().getSchema();
|
||||
final RecordTransform recordTransform = (RecordTransform) processor.getProcessorAdapter().getProcessor();
|
||||
final RecordTransform recordTransform = (RecordTransform) processor.getProcessorAdapter().get().getProcessor();
|
||||
recordTransform.setContext(context);
|
||||
final RecordTransformResult result = recordTransform.transformRecord(json, recordSchema, new EmptyAttributeMap()).get(0);
|
||||
|
||||
|
@ -574,10 +576,12 @@ public class PythonControllerInteractionIT {
|
|||
|
||||
|
||||
@Test
|
||||
public void testLogger() {
|
||||
public void testLogger() throws ExecutionException, InterruptedException {
|
||||
bridge.discoverExtensions();
|
||||
|
||||
final String procId = createId();
|
||||
// Create the Processor, but we do not use this.createProcessor() because we want to explicitly have access to the
|
||||
// initialization context to inject in the logger that we want.
|
||||
final PythonProcessorBridge logContentsBridge = bridge.createProcessor(procId, "LogContents", VERSION, true);
|
||||
|
||||
final ComponentLog logger = Mockito.mock(ComponentLog.class);
|
||||
|
@ -593,7 +597,7 @@ public class PythonControllerInteractionIT {
|
|||
}
|
||||
};
|
||||
|
||||
logContentsBridge.initialize(initContext);
|
||||
logContentsBridge.initialize(initContext).get();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(logContentsBridge.getProcessorProxy());
|
||||
runner.enqueue("Hello World");
|
||||
|
@ -668,9 +672,22 @@ public class PythonControllerInteractionIT {
|
|||
}
|
||||
|
||||
private PythonProcessorBridge createProcessor(final String type) {
|
||||
return createProcessor(type, VERSION);
|
||||
}
|
||||
|
||||
private PythonProcessorBridge createProcessor(final String type, final String version) {
|
||||
bridge.discoverExtensions();
|
||||
final PythonProcessorBridge processor = bridge.createProcessor(createId(), type, VERSION, true);
|
||||
processor.initialize(createInitContext());
|
||||
final PythonProcessorBridge processor = bridge.createProcessor(createId(), type, version, true);
|
||||
final Future<Void> future = processor.initialize(createInitContext());
|
||||
|
||||
try {
|
||||
future.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException(e.getCause());
|
||||
}
|
||||
|
||||
return processor;
|
||||
}
|
||||
|
||||
|
|
|
@ -259,34 +259,34 @@ class PropertyDescriptor:
|
|||
|
||||
return values
|
||||
|
||||
def __add_resource_definition(self, gateway, resouce_definition, builder):
|
||||
def __add_resource_definition(self, gateway, resource_definition, builder):
|
||||
allowed_types = 0
|
||||
if resouce_definition.allow_file:
|
||||
if resource_definition.allow_file:
|
||||
allowed_types += 1
|
||||
if resouce_definition.allow_directory:
|
||||
if resource_definition.allow_directory:
|
||||
allowed_types += 1
|
||||
if resouce_definition.allow_url:
|
||||
if resource_definition.allow_url:
|
||||
allowed_types += 1
|
||||
if resouce_definition.allow_text:
|
||||
if resource_definition.allow_text:
|
||||
allowed_types += 1
|
||||
|
||||
array_type = gateway.jvm.org.apache.nifi.components.resource.ResourceType
|
||||
types = gateway.new_array(array_type, allowed_types)
|
||||
index = 0
|
||||
if resouce_definition.allow_file:
|
||||
if resource_definition.allow_file:
|
||||
types[index] = gateway.jvm.org.apache.nifi.components.resource.ResourceType.FILE
|
||||
index += 1
|
||||
if resouce_definition.allow_directory:
|
||||
if resource_definition.allow_directory:
|
||||
types[index] = gateway.jvm.org.apache.nifi.components.resource.ResourceType.DIRECTORY
|
||||
index += 1
|
||||
if resouce_definition.allow_url:
|
||||
if resource_definition.allow_url:
|
||||
types[index] = gateway.jvm.org.apache.nifi.components.resource.ResourceType.URL
|
||||
index += 1
|
||||
if resouce_definition.allow_text:
|
||||
if resource_definition.allow_text:
|
||||
types[index] = gateway.jvm.org.apache.nifi.components.resource.ResourceType.TEXT
|
||||
index += 1
|
||||
|
||||
cardinality = gateway.jvm.org.apache.nifi.components.resource.ResourceCardinality.MULTIPLE if resouce_definition.allow_multiple else \
|
||||
cardinality = gateway.jvm.org.apache.nifi.components.resource.ResourceCardinality.MULTIPLE if resource_definition.allow_multiple else \
|
||||
gateway.jvm.org.apache.nifi.components.resource.ResourceCardinality.SINGLE
|
||||
|
||||
builder.identifiesExternalResource(cardinality, types[0], types[1:])
|
||||
|
|
|
@ -30,6 +30,10 @@
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-framework-api</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -45,6 +45,16 @@ public interface PythonController {
|
|||
*/
|
||||
void discoverExtensions(List<String> directories, String workDirectory);
|
||||
|
||||
|
||||
/**
|
||||
* Downloads any third-party dependencies that are necessary in order to use the processor of the given type and version
|
||||
*
|
||||
* @param type the type of the processor
|
||||
* @param version the version of the processor
|
||||
* @param workDirectory the work directory where the processor's virtual environment should live
|
||||
*/
|
||||
void downloadDependencies(String type, String version, String workDirectory);
|
||||
|
||||
/**
|
||||
* Creates a Processor of the given type and version, returning a PythonProcessorAdapter that can be used for interacting with it.
|
||||
*
|
||||
|
@ -79,4 +89,12 @@ public interface PythonController {
|
|||
*/
|
||||
String getModuleFile(String processorType, String version);
|
||||
|
||||
/**
|
||||
* Returns details about the Processor with the given type and version
|
||||
*
|
||||
* @param type the type of the Processor
|
||||
* @param version the version of the Processor
|
||||
* @return the details that have been discovered
|
||||
*/
|
||||
PythonProcessorDetails getProcessorDetails(String type, String version);
|
||||
}
|
||||
|
|
|
@ -54,4 +54,9 @@ public interface PythonProcessorDetails {
|
|||
* @return the dependencies that must be imported in order to use the Processor
|
||||
*/
|
||||
List<String> getDependencies();
|
||||
|
||||
/**
|
||||
* @return the name of the Java interface that is implemented by the Python Processor
|
||||
*/
|
||||
String getInterface();
|
||||
}
|
||||
|
|
|
@ -17,20 +17,26 @@
|
|||
|
||||
package org.apache.nifi.python.processor;
|
||||
|
||||
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* A model object that is used to bridge the gap between what is necessary for the Framework to interact
|
||||
* with a Python Processor and what is made available by lower level APIs
|
||||
*/
|
||||
public interface PythonProcessorBridge {
|
||||
/**
|
||||
* @return a proxy for the PythonProcessorAdapter that is responsible for calling the associated method on the Python side
|
||||
* @return a proxy for the PythonProcessorAdapter that is responsible for calling the associated method on the Python side,
|
||||
* or an empty Optional if the adapter has not yet been initialized
|
||||
*/
|
||||
PythonProcessorAdapter getProcessorAdapter();
|
||||
Optional<PythonProcessorAdapter> getProcessorAdapter();
|
||||
|
||||
/**
|
||||
* @return a proxy for the actual Processor implementation that will trigger the appropriate method on the Python side
|
||||
* @return a proxy for the actual Processor implementation that will trigger the appropriate method on the Python side, or an empty Optional
|
||||
* if the processor/adapter have not yet been initialized
|
||||
*/
|
||||
Processor getProcessorProxy();
|
||||
|
||||
|
@ -51,6 +57,10 @@ public interface PythonProcessorBridge {
|
|||
* Initializes the Processor
|
||||
* @param context the initialization context
|
||||
*/
|
||||
void initialize(PythonProcessorInitializationContext context);
|
||||
Future<Void> initialize(PythonProcessorInitializationContext context);
|
||||
|
||||
/**
|
||||
* @return the current state of the Processor loading
|
||||
*/
|
||||
LoadState getLoadState();
|
||||
}
|
||||
|
|
|
@ -48,6 +48,20 @@ class Controller:
|
|||
def discoverExtensions(self, dirs, work_dir):
|
||||
self.extensionManager.discoverExtensions(dirs, work_dir)
|
||||
|
||||
def getProcessorDetails(self, type, version):
|
||||
processor_details = self.extensionManager.get_processor_details(type, version)
|
||||
if processor_details is None:
|
||||
raise ValueError(f"Invalid Processor Type/Version: {type}/{version}")
|
||||
|
||||
return processor_details
|
||||
|
||||
def downloadDependencies(self, type, version, work_dir):
|
||||
processor_details = self.extensionManager.get_processor_details(type, version)
|
||||
if processor_details is None:
|
||||
raise ValueError(f"Invalid Processor Type/Version: {type}/{version}")
|
||||
|
||||
self.extensionManager.import_external_dependencies(processor_details, work_dir)
|
||||
|
||||
def createProcessor(self, processorType, version, work_dir):
|
||||
processorClass = self.extensionManager.getProcessorClass(processorType, version, work_dir)
|
||||
processor = processorClass(jvm=self.gateway.jvm)
|
||||
|
@ -61,14 +75,6 @@ class Controller:
|
|||
module_file = self.extensionManager.get_module_file(processorType, version)
|
||||
return module_file
|
||||
|
||||
def getProcessorDependencies(self, processorType, version):
|
||||
deps = self.extensionManager.__get_dependencies_for_extension_type(processorType, version)
|
||||
dependencyList = self.gateway.jvm.java.util.ArrayList()
|
||||
for dep in deps:
|
||||
dependencyList.add(dep)
|
||||
|
||||
return dependencyList
|
||||
|
||||
def setGateway(self, gateway):
|
||||
self.gateway = gateway
|
||||
self.extensionManager = ExtensionManager.ExtensionManager(gateway)
|
||||
|
|
|
@ -26,6 +26,7 @@ from pathlib import Path
|
|||
|
||||
logger = logging.getLogger("org.apache.nifi.py4j.ExtensionManager")
|
||||
|
||||
|
||||
# A simple wrapper class to encompass a processor type and its version
|
||||
class ExtensionId:
|
||||
def __init__(self, classname=None, version=None):
|
||||
|
@ -43,14 +44,19 @@ class ExtensionDetails:
|
|||
class Java:
|
||||
implements = ['org.apache.nifi.python.PythonProcessorDetails']
|
||||
|
||||
def __init__(self, gateway, type, version='Unknown', dependencies=None, source_location=None, package_name=None, description=None, tags=None):
|
||||
def __init__(self, gateway, type, interfaces, version='Unknown', dependencies=None, source_location=None, package_name=None, description=None, tags=None):
|
||||
self.gateway = gateway
|
||||
self.type = type
|
||||
|
||||
if interfaces is None:
|
||||
interfaces = []
|
||||
self.interfaces = interfaces
|
||||
|
||||
if dependencies is None:
|
||||
dependencies = []
|
||||
if tags is None:
|
||||
tags = []
|
||||
|
||||
self.type = type
|
||||
self.version = version
|
||||
self.dependencies = dependencies
|
||||
self.source_location = source_location
|
||||
|
@ -87,6 +93,10 @@ class ExtensionDetails:
|
|||
|
||||
return list
|
||||
|
||||
def getInterface(self):
|
||||
if len(self.interfaces) == 0:
|
||||
return None
|
||||
return self.interfaces[0]
|
||||
|
||||
|
||||
|
||||
|
@ -111,6 +121,11 @@ class ExtensionManager:
|
|||
self.gateway = gateway
|
||||
|
||||
|
||||
def get_processor_details(self, classname, version):
|
||||
extension_id = ExtensionId(classname=classname, version=version)
|
||||
return self.processor_details.get(extension_id)
|
||||
|
||||
|
||||
def getProcessorTypes(self):
|
||||
"""
|
||||
:return: a list of Processor types that have been discovered by the #discoverExtensions method
|
||||
|
@ -230,7 +245,6 @@ class ExtensionManager:
|
|||
logger.error("Failed to load Python extensions from module file {0}. This module will be ignored.".format(module_file), exc_info=True)
|
||||
|
||||
|
||||
|
||||
def __gather_extension_details(self, module_file, work_dir, local_dependencies=None):
|
||||
path = Path(module_file)
|
||||
basename = os.path.basename(module_file)
|
||||
|
@ -276,9 +290,6 @@ class ExtensionManager:
|
|||
|
||||
self.module_files_by_extension_type[id] = module_file
|
||||
|
||||
# If there are any dependencies, use pip to install them
|
||||
self.__import_external_dependencies(classname, details.dependencies, work_dir, details.version)
|
||||
|
||||
|
||||
def __get_download_complete_marker_file(self, work_dir, extension_type, version):
|
||||
return os.path.join(work_dir, 'extensions', extension_type, version, 'dependency-download.complete')
|
||||
|
@ -299,9 +310,9 @@ class ExtensionManager:
|
|||
class_nodes = self.__get_class_nodes(root_node)
|
||||
|
||||
for class_node in class_nodes:
|
||||
logger.debug("Checking if class %s is a processor" % class_node.name)
|
||||
logger.debug(f"Checking if class {class_node.name} is a processor")
|
||||
if self.__is_processor_class_node(class_node):
|
||||
logger.info("Discovered Processor class {0} in module {1}".format(class_node.name, module_file))
|
||||
logger.info(f"Discovered Processor class {class_node.name} in module {module_file}")
|
||||
details = self.__get_processor_details(class_node, module_file)
|
||||
details_by_class[class_node.name] = details
|
||||
|
||||
|
@ -323,34 +334,50 @@ class ExtensionManager:
|
|||
"""
|
||||
|
||||
# Look for a 'Java' sub-class
|
||||
interfaces = self.__get_java_interfaces(class_node)
|
||||
return len(interfaces) > 0
|
||||
|
||||
|
||||
def __get_java_interfaces(self, class_node):
|
||||
# Get all class definition nodes
|
||||
child_class_nodes = self.__get_class_nodes(class_node)
|
||||
|
||||
interfaces = []
|
||||
for child_class_node in child_class_nodes:
|
||||
if child_class_node.name == 'Java':
|
||||
# Look for an assignment that assigns values to the `implements` keyword
|
||||
assignment_nodes = self.__get_assignment_nodes(child_class_node)
|
||||
for assignment_node in assignment_nodes:
|
||||
if (len(assignment_node.targets) == 1 and assignment_node.targets[0].id == 'implements'):
|
||||
assigned_values = assignment_node.value.elts
|
||||
for assigned_value in assigned_values:
|
||||
if assigned_value.value in self.processorInterfaces:
|
||||
return True
|
||||
return False
|
||||
# Look for a 'Java' sub-class
|
||||
if child_class_node.name != 'Java':
|
||||
continue
|
||||
|
||||
# Look for an assignment that assigns values to the `implements` keyword
|
||||
assignment_nodes = self.__get_assignment_nodes(child_class_node)
|
||||
for assignment_node in assignment_nodes:
|
||||
targets = assignment_node.targets
|
||||
if (len(targets) == 1 and targets[0].id == 'implements'):
|
||||
assigned_values = assignment_node.value.elts
|
||||
for assigned_value in assigned_values:
|
||||
if assigned_value.value in self.processorInterfaces:
|
||||
interfaces.append(assigned_value.value)
|
||||
|
||||
return interfaces
|
||||
|
||||
|
||||
def __get_processor_details(self, class_node, module_file):
|
||||
# Look for a 'ProcessorDetails' class
|
||||
child_class_nodes = self.__get_class_nodes(class_node)
|
||||
|
||||
# Get the Java interfaces that it implements
|
||||
interfaces = self.__get_java_interfaces(class_node)
|
||||
|
||||
for child_class_node in child_class_nodes:
|
||||
if child_class_node.name == 'ProcessorDetails':
|
||||
logger.debug('Found ProcessorDetails class in %s' % class_node.name)
|
||||
logger.debug(f"Found ProcessorDetails class in {class_node.name}")
|
||||
version = self.__get_processor_version(child_class_node, class_node.name)
|
||||
dependencies = self.__get_processor_dependencies(child_class_node, class_node.name)
|
||||
description = self.__get_processor_description(child_class_node, class_node.name)
|
||||
tags = self.__get_processor_tags(child_class_node, class_node.name)
|
||||
|
||||
return ExtensionDetails(gateway=self.gateway,
|
||||
interfaces=interfaces,
|
||||
type=class_node.name,
|
||||
version=version,
|
||||
dependencies=dependencies,
|
||||
|
@ -359,6 +386,7 @@ class ExtensionManager:
|
|||
tags=tags)
|
||||
|
||||
return ExtensionDetails(gateway=self.gateway,
|
||||
interfaces=interfaces,
|
||||
type=class_node.name,
|
||||
version='Unknown',
|
||||
dependencies=[],
|
||||
|
@ -368,7 +396,8 @@ class ExtensionManager:
|
|||
def __get_processor_version(self, details_node, class_name):
|
||||
assignment_nodes = self.__get_assignment_nodes(details_node)
|
||||
for assignment_node in assignment_nodes:
|
||||
if (len(assignment_node.targets) == 1 and assignment_node.targets[0].id == 'version'):
|
||||
targets = assignment_node.targets
|
||||
if (len(targets) == 1 and targets[0].id == 'version'):
|
||||
assigned_values = assignment_node.value.value
|
||||
logger.info("Found version of {0} to be {1}".format(class_name, assigned_values))
|
||||
return assigned_values
|
||||
|
@ -395,7 +424,8 @@ class ExtensionManager:
|
|||
def __get_assigned_list(self, details_node, class_name, element_name):
|
||||
assignment_nodes = self.__get_assignment_nodes(details_node)
|
||||
for assignment_node in assignment_nodes:
|
||||
if (len(assignment_node.targets) == 1 and assignment_node.targets[0].id == element_name):
|
||||
targets = assignment_node.targets
|
||||
if (len(targets) == 1 and targets[0].id == element_name):
|
||||
assigned_values = assignment_node.value.elts
|
||||
declared_dependencies = []
|
||||
for assigned_value in assigned_values:
|
||||
|
@ -410,7 +440,8 @@ class ExtensionManager:
|
|||
def __get_processor_description(self, details_node, class_name):
|
||||
assignment_nodes = self.__get_assignment_nodes(details_node)
|
||||
for assignment_node in assignment_nodes:
|
||||
if (len(assignment_node.targets) == 1 and assignment_node.targets[0].id == 'description'):
|
||||
targets = assignment_node.targets
|
||||
if (len(targets) == 1 and targets[0].id == 'description'):
|
||||
return assignment_node.value.value
|
||||
|
||||
# No description found
|
||||
|
@ -429,7 +460,10 @@ class ExtensionManager:
|
|||
return assignment_nodes
|
||||
|
||||
|
||||
def __import_external_dependencies(self, class_name, dependencies, work_dir, extension_version):
|
||||
def import_external_dependencies(self, processor_details, work_dir):
|
||||
class_name = processor_details.getProcessorType()
|
||||
extension_version = processor_details.getProcessorVersion()
|
||||
|
||||
completion_marker_file = self.__get_download_complete_marker_file(work_dir, class_name, extension_version)
|
||||
target_dir = os.path.dirname(completion_marker_file)
|
||||
|
||||
|
@ -440,32 +474,26 @@ class ExtensionManager:
|
|||
logger.info("All dependencies have already been imported for {0}".format(class_name))
|
||||
return True
|
||||
|
||||
success = False
|
||||
dependencies = processor_details.getDependencies()
|
||||
if len(dependencies) > 0:
|
||||
python_cmd = os.getenv("PYTHON_CMD")
|
||||
args = [python_cmd, '-m', 'pip', 'install', '--target', target_dir]
|
||||
for dep in dependencies:
|
||||
args.append(dep)
|
||||
|
||||
logger.info("Importing dependencies {0} for {1} to {2} using command {3}".format(dependencies, class_name, target_dir, args))
|
||||
logger.info(f"Importing dependencies {dependencies} for {class_name} to {target_dir} using command {args}")
|
||||
result = subprocess.run(args)
|
||||
|
||||
if result.returncode == 0:
|
||||
logger.info("Successfully imported requirements for {0} to {1}".format(class_name, target_dir))
|
||||
success = True
|
||||
logger.info(f"Successfully imported requirements for {class_name} to {target_dir}")
|
||||
else:
|
||||
logger.error("Failed to import requirements for {0}: process exited with status code {1}".format(class_name, result))
|
||||
return False
|
||||
raise RuntimeError(f"Failed to import requirements for {class_name}: process exited with status code {result}")
|
||||
else:
|
||||
logger.info("No dependencies to import for {0}".format(class_name))
|
||||
success = True
|
||||
logger.info(f"No dependencies to import for {class_name}")
|
||||
|
||||
if success:
|
||||
# Write a completion Marker File
|
||||
with open(completion_marker_file, "w") as file:
|
||||
file.write("True")
|
||||
|
||||
return success
|
||||
# Write a completion Marker File
|
||||
with open(completion_marker_file, "w") as file:
|
||||
file.write("True")
|
||||
|
||||
|
||||
def __load_extension_module(self, file, local_dependencies):
|
||||
|
@ -475,7 +503,7 @@ class ExtensionManager:
|
|||
if local_dependency == file:
|
||||
continue
|
||||
|
||||
logger.debug("Loading local dependency {0} before loading {1}".format(local_dependency, file))
|
||||
logger.debug(f"Loading local dependency {local_dependency} before loading {file}")
|
||||
self.__load_extension_module(local_dependency, None)
|
||||
|
||||
|
||||
|
@ -502,14 +530,14 @@ class ExtensionManager:
|
|||
# Load the module
|
||||
sys.modules[moduleName] = module
|
||||
moduleSpec.loader.exec_module(module)
|
||||
logger.info("Loaded module %s" % moduleName)
|
||||
logger.info(f"Loaded module {moduleName}")
|
||||
|
||||
# Find the Processor class and return it
|
||||
for name, member in inspect.getmembers(module):
|
||||
if inspect.isclass(member):
|
||||
logger.debug('Found class: %s' % member)
|
||||
logger.debug(f"Found class: {member}")
|
||||
if self.__is_processor_class(member):
|
||||
logger.debug('Found Processor: %s' % member)
|
||||
logger.debug(f"Found Processor: {member}")
|
||||
return member
|
||||
|
||||
return None
|
||||
|
@ -536,6 +564,6 @@ class ExtensionManager:
|
|||
# The class implements something. Check if it implements Processor
|
||||
for interface in instance.implements:
|
||||
if interface in self.processorInterfaces:
|
||||
logger.debug('%s implements Processor' % potentialProcessorClass)
|
||||
logger.debug(f"{potentialProcessorClass} implements Processor")
|
||||
return True
|
||||
return False
|
||||
|
|
|
@ -139,7 +139,7 @@ public class PythonProcessorIT extends NiFiSystemIT {
|
|||
final String headerLine = lines[0];
|
||||
final List<String> headers = Stream.of(headerLine.split(","))
|
||||
.map(String::trim)
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
assertTrue(headers.contains("name"));
|
||||
assertTrue(headers.contains("age"));
|
||||
assertTrue(headers.contains("color"));
|
||||
|
|
|
@ -263,3 +263,5 @@ nifi.kerberos.spnego.authentication.expiration=12 hours
|
|||
# external properties files for variable registry
|
||||
# supports a comma delimited list of file locations
|
||||
nifi.variable.registry.properties=
|
||||
|
||||
nifi.flow.analysis.background.task.schedule=5 mins
|
||||
|
|
Loading…
Reference in New Issue