NIFI-12616 Added Processor Documentation Support for Python

- Added some Use Case docs for Python processors and updated Runtime Manifests to include Python based processors as well as Use Case/MultiProcessorUseCase documentation elements. Refactored/cleaned up some of the Python code and added unit tests.

- Added python-unit-tests profile and enabled on Ubuntu and macOS GitHub workflows

This closes #8253

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-01-03 15:06:03 -05:00 committed by exceptionfactory
parent 2212afe482
commit 2acc1038c9
No known key found for this signature in database
40 changed files with 1589 additions and 345 deletions

View File

@ -170,6 +170,7 @@ jobs:
${{ env.MAVEN_VERIFY_COMMAND }}
${{ env.MAVEN_BUILD_PROFILES }}
-P report-code-coverage
-P python-unit-tests
${{ env.MAVEN_PROJECTS }}
- name: Codecov
uses: codecov/codecov-action@v3
@ -238,6 +239,7 @@ jobs:
${{ env.MAVEN_COMMAND }}
${{ env.MAVEN_VERIFY_COMMAND }}
${{ env.MAVEN_BUILD_PROFILES }}
-P python-unit-tests
${{ env.MAVEN_PROJECTS }}
- name: Upload Test Reports
uses: actions/upload-artifact@v3

1
.gitignore vendored
View File

@ -18,3 +18,4 @@ nb-configuration.xml
.vscode/
.java-version
/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/dist/
__pycache__

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.component.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.util.List;
public class MultiProcessorUseCase implements Serializable {
private String description;
private String notes;
private List<String> keywords;
private List<ProcessorConfiguration> configurations;
@Schema(description="A description of the use case")
public String getDescription() {
return description;
}
public void setDescription(final String description) {
this.description = description;
}
@Schema(description="Any pertinent notes about the use case")
public String getNotes() {
return notes;
}
public void setNotes(final String notes) {
this.notes = notes;
}
@Schema(description="Keywords that pertain to the use csae")
public List<String> getKeywords() {
return keywords;
}
public void setKeywords(final List<String> keywords) {
this.keywords = keywords;
}
@Schema(description="A description of how to configure the Processor to perform the task described in the use case")
public List<ProcessorConfiguration> getConfigurations() {
return configurations;
}
public void setConfigurations(final List<ProcessorConfiguration> configurations) {
this.configurations = configurations;
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.component.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
public class ProcessorConfiguration implements Serializable {
private String processorClassName;
private String configuration;
@Schema(description="The fully qualified classname of the Processor that should be used to accomplish the use case")
public String getProcessorClassName() {
return processorClassName;
}
public void setProcessorClassName(final String processorClassName) {
this.processorClassName = processorClassName;
}
@Schema(description="A description of how the Processor should be configured in order to accomplish the use case")
public String getConfiguration() {
return configuration;
}
public void setConfiguration(final String configuration) {
this.configuration = configuration;
}
}

View File

@ -51,7 +51,11 @@ public class ProcessorDefinition extends ConfigurableExtensionDefinition {
private List<Attribute> readsAttributes;
private List<Attribute> writesAttributes;
@Schema(description = "Any input requirements this processor has.")
private List<UseCase> useCases;
private List<MultiProcessorUseCase> multiProcessorUseCases;
@Schema(description="Any input requirements this processor has.")
public InputRequirement.Requirement getInputRequirement() {
return inputRequirement;
}
@ -225,4 +229,22 @@ public class ProcessorDefinition extends ConfigurableExtensionDefinition {
public void setWritesAttributes(List<Attribute> writesAttributes) {
this.writesAttributes = writesAttributes;
}
@Schema(description="A list of use cases that have been documented for this Processor")
public List<UseCase> getUseCases() {
return useCases;
}
public void setUseCases(final List<UseCase> useCases) {
this.useCases = useCases;
}
@Schema(description="A list of use cases that have been documented that involve this Processor in conjunction with other Processors")
public List<MultiProcessorUseCase> getMultiProcessorUseCases() {
return multiProcessorUseCases;
}
public void setMultiProcessorUseCases(final List<MultiProcessorUseCase> multiProcessorUseCases) {
this.multiProcessorUseCases = multiProcessorUseCases;
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.component.api;
import io.swagger.v3.oas.annotations.media.Schema;
import org.apache.nifi.annotation.behavior.InputRequirement;
import java.io.Serializable;
import java.util.List;
public class UseCase implements Serializable {
private String description;
private String notes;
private List<String> keywords;
private String configuration;
private InputRequirement.Requirement inputRequirement;
@Schema(description="A description of the use case")
public String getDescription() {
return description;
}
public void setDescription(final String description) {
this.description = description;
}
@Schema(description="Any pertinent notes about the use case")
public String getNotes() {
return notes;
}
public void setNotes(final String notes) {
this.notes = notes;
}
@Schema(description="Keywords that pertain to the use case")
public List<String> getKeywords() {
return keywords;
}
public void setKeywords(final List<String> keywords) {
this.keywords = keywords;
}
@Schema(description="A description of how to configure the Processor to perform the task described in the use case")
public String getConfiguration() {
return configuration;
}
public void setConfiguration(final String configuration) {
this.configuration = configuration;
}
@Schema(description="Specifies whether an incoming FlowFile is expected for this use case")
public InputRequirement.Requirement getInputRequirement() {
return inputRequirement;
}
public void setInputRequirement(final InputRequirement.Requirement inputRequirement) {
this.inputRequirement = inputRequirement;
}
}

View File

@ -424,6 +424,88 @@ that there are no longer any invocations of the `transform` method running when
[[documenting_use_cases]]
== Documenting Use Cases
No matter how powerful a piece of software is, it has no value unless people are able to use it. To that end, documentation of Processors is
very important. While a description of the Processor should be provided in the `ProcessorDetails` class and each PropertyDescriptor is expected to have a description,
it is usually helpful to also call out specific use cases that can be performed by the Processor. This is particularly important for Processors that perform
more generalized transformations on objects, where a single Processor may be capable of performing multiple tasks, based on its configuration.
[[use_case_decorator]]
=== The `@use_case` Decorator
The `@use_case` decorator, defined in the `nifiapi.documentation` module can facilitate this. The decorator takes four arguments:
- `description`: A simple 1 (at most 2) sentence description of the use case. Generally, this should not include any extraneous details,
such as caveats, etc. Those can be provided using the `notes` argument. The description is required.
- `notes`: Most of the time, 1-2 sentences is sufficient to describe a use case. Those 1-2 sentence should then be returned
by the `description`. In the event that the description is not sufficient, details may be provided to
further explain, by providing caveats, etc. This is optional.
- `keywords`: An array of keywords that can be associated with the use case. This is optional.
- `configuration`: A description of how to configure the Processor for this particular use case. This may include explicit values to set for some properties,
and may include instructions on how to choose the appropriate value for other properties. The configuration is required.
A single Processor may have multiple `@use_case` decorators.
[[multi_processor_use_case_decorator]]
=== The `@multi_processor_use_case` Decorator
When designing and creating Processors, it is important to keep in mind the idea of loose coupling. One Processor should not be dependent on another Processor
in order to perform its task. That being said, it is often advantageous to build Processors that are designed to work well together. For example, a Processor that
is able to perform a listing of files in a directory can provide an important capability in and of itself. Similarly, a Processor that is able to ingest the contents
of a specific file and make that file's contents the contents of a FlowFile is also an important capability in and of itself. But far more powerful than either of these
individual capabilities is the notion of being able to compose a flow that lists all files in a directory and then ingests each of those files as a FlowFile. This is
done by using a combination of the two. As such, it is important that the two Processors be able to work together in such a way that the output of the first is
easily understood as the input of the second.
In this case, it makes sense to document this composition of Processors as a use case so that users can understand how to compose such a pipeline. This is accomplished
by using the `@multi_processor_use_case` decorator. This decorator is very similar to the <<use_case_decorator>> but instead of a `configuration` element, it has a
`configurations` element, which is a `list` of `ProcessorConfiguration` objects, where each `ProcessorConfiguration` object has both a `processor_type`, which is the
name of the Processor, and a `configuration` that explains how to configure that particular Processor. The `configuration` element typically also explains how to connect
outbound Relationships.
For example, we might use these decorators as such:
----
@use_case(description="Retrieve the contents of a given file on disk and create a FlowFile from it without modifying the file",
keywords=["file", "filesystem"],
configuration="""
Set the 'Filename' property to the fully qualified path of the file to ingest
Set the 'Completion Strategy' to 'None'
""")
@use_case(description="Retrieve the contents of a given file on disk and create a FlowFile from it, deleting the local file upon success",
keywords=["file", "filesystem"],
configuration="""
Set the 'Filename' property to the fully qualified path of the file to ingest
Set the 'Completion Strategy' to 'Delete'
""")
@multi_processor_use_case(
description="Ingest all files from a landing directory on the filesystem and delete them after ingesting them.",
keywords=["file", "filesystem", "landing directory"],
configurations=[
ProcessorConfiguration(
processor_type="org.apache.nifi.processors.standard.ListFile",
configuration="""
Set 'Input Directory' to the directory that files should be ingested from
Set 'Input Directory Location' to 'Local'
"""
),
ProcessorConfiguration(
processor_type="FetchFile",
configuration="""
Set the 'Filename' property to `${absolute.path}/${filename}`
Set the 'Completion Strategy' to 'Delete'
"""
)
])
class FetchFile(FlowFileTransform):
----
Note that in this case, we are able to specifically tell the user that the Filename property of FetchFile should be set to the value `${absolute.path}/${filename}`
because we know that the ListFile Processor will produce these attributes for us.
[[requirements]]
== Requirements

View File

@ -23,6 +23,8 @@ import org.apache.nifi.c2.protocol.component.api.ConfigurableComponentDefinition
import org.apache.nifi.c2.protocol.component.api.ControllerServiceDefinition;
import org.apache.nifi.c2.protocol.component.api.DefinedType;
import org.apache.nifi.c2.protocol.component.api.ExtensionComponent;
import org.apache.nifi.c2.protocol.component.api.MultiProcessorUseCase;
import org.apache.nifi.c2.protocol.component.api.ProcessorConfiguration;
import org.apache.nifi.c2.protocol.component.api.ProcessorDefinition;
import org.apache.nifi.c2.protocol.component.api.PropertyAllowableValue;
import org.apache.nifi.c2.protocol.component.api.PropertyDependency;
@ -33,6 +35,7 @@ import org.apache.nifi.c2.protocol.component.api.ReportingTaskDefinition;
import org.apache.nifi.c2.protocol.component.api.Restriction;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
import org.apache.nifi.c2.protocol.component.api.SchedulingDefaults;
import org.apache.nifi.c2.protocol.component.api.UseCase;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.components.state.Scope;
@ -151,7 +154,7 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
@Override
public RuntimeManifestBuilder addBundles(final Iterable<ExtensionManifestContainer> extensionManifests) {
extensionManifests.forEach(em -> addBundle(em));
extensionManifests.forEach(this::addBundle);
return this;
}
@ -277,9 +280,45 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
);
}
final List<UseCase> useCases = extension.getUseCases() == null ? List.of() : extension.getUseCases().stream()
.map(StandardRuntimeManifestBuilder::createUseCase)
.toList();
processorDefinition.setUseCases(useCases);
final List<MultiProcessorUseCase> multiProcessorUseCases = extension.getMultiProcessorUseCases() == null ? List.of() : extension.getMultiProcessorUseCases().stream()
.map(StandardRuntimeManifestBuilder::createMultiProcessorUseCase)
.toList();
processorDefinition.setMultiProcessorUseCases(multiProcessorUseCases);
componentManifestBuilder.addProcessor(processorDefinition);
}
private static UseCase createUseCase(final org.apache.nifi.extension.manifest.UseCase extensionUseCase) {
final UseCase useCase = new UseCase();
useCase.setDescription(extensionUseCase.getDescription());
useCase.setConfiguration(extensionUseCase.getConfiguration());
useCase.setKeywords(extensionUseCase.getKeywords());
useCase.setNotes(extensionUseCase.getNotes());
return useCase;
}
private static MultiProcessorUseCase createMultiProcessorUseCase(final org.apache.nifi.extension.manifest.MultiProcessorUseCase extensionUseCase) {
final MultiProcessorUseCase useCase = new MultiProcessorUseCase();
useCase.setDescription(extensionUseCase.getDescription());
useCase.setKeywords(extensionUseCase.getKeywords());
useCase.setNotes(extensionUseCase.getNotes());
final List<ProcessorConfiguration> processorConfigs = new ArrayList<>();
for (final org.apache.nifi.extension.manifest.ProcessorConfiguration extensionConfig : extensionUseCase.getProcessorConfigurations()) {
final ProcessorConfiguration processorConfig = new ProcessorConfiguration();
processorConfig.setConfiguration(extensionConfig.getConfiguration());
processorConfig.setProcessorClassName(extensionConfig.getProcessorClassName());
processorConfigs.add(processorConfig);
}
useCase.setConfigurations(processorConfigs);
return useCase;
}
private org.apache.nifi.c2.protocol.component.api.Attribute getAttribute(final Attribute attribute) {
final org.apache.nifi.c2.protocol.component.api.Attribute c2Attribute = new org.apache.nifi.c2.protocol.component.api.Attribute();
c2Attribute.setName(attribute.getName());
@ -299,16 +338,11 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
return null;
}
switch (inputRequirement) {
case INPUT_ALLOWED:
return InputRequirement.Requirement.INPUT_ALLOWED;
case INPUT_REQUIRED:
return InputRequirement.Requirement.INPUT_REQUIRED;
case INPUT_FORBIDDEN:
return InputRequirement.Requirement.INPUT_FORBIDDEN;
default:
throw new IllegalArgumentException("Unknown input requirement: " + inputRequirement.name());
}
return switch (inputRequirement) {
case INPUT_ALLOWED -> InputRequirement.Requirement.INPUT_ALLOWED;
case INPUT_REQUIRED -> InputRequirement.Requirement.INPUT_REQUIRED;
case INPUT_FORBIDDEN -> InputRequirement.Requirement.INPUT_FORBIDDEN;
};
}
private List<Relationship> getSupportedRelationships(final List<org.apache.nifi.extension.manifest.Relationship> relationships) {
@ -460,14 +494,10 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
}
private Scope getScope(final org.apache.nifi.extension.manifest.Scope sourceScope) {
switch (sourceScope) {
case LOCAL:
return Scope.LOCAL;
case CLUSTER:
return Scope.CLUSTER;
default:
throw new IllegalArgumentException("Unknown scope: " + sourceScope);
}
return switch (sourceScope) {
case LOCAL -> Scope.LOCAL;
case CLUSTER -> Scope.CLUSTER;
};
}
private Restriction createRestriction(final org.apache.nifi.extension.manifest.Restriction extensionRestriction) {
@ -550,8 +580,7 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
final DependentValues dependentValues = dependency.getDependentValues();
if (dependentValues != null && dependentValues.getValues() != null) {
final List<String> values = new ArrayList<String>();
values.addAll(dependentValues.getValues());
final List<String> values = new ArrayList<>(dependentValues.getValues());
propertyDependency.setDependentValues(values);
}
propertyDependencies.add(propertyDependency);
@ -565,18 +594,15 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
}
final PropertyResourceDefinition propertyResourceDefinition = new PropertyResourceDefinition();
switch (resourceDefinition.getCardinality()) {
case SINGLE:
propertyResourceDefinition.setCardinality(ResourceCardinality.SINGLE);
break;
case MULTIPLE:
propertyResourceDefinition.setCardinality(ResourceCardinality.MULTIPLE);
break;
}
final ResourceCardinality cardinality = switch (resourceDefinition.getCardinality()) {
case SINGLE -> ResourceCardinality.SINGLE;
case MULTIPLE -> ResourceCardinality.MULTIPLE;
};
propertyResourceDefinition.setCardinality(cardinality);
propertyResourceDefinition.setResourceTypes(
resourceDefinition.getResourceTypes().stream()
.map(rt -> getResourceType(rt))
.map(this::getResourceType)
.collect(Collectors.toSet())
);
@ -584,18 +610,12 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
}
private ResourceType getResourceType(final org.apache.nifi.extension.manifest.ResourceType resourceType) {
switch (resourceType) {
case URL:
return ResourceType.URL;
case FILE:
return ResourceType.FILE;
case TEXT:
return ResourceType.TEXT;
case DIRECTORY:
return ResourceType.DIRECTORY;
default:
throw new IllegalArgumentException("Unknown resource type: " + resourceType);
}
return switch (resourceType) {
case URL -> ResourceType.URL;
case FILE -> ResourceType.FILE;
case TEXT -> ResourceType.TEXT;
case DIRECTORY -> ResourceType.DIRECTORY;
};
}
private ExpressionLanguageScope getELScope(final org.apache.nifi.extension.manifest.ExpressionLanguageScope elScope) {
@ -603,16 +623,11 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
return null;
}
switch (elScope) {
case NONE:
return ExpressionLanguageScope.NONE;
case FLOWFILE_ATTRIBUTES:
return ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
case ENVIRONMENT:
return ExpressionLanguageScope.ENVIRONMENT;
default:
throw new IllegalArgumentException("Unknown Expression Language Scope: " + elScope.name());
}
return switch (elScope) {
case NONE -> ExpressionLanguageScope.NONE;
case FLOWFILE_ATTRIBUTES -> ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
case ENVIRONMENT -> ExpressionLanguageScope.ENVIRONMENT;
};
}
private List<PropertyAllowableValue> getPropertyAllowableValues(final List<AllowableValue> allowableValues) {

View File

@ -96,9 +96,9 @@ public class ClassLoaderAwarePythonBridge implements PythonBridge {
}
@Override
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess, final boolean initialize) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(classLoader)) {
return delegate.createProcessor(identifier, type, version, preferIsolatedProcess);
return delegate.createProcessor(identifier, type, version, preferIsolatedProcess, initialize);
}
}

View File

@ -866,7 +866,7 @@ public class ExtensionBuilder {
// TODO: This is a hack because there's a bug in the UI that causes it to not load extensions that don't have a `.` in the type.
final String processorType = type.startsWith("python.") ? type.substring("python.".length()) : type;
final Processor processor = pythonBridge.createProcessor(identifier, processorType, bundleCoordinate.getVersion(), true);
final Processor processor = pythonBridge.createProcessor(identifier, processorType, bundleCoordinate.getVersion(), true, true);
final ComponentLog componentLog = new SimpleProcessLogger(identifier, processor, new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog);

View File

@ -20,10 +20,29 @@ import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleDetails;
import org.apache.nifi.c2.protocol.component.api.BuildInfo;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.extension.manifest.AllowableValue;
import org.apache.nifi.extension.manifest.ControllerServiceDefinition;
import org.apache.nifi.extension.manifest.ExpressionLanguageScope;
import org.apache.nifi.extension.manifest.Extension;
import org.apache.nifi.extension.manifest.ExtensionManifest;
import org.apache.nifi.extension.manifest.ExtensionType;
import org.apache.nifi.extension.manifest.InputRequirement;
import org.apache.nifi.extension.manifest.MultiProcessorUseCase;
import org.apache.nifi.extension.manifest.ProcessorConfiguration;
import org.apache.nifi.extension.manifest.Property;
import org.apache.nifi.extension.manifest.UseCase;
import org.apache.nifi.extension.manifest.parser.ExtensionManifestParser;
import org.apache.nifi.nar.ExtensionDefinition;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.nar.PythonBundle;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.documentation.MultiProcessorUseCaseDetails;
import org.apache.nifi.python.processor.documentation.ProcessorConfigurationDetails;
import org.apache.nifi.python.processor.documentation.PropertyDescription;
import org.apache.nifi.python.processor.documentation.UseCaseDetails;
import org.apache.nifi.runtime.manifest.ExtensionManifestContainer;
import org.apache.nifi.runtime.manifest.RuntimeManifestBuilder;
import org.apache.nifi.runtime.manifest.impl.SchedulingDefaultsFactory;
@ -33,12 +52,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -94,16 +115,186 @@ public class StandardRuntimeManifestService implements RuntimeManifestService {
getExtensionManifest(bundle).ifPresent(manifestBuilder::addBundle);
}
getPythonExtensionManifests().forEach(manifestBuilder::addBundle);
return manifestBuilder.build();
}
private List<ExtensionManifestContainer> getPythonExtensionManifests() {
final Map<String, List<Extension>> extensionsPerVersion = new HashMap<>();
final Set<ExtensionDefinition> processorDefinitions = extensionManager.getExtensions(Processor.class);
for (final ExtensionDefinition definition : processorDefinitions) {
if (!PythonBundle.isPythonCoordinate(definition.getBundle().getBundleDetails().getCoordinate())) {
continue;
}
final PythonProcessorDetails pythonProcessorDetails = extensionManager.getPythonProcessorDetails(definition.getImplementationClassName(), definition.getVersion());
if (pythonProcessorDetails == null) {
LOGGER.debug("Could not find Python Processor Details for {} version {}", definition.getImplementationClassName(), definition.getVersion());
continue;
}
final Extension extension = createExtension(pythonProcessorDetails);
final List<Extension> extensions = extensionsPerVersion.computeIfAbsent(definition.getVersion(), version -> new ArrayList<>());
extensions.add(extension);
}
if (extensionsPerVersion.isEmpty()) {
return List.of();
}
final List<ExtensionManifestContainer> containers = new ArrayList<>();
for (final Map.Entry<String, List<Extension>> entry : extensionsPerVersion.entrySet()) {
final String version = entry.getKey();
final List<Extension> extensions = entry.getValue();
final ExtensionManifest extensionManifest = new ExtensionManifest();
extensionManifest.setGroupId(PythonBundle.GROUP_ID);
extensionManifest.setArtifactId(PythonBundle.ARTIFACT_ID);
extensionManifest.setVersion(version);
extensionManifest.setExtensions(extensions);
containers.add(new ExtensionManifestContainer(extensionManifest, Map.of()));
}
return containers;
}
private Extension createExtension(final PythonProcessorDetails pythonProcessorDetails) {
final Extension extension = new Extension();
extension.setDescription(pythonProcessorDetails.getCapabilityDescription());
extension.setName(pythonProcessorDetails.getProcessorType());
extension.setInputRequirement(InputRequirement.INPUT_REQUIRED);
extension.setSupportsBatching(true);
extension.setType(ExtensionType.PROCESSOR);
extension.setTriggerWhenEmpty(false);
extension.setTriggerSerially(false);
extension.setTriggerWhenAnyDestinationAvailable(false);
final List<org.apache.nifi.extension.manifest.Relationship> relationships = new ArrayList<>();
extension.setRelationships(relationships);
for (final String relationshipName : List.of("success", "failure", "original")) {
final org.apache.nifi.extension.manifest.Relationship relationship = new org.apache.nifi.extension.manifest.Relationship();
relationship.setAutoTerminated(false);
relationship.setName(relationshipName);
relationships.add(relationship);
}
final List<UseCase> useCases = getUseCases(pythonProcessorDetails);
extension.setUseCases(useCases);
final List<MultiProcessorUseCase> multiProcessorUseCases = getMultiProcessorUseCases(pythonProcessorDetails);
extension.setMultiProcessorUseCases(multiProcessorUseCases);
final List<PropertyDescription> propertyDescriptions = pythonProcessorDetails.getPropertyDescriptions();
final List<Property> manifestProperties = propertyDescriptions == null ? List.of() : propertyDescriptions.stream()
.map(StandardRuntimeManifestService::createManifestProperty)
.toList();
extension.setProperties(manifestProperties);
return extension;
}
private static Property createManifestProperty(final PropertyDescription propertyDescription) {
final Property property = new Property();
property.setName(propertyDescription.getName());
property.setDescription(propertyDescription.getDescription());
property.setDefaultValue(propertyDescription.getDefaultValue());
property.setDisplayName(propertyDescription.getDisplayName());
property.setDynamicallyModifiesClasspath(false);
property.setDynamic(false);
try {
property.setExpressionLanguageScope(ExpressionLanguageScope.valueOf(propertyDescription.getExpressionLanguageScope()));
} catch (final Exception e) {
property.setExpressionLanguageScope(ExpressionLanguageScope.NONE);
}
property.setRequired(propertyDescription.isRequired());
property.setSensitive(propertyDescription.isSensitive());
// TODO: Handle Allowable Values
property.setControllerServiceDefinition(getManifestControllerServiceDefinition(propertyDescription.getControllerServiceDefinition()));
return property;
}
private static ControllerServiceDefinition getManifestControllerServiceDefinition(final String controllerServiceClassName) {
if (controllerServiceClassName == null) {
return null;
}
final ControllerServiceDefinition definition = new ControllerServiceDefinition();
definition.setClassName(controllerServiceClassName);
return definition;
}
private static List<AllowableValue> getManifestAllowableValues(final PropertyDescriptor propertyDescriptor) {
if (propertyDescriptor.getAllowableValues() == null) {
return List.of();
}
final List<AllowableValue> allowableValues = new ArrayList<>();
for (final org.apache.nifi.components.AllowableValue allowableValue : propertyDescriptor.getAllowableValues()) {
final AllowableValue manifestAllowableValue = new AllowableValue();
manifestAllowableValue.setDescription(allowableValue.getDescription());
manifestAllowableValue.setValue(allowableValue.getValue());
manifestAllowableValue.setDisplayName(allowableValue.getDisplayName());
allowableValues.add(manifestAllowableValue);
}
return allowableValues;
}
private static List<UseCase> getUseCases(final PythonProcessorDetails pythonProcessorDetails) {
final List<UseCase> useCases = new ArrayList<>();
for (final UseCaseDetails useCaseDetails : pythonProcessorDetails.getUseCases()) {
final UseCase useCase = new UseCase();
useCases.add(useCase);
useCase.setDescription(useCaseDetails.getDescription());
useCase.setNotes(useCaseDetails.getNotes());
useCase.setKeywords(useCaseDetails.getKeywords());
useCase.setInputRequirement(InputRequirement.INPUT_REQUIRED);
useCase.setConfiguration(useCaseDetails.getConfiguration());
}
return useCases;
}
private static List<MultiProcessorUseCase> getMultiProcessorUseCases(final PythonProcessorDetails pythonProcessorDetails) {
final List<MultiProcessorUseCase> multiProcessorUseCases = new ArrayList<>();
for (final MultiProcessorUseCaseDetails useCaseDetails : pythonProcessorDetails.getMultiProcessorUseCases()) {
final MultiProcessorUseCase useCase = new MultiProcessorUseCase();
multiProcessorUseCases.add(useCase);
useCase.setDescription(useCaseDetails.getDescription());
useCase.setNotes(useCaseDetails.getNotes());
useCase.setKeywords(useCaseDetails.getKeywords());
final List<ProcessorConfiguration> processorConfigurations = new ArrayList<>();
useCase.setProcessorConfigurations(processorConfigurations);
for (final ProcessorConfigurationDetails processorConfig : useCaseDetails.getConfigurations()) {
final ProcessorConfiguration processorConfiguration = new ProcessorConfiguration();
processorConfigurations.add(processorConfiguration);
processorConfiguration.setConfiguration(processorConfig.getConfiguration());
processorConfiguration.setProcessorClassName(processorConfig.getProcessorType());
}
}
return multiProcessorUseCases;
}
private Optional<ExtensionManifestContainer> getExtensionManifest(final Bundle bundle) {
final BundleDetails bundleDetails = bundle.getBundleDetails();
try {
final ExtensionManifest extensionManifest = loadExtensionManifest(bundleDetails);
final Optional<ExtensionManifest> extensionManifest = loadExtensionManifest(bundleDetails);
if (extensionManifest.isEmpty()) {
return Optional.empty();
}
final Map<String, String> additionalDetails = loadAdditionalDetails(bundleDetails);
final ExtensionManifestContainer container = new ExtensionManifestContainer(extensionManifest, additionalDetails);
final ExtensionManifestContainer container = new ExtensionManifestContainer(extensionManifest.get(), additionalDetails);
return Optional.of(container);
} catch (final IOException e) {
LOGGER.error("Unable to load extension manifest for bundle [{}]", bundleDetails.getCoordinate(), e);
@ -111,11 +302,11 @@ public class StandardRuntimeManifestService implements RuntimeManifestService {
}
}
private ExtensionManifest loadExtensionManifest(final BundleDetails bundleDetails) throws IOException {
private Optional<ExtensionManifest> loadExtensionManifest(final BundleDetails bundleDetails) throws IOException {
final File manifestFile = new File(bundleDetails.getWorkingDirectory(), "META-INF/docs/extension-manifest.xml");
if (!manifestFile.exists()) {
throw new FileNotFoundException("Extension manifest files does not exist for "
+ bundleDetails.getCoordinate() + " at " + manifestFile.getAbsolutePath());
LOGGER.warn("There is no extension manifest for bundle [{}]", bundleDetails.getCoordinate());
return Optional.empty();
}
try (final InputStream inputStream = new FileInputStream(manifestFile)) {
@ -125,7 +316,7 @@ public class StandardRuntimeManifestService implements RuntimeManifestService {
extensionManifest.setGroupId(bundleDetails.getCoordinate().getGroup());
extensionManifest.setArtifactId(bundleDetails.getCoordinate().getId());
extensionManifest.setVersion(bundleDetails.getCoordinate().getVersion());
return extensionManifest;
return Optional.of(extensionManifest);
}
}

View File

@ -180,7 +180,7 @@ public class ExtensionDefinition {
}
}
enum ExtensionRuntime {
public enum ExtensionRuntime {
JAVA,
PYTHON;

View File

@ -19,6 +19,7 @@ package org.apache.nifi.nar;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.python.PythonProcessorDetails;
import java.net.URL;
import java.util.List;
@ -151,6 +152,14 @@ public interface ExtensionManager {
*/
ConfigurableComponent getTempComponent(String classType, BundleCoordinate bundleCoordinate);
/**
* Returns the details about the Python Processor with the given type and version
* @param processorType the type of the Processor
* @param version the version of the Processor
* @return the details for the Python Processor, or <code>null</code> if no Processor can be given that match the given type and version
*/
PythonProcessorDetails getPythonProcessorDetails(String processorType, String version);
/**
* Logs the available class loaders.
*/

View File

@ -96,6 +96,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
private final Map<BundleCoordinate, Bundle> bundleCoordinateBundleLookup = new HashMap<>();
private final Map<ClassLoader, Bundle> classLoaderBundleLookup = new HashMap<>();
private final Map<String, ConfigurableComponent> tempComponentLookup = new HashMap<>();
private final Map<String, List<PythonProcessorDetails>> pythonProcessorDetails = new HashMap<>();
private final Map<String, InstanceClassLoader> instanceClassloaderLookup = new ConcurrentHashMap<>();
private final ConcurrentMap<BaseClassLoaderKey, SharedInstanceClassLoader> sharedBaseClassloaders = new ConcurrentHashMap<>();
@ -201,7 +202,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
final Bundle bundle = new Bundle(bundleDetails, pythonBundle.getClassLoader());
// TODO: This is a workaround because the UI has a bug that causes it not to work properly if the type doesn't have a '.' in it
final String className = "python." + details.getProcessorType();
final String className = PYTHON_TYPE_PREFIX + details.getProcessorType();
final ExtensionDefinition extensionDefinition = new ExtensionDefinition.Builder()
.implementationClassName(className)
.runtime(ExtensionRuntime.PYTHON)
@ -218,6 +219,10 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
final List<Bundle> bundlesForClass = classNameBundleLookup.computeIfAbsent(className, key -> new ArrayList<>());
bundlesForClass.add(bundle);
bundleCoordinateBundleLookup.putIfAbsent(bundleDetails.getCoordinate(), bundle);
final List<PythonProcessorDetails> detailsList = this.pythonProcessorDetails.computeIfAbsent(details.getProcessorType(), key -> new ArrayList<>());
detailsList.add(details);
logger.info("Discovered Python Processor {}", details.getProcessorType());
} else {
logger.debug("Python Processor {} is already known", details.getProcessorType());
@ -231,6 +236,23 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
}
}
@Override
public PythonProcessorDetails getPythonProcessorDetails(final String processorType, final String version) {
final String canonicalProcessorType = stripPythonTypePrefix(processorType);
final List<PythonProcessorDetails> detailsList = this.pythonProcessorDetails.get(canonicalProcessorType);
if (detailsList == null) {
return null;
}
for (final PythonProcessorDetails processorDetails : detailsList) {
if (processorDetails.getProcessorVersion().equals(version)) {
return processorDetails;
}
}
return null;
}
private BundleDetails createBundleDetailsWithOverriddenVersion(final BundleDetails details, final String version) {
final BundleCoordinate overriddenCoordinate = new BundleCoordinate(details.getCoordinate().getGroup(), details.getCoordinate().getId(), version);
@ -705,10 +727,10 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
final ConfigurableComponent tempComponent;
if (PythonBundle.isPythonCoordinate(bundle.getBundleDetails().getCoordinate())) {
// TODO: This is a workaround due to bug in UI. Fix bug in UI.
final String type = classType.startsWith(PYTHON_TYPE_PREFIX) ? classType.substring(PYTHON_TYPE_PREFIX.length()) : classType;
final String type = stripPythonTypePrefix(classType);
final String procId = "temp-component-" + type;
tempComponent = pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(), false);
tempComponent = pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(), false, false);
} else {
final Class<?> componentClass = Class.forName(classType, true, bundleClassLoader);
tempComponent = (ConfigurableComponent) componentClass.getDeclaredConstructor().newInstance();
@ -729,6 +751,16 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
}
}
private static String stripPythonTypePrefix(final String value) {
if (value == null) {
return null;
}
if (value.startsWith(PYTHON_TYPE_PREFIX)) {
return value.substring(PYTHON_TYPE_PREFIX.length());
}
return value;
}
private static String getClassBundleKey(final String classType, final BundleCoordinate bundleCoordinate) {
return classType + "_" + bundleCoordinate.getCoordinate();

View File

@ -26,8 +26,11 @@ import java.io.File;
import java.util.Objects;
public class PythonBundle {
private static final BundleCoordinate PYTHON_BUNDLE_COORDINATE = new BundleCoordinate(
"org.apache.nifi", "python-extensions", BundleCoordinate.DEFAULT_VERSION);
public static final String GROUP_ID = "org.apache.nifi";
public static final String ARTIFACT_ID = "python-extensions";
public static final String VERSION = BundleCoordinate.DEFAULT_VERSION;
private static final BundleCoordinate PYTHON_BUNDLE_COORDINATE = new BundleCoordinate(GROUP_ID, ARTIFACT_ID, VERSION);
public static Bundle create(final NiFiProperties properties, final ClassLoader classLoader) {
final File pythonWorkingDirectory = new File(properties.getProperty(NiFiProperties.PYTHON_WORKING_DIRECTORY, NiFiProperties.DEFAULT_PYTHON_WORKING_DIRECTORY));

View File

@ -132,20 +132,21 @@ public class StandardPythonBridge implements PythonBridge {
}
@Override
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess, final boolean initialize) {
final PythonProcessorDetails processorDetails = getProcessorTypes().stream()
.filter(details -> details.getProcessorType().equals(type))
.filter(details -> details.getProcessorVersion().equals(version))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown Python Processor type: " + type));
.orElseThrow(() -> new IllegalArgumentException("Unknown Python Processor type [%s] or version [%s]".formatted(type, version)));
final String implementedInterface = processorDetails.getInterface();
final Supplier<PythonProcessorBridge> processorBridgeFactory = () -> createProcessorBridge(identifier, type, version, preferIsolatedProcess);
if (FlowFileTransform.class.getName().equals(implementedInterface)) {
return new FlowFileTransformProxy(type, processorBridgeFactory);
return new FlowFileTransformProxy(type, processorBridgeFactory, initialize);
}
if (RecordTransform.class.getName().equals(implementedInterface)) {
return new RecordTransformProxy(type, processorBridgeFactory);
return new RecordTransformProxy(type, processorBridgeFactory, initialize);
}
return null;
}

View File

@ -117,7 +117,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
}
sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10));
logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis);
logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis, e);
try {
Thread.sleep(sleepMillis);

View File

@ -36,8 +36,8 @@ public class FlowFileTransformProxy extends PythonProcessorProxy {
private volatile FlowFileTransform transform;
public FlowFileTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory) {
super(processorType, bridgeFactory);
public FlowFileTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory, final boolean initialize) {
super(processorType, bridgeFactory, initialize);
}
@OnScheduled

View File

@ -66,16 +66,18 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
REL_ORIGINAL,
REL_FAILURE);
public PythonProcessorProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory) {
public PythonProcessorProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory, final boolean initialize) {
this.processorType = processorType;
Thread.ofVirtual().name("Initialize " + processorType).start(() -> {
this.bridge = bridgeFactory.get();
// If initialization context has already been set, initialize bridge.
final PythonProcessorInitializationContext pythonInitContext = initContext;
if (pythonInitContext != null) {
this.bridge.initialize(pythonInitContext);
if (initialize) {
// If initialization context has already been set, initialize bridge.
final PythonProcessorInitializationContext pythonInitContext = initContext;
if (pythonInitContext != null) {
this.bridge.initialize(pythonInitContext);
}
}
});
}

View File

@ -78,8 +78,8 @@ public class RecordTransformProxy extends PythonProcessorProxy {
.build();
public RecordTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory) {
super(processorType, bridgeFactory);
public RecordTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory, final boolean initialize) {
super(processorType, bridgeFactory, initialize);
}
@Override

View File

@ -567,7 +567,7 @@ public class PythonControllerInteractionIT {
private Processor createProcessor(final String type, final String version) {
bridge.discoverExtensions();
final AsyncLoadedProcessor processor = bridge.createProcessor(createId(), type, version, true);
final AsyncLoadedProcessor processor = bridge.createProcessor(createId(), type, version, true, true);
final ProcessorInitializationContext initContext = new MockProcessorInitializationContext();
processor.initialize(initContext);

View File

@ -14,4 +14,10 @@
# limitations under the License.
class JvmHolder:
jvm = None
jvm = None
def ArrayList(list: list):
array_list = JvmHolder.jvm.java.util.ArrayList()
for val in list:
array_list.add(val)
return array_list

View File

@ -0,0 +1,142 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.__jvm__ import ArrayList
class ProcessorConfiguration:
class Java:
implements = ['org.apache.nifi.python.processor.documentation.ProcessorConfigurationDetails']
def __init__(self, processor_type: str, configuration: str):
self.processor_type = processor_type
self.configuration = configuration
def getProcessorType(self):
return self.processor_type
def getConfiguration(self):
return self.configuration
def use_case(description: str, configuration: str = None, notes: str = None, keywords: list[str] = None):
"""Decorator to explain how to perform a specific use case with a given processor"""
def decorator(func):
return func
return decorator
def multi_processor_use_case(description: str, configurations: list[ProcessorConfiguration], notes: str = None, keywords: list[str] = None):
"""Decorator to explain how to perform a specific use case that involves the decorated Processor, in addition to additional Processors"""
def decorator(func):
return func
return decorator
class UseCaseDetails:
class Java:
implements = ['org.apache.nifi.python.processor.documentation.UseCaseDetails']
def __init__(self, description: str, notes: str, keywords: list[str], configuration: str):
self.description = description
self.notes = notes
self.keywords = keywords
self.configuration = configuration
def getDescription(self):
return self.description
def getNotes(self):
return self.notes
def getKeywords(self):
return ArrayList(self.keywords)
def getConfiguration(self):
return self.configuration
def __str__(self):
return f"UseCaseDetails[description={self.description}]"
class MultiProcessorUseCaseDetails:
class Java:
implements = ['org.apache.nifi.python.processor.documentation.MultiProcessorUseCaseDetails']
def __init__(self, description: str, notes: str, keywords: list[str], configurations: list[ProcessorConfiguration]):
self.description = description
self.notes = notes
self.keywords = keywords
self.configurations = configurations
def getDescription(self):
return self.description
def getNotes(self):
return self.notes
def getKeywords(self):
return ArrayList(self.keywords)
def getConfigurations(self):
return ArrayList(self.configurations)
def __str__(self):
return f"MultiProcessorUseCaseDetails[description={self.description}]"
class PropertyDescription:
class Java:
implements = ['org.apache.nifi.python.processor.documentation.PropertyDescription']
def __init__(self,
name: str,
description: str,
display_name: str = None,
required: bool = False,
sensitive: bool = True,
default_value: str = None,
expression_language_scope: str = None,
controller_service_definition: str = None):
self.name = name
self.description = description
self.display_name = display_name
self.required = required
self.sensitive = sensitive
self.default_value = default_value
self.expression_language_scope = expression_language_scope
self.controller_service_definition = controller_service_definition
def getName(self):
return self.name
def getDescription(self):
return self.description
def getDisplayName(self):
return self.display_name
def isRequired(self):
return self.required
def isSensitive(self):
return self.sensitive
def getDefaultValue(self):
return self.default_value
def getExpressionLanguageScope(self):
return self.expression_language_scope
def getControllerServiceDefinition(self):
return self.controller_service_definition

View File

@ -64,7 +64,7 @@ public class DisabledPythonBridge implements PythonBridge {
}
@Override
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess, final boolean initialize) {
throw new UnsupportedOperationException("Cannot create Processor of type " + type + " because Python extensions are disabled");
}

View File

@ -94,9 +94,10 @@ public interface PythonBridge {
* @param type the Processor's type
* @param version the Processor's version
* @param preferIsolatedProcess whether or not to prefer launching a Python Process that is isolated for just this one instance of the Processor
* @param initialize whether or not to initialize the processor
* @return a PythonProcessorBridge that can be used for interacting with the Processor
*/
AsyncLoadedProcessor createProcessor(String identifier, String type, String version, boolean preferIsolatedProcess);
AsyncLoadedProcessor createProcessor(String identifier, String type, String version, boolean preferIsolatedProcess, boolean initialize);
/**
* A notification that the Processor with the given identifier, type, and version was removed from the flow. This triggers the bridge

View File

@ -17,6 +17,10 @@
package org.apache.nifi.python;
import org.apache.nifi.python.processor.documentation.MultiProcessorUseCaseDetails;
import org.apache.nifi.python.processor.documentation.PropertyDescription;
import org.apache.nifi.python.processor.documentation.UseCaseDetails;
import java.util.List;
public interface PythonProcessorDetails {
@ -54,4 +58,22 @@ public interface PythonProcessorDetails {
* @return the name of the Java interface that is implemented by the Python Processor
*/
String getInterface();
/**
* @return the use cases that have been described by the Python Processor
*/
List<UseCaseDetails> getUseCases();
/**
* @return the multi-processor use cases that have been described by the Python Processor
*/
List<MultiProcessorUseCaseDetails> getMultiProcessorUseCases();
/**
* A list of property descriptions for the known properties. Note that unlikely Java Processors, Python-based Processors are
* more dynamic, and the properties may not all be discoverable
*
* @return a list of descriptions for known properties
*/
List<PropertyDescription> getPropertyDescriptions();
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.python.processor.documentation;
import java.util.List;
public interface MultiProcessorUseCaseDetails {
String getDescription();
String getNotes();
List<String> getKeywords();
List<ProcessorConfigurationDetails> getConfigurations();
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.python.processor.documentation;
public interface ProcessorConfigurationDetails {
String getProcessorType();
String getConfiguration();
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.python.processor.documentation;
public interface PropertyDescription {
String getName();
String getDisplayName();
String getDescription();
String getDefaultValue();
boolean isSensitive();
boolean isRequired();
String getExpressionLanguageScope();
String getControllerServiceDefinition();
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.python.processor.documentation;
import java.util.List;
public interface UseCaseDetails {
String getDescription();
String getNotes();
List<String> getKeywords();
String getConfiguration();
}

View File

@ -76,4 +76,46 @@
</plugins>
</build>
<profiles>
<profile>
<id>python-unit-tests</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>python-test</id>
<phase>test</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>python3</executable>
<environmentVariables>
<PYTHONPATH>src/main/python/framework:../nifi-python-extension-api/src/main/python/src</PYTHONPATH>
</environmentVariables>
<arguments>
<argument>-m</argument>
<argument>unittest</argument>
<argument>discover</argument>
<argument>-s</argument>
<argument>src/test/python/framework</argument>
<argument>-p</argument>
<argument>Test*.py</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,74 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.__jvm__ import ArrayList
class ExtensionDetails:
class Java:
implements = ['org.apache.nifi.python.PythonProcessorDetails']
def __init__(self, type, interfaces,
version='Unknown',
dependencies=None,
source_location=None,
description=None,
tags=None,
use_cases=None,
multi_processor_use_cases=None,
property_descriptions=None):
self.type = type
self.interfaces = interfaces if interfaces else []
self.dependencies = dependencies if dependencies else []
self.tags = tags if tags else []
self.version = version
self.source_location = source_location
self.description = description
self.use_cases = use_cases if use_cases else {}
self.multi_processor_use_cases = multi_processor_use_cases if multi_processor_use_cases else {}
self.property_descriptions = property_descriptions if property_descriptions else {}
def getProcessorType(self):
return self.type
def getProcessorVersion(self):
return self.version
def getSourceLocation(self):
return self.source_location
def getDependencies(self):
return ArrayList(self.dependencies)
def getCapabilityDescription(self):
return self.description
def getTags(self):
return ArrayList(self.tags)
def getUseCases(self):
return ArrayList(self.use_cases)
def getMultiProcessorUseCases(self):
return ArrayList(self.multi_processor_use_cases)
def getPropertyDescriptions(self):
return ArrayList(self.property_descriptions)
def getInterface(self):
if len(self.interfaces) == 0:
return None
return self.interfaces[0]

View File

@ -13,9 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import importlib
import importlib.util # Note requires Python 3.4+
import importlib.util
import inspect
import logging
import os
@ -24,7 +23,10 @@ import subprocess
import sys
from pathlib import Path
logger = logging.getLogger("org.apache.nifi.py4j.ExtensionManager")
import ProcessorInspection
logger = logging.getLogger("python.ExtensionManager")
# A simple wrapper class to encompass a processor type and its version
@ -40,67 +42,6 @@ class ExtensionId:
return (self.classname, self.version) == (other.classname, other.version)
class ExtensionDetails:
class Java:
implements = ['org.apache.nifi.python.PythonProcessorDetails']
def __init__(self, gateway, type, interfaces,
version='Unknown',
dependencies=None,
source_location=None,
description=None,
tags=None):
self.gateway = gateway
self.type = type
if interfaces is None:
interfaces = []
self.interfaces = interfaces
if dependencies is None:
dependencies = []
if tags is None:
tags = []
self.version = version
self.dependencies = dependencies
self.source_location = source_location
self.description = description
self.tags = tags
def getProcessorType(self):
return self.type
def getProcessorVersion(self):
return self.version
def getSourceLocation(self):
return self.source_location
def getDependencies(self):
list = self.gateway.jvm.java.util.ArrayList()
for dep in self.dependencies:
list.add(dep)
return list
def getCapabilityDescription(self):
return self.description
def getTags(self):
list = self.gateway.jvm.java.util.ArrayList()
for tag in self.tags:
list.add(tag)
return list
def getInterface(self):
if len(self.interfaces) == 0:
return None
return self.interfaces[0]
class ExtensionManager:
"""
@ -113,7 +54,7 @@ class ExtensionManager:
third-party dependencies have been imported.
"""
processorInterfaces = ['org.apache.nifi.python.processor.FlowFileTransform', 'org.apache.nifi.python.processor.RecordTransform']
processor_interfaces = ['org.apache.nifi.python.processor.FlowFileTransform', 'org.apache.nifi.python.processor.RecordTransform']
processor_details = {}
processor_class_by_name = {}
module_files_by_extension_type = {}
@ -307,166 +248,17 @@ class ExtensionManager:
return self.processor_details[id].dependencies
def __get_processor_classes_and_details(self, module_file):
# Parse the python file
with open(module_file) as file:
root_node = ast.parse(file.read())
class_nodes = ProcessorInspection.get_processor_class_nodes(module_file)
details_by_class = {}
# Get top-level class nodes (e.g., MyProcessor)
class_nodes = self.__get_class_nodes(root_node)
for class_node in class_nodes:
logger.debug(f"Checking if class {class_node.name} is a processor")
if self.__is_processor_class_node(class_node):
logger.info(f"Discovered Processor class {class_node.name} in module {module_file}")
details = self.__get_processor_details(class_node, module_file)
details_by_class[class_node.name] = details
logger.info(f"Discovered Processor class {class_node.name} in module {module_file}")
details = ProcessorInspection.get_processor_details(class_node, module_file)
details_by_class[class_node.name] = details
return details_by_class
def __is_processor_class_node(self, class_node):
"""
Checks if the Abstract Syntax Tree (AST) Node represents a Processor class.
We are looking for any classes within the given module file that look like:
class MyProcessor:
...
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
:param class_node: the abstract syntax tree (AST) node
:return: True if the AST Node represents a Python Class that is a Processor, False otherwise
"""
# Look for a 'Java' sub-class
interfaces = self.__get_java_interfaces(class_node)
return len(interfaces) > 0
def __get_java_interfaces(self, class_node):
# Get all class definition nodes
child_class_nodes = self.__get_class_nodes(class_node)
interfaces = []
for child_class_node in child_class_nodes:
# Look for a 'Java' sub-class
if child_class_node.name != 'Java':
continue
# Look for an assignment that assigns values to the `implements` keyword
assignment_nodes = self.__get_assignment_nodes(child_class_node)
for assignment_node in assignment_nodes:
targets = assignment_node.targets
if (len(targets) == 1 and targets[0].id == 'implements'):
assigned_values = assignment_node.value.elts
for assigned_value in assigned_values:
if assigned_value.value in self.processorInterfaces:
interfaces.append(assigned_value.value)
return interfaces
def __get_processor_details(self, class_node, module_file):
# Look for a 'ProcessorDetails' class
child_class_nodes = self.__get_class_nodes(class_node)
# Get the Java interfaces that it implements
interfaces = self.__get_java_interfaces(class_node)
for child_class_node in child_class_nodes:
if child_class_node.name == 'ProcessorDetails':
logger.debug(f"Found ProcessorDetails class in {class_node.name}")
version = self.__get_processor_version(child_class_node, class_node.name)
dependencies = self.__get_processor_dependencies(child_class_node, class_node.name)
description = self.__get_processor_description(child_class_node, class_node.name)
tags = self.__get_processor_tags(child_class_node, class_node.name)
return ExtensionDetails(gateway=self.gateway,
interfaces=interfaces,
type=class_node.name,
version=version,
dependencies=dependencies,
source_location=module_file,
description=description,
tags=tags)
return ExtensionDetails(gateway=self.gateway,
interfaces=interfaces,
type=class_node.name,
version='Unknown',
dependencies=[],
source_location=module_file)
def __get_processor_version(self, details_node, class_name):
assignment_nodes = self.__get_assignment_nodes(details_node)
for assignment_node in assignment_nodes:
targets = assignment_node.targets
if (len(targets) == 1 and targets[0].id == 'version'):
assigned_values = assignment_node.value.value
logger.info("Found version of {0} to be {1}".format(class_name, assigned_values))
return assigned_values
# No dependencies found
logger.info("Found no version information for {0}".format(class_name))
return 'Unknown'
def __get_processor_dependencies(self, details_node, class_name):
deps = self.__get_assigned_list(details_node, class_name, 'dependencies')
if len(deps) == 0:
logger.info("Found no external dependencies that are required for class %s" % class_name)
else:
logger.info("Found the following external dependencies that are required for class {0}: {1}".format(class_name, deps))
return deps
def __get_processor_tags(self, details_node, class_name):
return self.__get_assigned_list(details_node, class_name, 'tags')
def __get_assigned_list(self, details_node, class_name, element_name):
assignment_nodes = self.__get_assignment_nodes(details_node)
for assignment_node in assignment_nodes:
targets = assignment_node.targets
if (len(targets) == 1 and targets[0].id == element_name):
assigned_values = assignment_node.value.elts
declared_dependencies = []
for assigned_value in assigned_values:
declared_dependencies.append(assigned_value.value)
return declared_dependencies
# No values found
return []
def __get_processor_description(self, details_node, class_name):
assignment_nodes = self.__get_assignment_nodes(details_node)
for assignment_node in assignment_nodes:
targets = assignment_node.targets
if (len(targets) == 1 and targets[0].id == 'description'):
return assignment_node.value.value
# No description found
logger.debug("Found no description for class %s" % class_name)
return None
def __get_class_nodes(self, node):
class_nodes = [n for n in node.body if isinstance(n, ast.ClassDef)]
return class_nodes
def __get_assignment_nodes(self, node):
assignment_nodes = [n for n in node.body if isinstance(n, ast.Assign)]
return assignment_nodes
def import_external_dependencies(self, processor_details, work_dir):
class_name = processor_details.getProcessorType()
@ -519,7 +311,7 @@ class ExtensionManager:
def __load_extension_module(self, file, local_dependencies):
# If there are any local dependencies (i.e., other python files in the same directory), load those modules first
if local_dependencies is not None and len(local_dependencies) > 0:
if local_dependencies:
to_load = [dep for dep in local_dependencies]
if file in to_load:
to_load.remove(file)
@ -553,20 +345,20 @@ class ExtensionManager:
to_load.insert(0, local_dependency)
# Determine the module name
moduleName = Path(file).name.split('.py')[0]
module_name = Path(file).name.split('.py')[0]
# Create the module specification
moduleSpec = importlib.util.spec_from_file_location(moduleName, file)
logger.debug(f"Module Spec: {moduleSpec}")
module_spec = importlib.util.spec_from_file_location(module_name, file)
logger.debug(f"Module Spec: {module_spec}")
# Create the module from the specification
module = importlib.util.module_from_spec(moduleSpec)
module = importlib.util.module_from_spec(module_spec)
logger.debug(f"Module: {module}")
# Load the module
sys.modules[moduleName] = module
moduleSpec.loader.exec_module(module)
logger.info(f"Loaded module {moduleName}")
sys.modules[module_name] = module
module_spec.loader.exec_module(module)
logger.info(f"Loaded module {module_name}")
# Find the Processor class and return it
for name, member in inspect.getmembers(module):
@ -579,27 +371,27 @@ class ExtensionManager:
return None
def __is_processor_class(self, potentialProcessorClass):
def __is_processor_class(self, potential_processor_class):
# Go through all members of the given class and see if it has an inner class named Java
for name, member in inspect.getmembers(potentialProcessorClass):
for name, member in inspect.getmembers(potential_processor_class):
if name == 'Java' and inspect.isclass(member):
# Instantiate the Java class
instance = member()
# Check if the instance has a method named 'implements'
hasImplements = False
has_implements = False
for attr in dir(instance):
if attr == 'implements':
hasImplements = True
has_implements = True
break
# If not, move to the next member
if not hasImplements:
if not has_implements:
continue
# The class implements something. Check if it implements Processor
for interface in instance.implements:
if interface in self.processorInterfaces:
logger.debug(f"{potentialProcessorClass} implements Processor")
if interface in self.processor_interfaces:
logger.debug(f"{potential_processor_class} implements Processor")
return True
return False
return False

View File

@ -0,0 +1,295 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import ExtensionDetails
import logging
import textwrap
from nifiapi.documentation import UseCaseDetails, MultiProcessorUseCaseDetails, ProcessorConfiguration, PropertyDescription
PROCESSOR_INTERFACES = ['org.apache.nifi.python.processor.FlowFileTransform', 'org.apache.nifi.python.processor.RecordTransform']
logger = logging.getLogger("python.ProcessorInspection")
def get_processor_class_nodes(module_file: str) -> list:
with open(module_file) as file:
root_node = ast.parse(file.read())
processor_class_nodes = []
class_nodes = get_class_nodes(root_node)
for class_node in class_nodes:
if is_processor_class_node(class_node):
processor_class_nodes.append(class_node)
return processor_class_nodes
def get_processor_details(class_node, module_file):
# Look for a 'ProcessorDetails' class
child_class_nodes = get_class_nodes(class_node)
# Get the Java interfaces that it implements
interfaces = get_java_interfaces(class_node)
for child_class_node in child_class_nodes:
if child_class_node.name == 'ProcessorDetails':
logger.debug(f"Found ProcessorDetails class in {class_node.name}")
version = __get_processor_version(child_class_node)
dependencies = __get_processor_dependencies(child_class_node, class_node.name)
description = __get_processor_description(child_class_node)
tags = __get_processor_tags(child_class_node)
use_cases = get_use_cases(class_node)
multi_processor_use_cases = get_multi_processor_use_cases(class_node)
property_descriptions = get_property_descriptions(class_node)
return ExtensionDetails.ExtensionDetails(interfaces=interfaces,
type=class_node.name,
version=version,
dependencies=dependencies,
source_location=module_file,
description=description,
tags=tags,
use_cases=use_cases,
multi_processor_use_cases=multi_processor_use_cases,
property_descriptions=property_descriptions)
return ExtensionDetails.ExtensionDetails(interfaces=interfaces,
type=class_node.name,
version='Unknown',
dependencies=[],
source_location=module_file)
def __get_processor_version(details_node):
return get_assigned_value(details_node, 'version', 'Unknown')
def __get_processor_dependencies(details_node, class_name):
deps = get_assigned_value(details_node, 'dependencies', [])
if len(deps) == 0:
logger.info("Found no external dependencies that are required for class %s" % class_name)
else:
logger.info("Found the following external dependencies that are required for class {0}: {1}".format(class_name, deps))
return deps
def __get_processor_tags(details_node):
return get_assigned_value(details_node, 'tags', [])
def get_use_cases(class_node) -> list[UseCaseDetails]:
decorators = class_node.decorator_list
if not decorators:
return []
use_cases = []
for decorator in decorators:
if decorator.func.id != 'use_case' or not decorator.keywords:
continue
kv_pairs = {}
for keyword in decorator.keywords:
keyword_name = keyword.arg
keyword_value = get_constant_values(keyword.value)
if keyword_value is not None and isinstance(keyword_value, str):
keyword_value = textwrap.dedent(keyword_value).strip()
kv_pairs[keyword_name] = keyword_value
use_case = UseCaseDetails(
description=dedent(kv_pairs.get('description')),
notes=dedent(kv_pairs.get('notes')),
keywords=kv_pairs.get('keywords'),
configuration=dedent(kv_pairs.get('configuration'))
)
use_cases.append(use_case)
return use_cases
def dedent(val: str) -> str:
if not val:
return ""
return textwrap.dedent(val)
def get_multi_processor_use_cases(class_node) -> list[MultiProcessorUseCaseDetails]:
decorators = class_node.decorator_list
if not decorators:
return []
use_cases = []
for decorator in decorators:
if decorator.func.id != 'multi_processor_use_case' or not decorator.keywords:
continue
kv_pairs = {}
for keyword in decorator.keywords:
keyword_name = keyword.arg
if keyword_name == 'configurations':
keyword_value = get_processor_configurations(keyword.value)
else:
keyword_value = get_constant_values(keyword.value)
if keyword_value is not None and isinstance(keyword_value, str):
keyword_value = textwrap.dedent(keyword_value).strip()
kv_pairs[keyword_name] = keyword_value
use_case = MultiProcessorUseCaseDetails(
description=dedent(kv_pairs.get('description')),
notes=dedent(kv_pairs.get('notes')),
keywords=kv_pairs.get('keywords'),
configurations=kv_pairs.get('configurations')
)
use_cases.append(use_case)
return use_cases
def get_processor_configurations(constructor_calls: ast.List) -> list:
configurations = []
for constructor_call in constructor_calls.elts:
if not isinstance(constructor_call, ast.Call) or constructor_call.func.id != 'ProcessorConfiguration':
continue
kv_pairs = {}
for keyword in constructor_call.keywords:
keyword_name = keyword.arg
keyword_value = get_constant_values(keyword.value)
if keyword_value is not None and isinstance(keyword_value, str):
keyword_value = textwrap.dedent(keyword_value).strip()
kv_pairs[keyword_name] = keyword_value
processor_config = ProcessorConfiguration(processor_type=kv_pairs.get('processor_type'),
configuration=kv_pairs.get('configuration'))
configurations.append(processor_config)
return configurations
def get_property_descriptions(class_node):
descriptions = []
for element in class_node.body:
if not isinstance(element, ast.Assign) or not element.value:
continue
if not isinstance(element.value, ast.Call):
continue
if element.value.func.id != 'PropertyDescriptor':
continue
if not element.value.keywords:
continue
descriptor_info = {}
for keyword in element.value.keywords:
key = keyword.arg
value = get_constant_values(keyword.value)
descriptor_info[key] = value
description = PropertyDescription(name=descriptor_info.get('name'),
description=descriptor_info.get('description'),
display_name=replace_null(descriptor_info.get('display_name'), descriptor_info.get('name')),
required=replace_null(descriptor_info.get('required'), False),
sensitive=replace_null(descriptor_info.get('sensitive'), False),
default_value=descriptor_info.get('default_value'),
expression_language_scope=replace_null(descriptor_info.get('expression_language_scope'), 'NONE'),
controller_service_definition=descriptor_info.get('controller_service_definition'))
descriptions.append(description)
return descriptions
def replace_null(val: any, replacement: any):
return val if val else replacement
def get_assigned_value(class_node, assignment_id, default_value=None):
assignments = get_assignment_nodes(class_node)
for assignment in assignments:
targets = assignment.targets
if len(targets) != 1 or targets[0].id != assignment_id:
continue
assigned_value = assignment.value
return get_constant_values(assigned_value)
return default_value
def get_constant_values(val):
if isinstance(val, ast.Constant):
return val.value
if isinstance(val, ast.List):
return [get_constant_values(v) for v in val.elts]
if isinstance(val, ast.Dict):
keys = val.keys
values = val.values
key_values = [get_constant_values(v).strip() for v in keys]
value_values = [get_constant_values(v).strip() for v in values]
return dict(zip(key_values, value_values))
if isinstance(val, ast.Attribute):
return val.attr
if isinstance(val, ast.BinOp) and isinstance(val.op, ast.Add):
left = get_constant_values(val.left)
right = get_constant_values(val.right)
if left and right:
return left + right
if left and not right:
return left
if right and not left:
return right
return None
def __get_processor_description(details_node):
return get_assigned_value(details_node, 'description')
def is_processor_class_node(class_node):
"""
Checks if the Abstract Syntax Tree (AST) Node represents a Processor class.
We are looking for any classes within the given module file that look like:
class MyProcessor:
...
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
:param class_node: the abstract syntax tree (AST) node
:return: True if the AST Node represents a Python Class that is a Processor, False otherwise
"""
# Look for a 'Java' sub-class
interfaces = get_java_interfaces(class_node)
return len(interfaces) > 0
def get_java_interfaces(class_node):
# Get all class definition nodes
child_class_nodes = get_class_nodes(class_node)
interfaces = []
for child_class_node in child_class_nodes:
# Look for a 'Java' sub-class
if child_class_node.name != 'Java':
continue
implemented = get_assigned_value(child_class_node, 'implements')
interfaces.extend([ifc_name for ifc_name in implemented if ifc_name in PROCESSOR_INTERFACES])
return interfaces
def get_class_nodes(node) -> list:
return [n for n in node.body if isinstance(n, ast.ClassDef)]
def get_assignment_nodes(node) -> list:
return [n for n in node.body if isinstance(n, ast.Assign)]

View File

@ -16,6 +16,14 @@
from nifiapi.properties import ProcessContext
def is_method_defined(processor, method_name):
# Get the attribute from the given Processor with the provided method name, returning None if method is not present
attr = getattr(processor, method_name, None)
# Return True if the attribute is present and is a method (i.e., is callable).
return callable(attr)
# PythonProcessorAdapter is responsible for receiving method invocations from Java side and delegating to the appropriate
# method for a Processor. We use this adapter instead of calling directly into the Processor because it allows us to be more
# flexible on the Python side, by allowing us to avoid implementing things like customValidate, etc.
@ -26,30 +34,23 @@ class PythonProcessorAdapter:
def __init__(self, gateway, processor, extension_manager, controller_service_type_lookup):
self.processor = processor
self.gateway = gateway
self.hasCustomValidate = self.hasMethod(processor, 'customValidate')
self.hasCustomValidate = is_method_defined(processor, 'customValidate')
self.extension_manager = extension_manager
self.controller_service_type_lookup = controller_service_type_lookup
self.has_properties = self.hasMethod(processor, 'getPropertyDescriptors')
self.supportsDynamicProperties = self.hasMethod(processor, 'getDynamicPropertyDescriptor')
self.has_properties = is_method_defined(processor, 'getPropertyDescriptors')
self.supportsDynamicProperties = is_method_defined(processor, 'getDynamicPropertyDescriptor')
if self.hasMethod(processor, 'getRelationships'):
if is_method_defined(processor, 'getRelationships'):
self.relationships = None
else:
self.relationships = gateway.jvm.java.util.HashSet()
success = gateway.jvm.org.apache.nifi.processor.Relationship.Builder() \
.name("success") \
.description("All FlowFiles will go to this relationship") \
.description("All FlowFiles will go to this relationship after being successfully processed") \
.build()
self.relationships.add(success)
def hasMethod(self, processor, methodName):
# Get the attribute from the given Processor with the provided method name, returning None if method is not present
attr = getattr(processor, methodName, None)
# Return True if the attribute is present and is a method (i.e., is callable).
return callable(attr)
def customValidate(self, context):
# If no customValidate method, just return
if not self.hasCustomValidate:
@ -59,7 +60,7 @@ class PythonProcessorAdapter:
def getRelationships(self):
# If self.relationships is None, it means that the Processor has implemented the method, and we need
# to call the Processor's implementation. This allows for dynamically change the Relationships based on
# to call the Processor's implementation. This allows for dynamically changing the Relationships based on
# configuration, etc.
if self.relationships is None:
return self.processor.getRelationships()
@ -68,11 +69,11 @@ class PythonProcessorAdapter:
def getSupportedPropertyDescriptors(self):
descriptors = self.processor.getPropertyDescriptors() if self.has_properties else []
descriptorList = self.gateway.jvm.java.util.ArrayList()
descriptor_list = self.gateway.jvm.java.util.ArrayList()
for descriptor in descriptors:
descriptorList.add(descriptor.to_java_descriptor(self.gateway, self.controller_service_type_lookup))
descriptor_list.add(descriptor.to_java_descriptor(self.gateway, self.controller_service_type_lookup))
return descriptorList
return descriptor_list
def getProcessor(self):
return self.processor
@ -80,18 +81,18 @@ class PythonProcessorAdapter:
def isDynamicPropertySupported(self):
return self.supportsDynamicProperties
def getSupportedDynamicPropertyDescriptor(self, propertyName):
def getSupportedDynamicPropertyDescriptor(self, property_name):
if not self.supportsDynamicProperties:
return None
descriptor = self.processor.getDynamicPropertyDescriptor(propertyName)
return None if descriptor is None else descriptor.to_java_descriptor(gateway = self.gateway, cs_type_lookup=self.controller_service_type_lookup)
descriptor = self.processor.getDynamicPropertyDescriptor(property_name)
return None if descriptor is None else descriptor.to_java_descriptor(gateway=self.gateway, cs_type_lookup=self.controller_service_type_lookup)
def onScheduled(self, context):
if self.hasMethod(self.processor, 'onScheduled'):
if is_method_defined(self.processor, 'onScheduled'):
self.processor.onScheduled(ProcessContext(context))
def onStopped(self, context):
if self.hasMethod(self.processor, 'onStopped'):
if is_method_defined(self.processor, 'onStopped'):
self.processor.onStopped(ProcessContext(context))
def initialize(self, context):

View File

@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.documentation import use_case, multi_processor_use_case, ProcessorConfiguration
@use_case(description="First Use Case",
notes="First Note",
keywords=["A", "B"],
configuration="""This Processor has no configuration.""")
@use_case(description="Second Use Case",
notes="Another Note",
keywords=["C", "B"],
configuration="""No config.""")
@multi_processor_use_case(description="Multi Processor Use Case",
notes="Note #1",
keywords=["D", "E"],
configurations=[
ProcessorConfiguration(processor_type="OtherProcessor",
configuration="No config "
"necessary."),
ProcessorConfiguration(processor_type="DummyProcessor",
configuration="None.")
])
class DummyProcessor:
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
description = "Fake Processor"
tags = ["tag1", "tag2"]
def __init__(self):
pass

View File

@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import ProcessorInspection
DUMMY_PROCESSOR_FILE = 'src/test/python/framework/DummyProcessor.py'
class DetectProcessorUseCase(unittest.TestCase):
def test_get_processor_details(self):
class_nodes = ProcessorInspection.get_processor_class_nodes(DUMMY_PROCESSOR_FILE)
self.assertIsNotNone(class_nodes)
self.assertEqual(len(class_nodes), 1)
class_node = class_nodes[0]
self.assertEqual(class_node.name, 'DummyProcessor')
details = ProcessorInspection.get_processor_details(class_node, DUMMY_PROCESSOR_FILE)
self.assertIsNotNone(details)
self.assertEqual(details.description, 'Fake Processor')
self.assertEqual(details.tags, ['tag1', 'tag2'])
self.assertEqual(len(details.use_cases), 2)
self.assertEqual(details.use_cases[0].description, 'First Use Case')
self.assertEqual(details.use_cases[1].description, 'Second Use Case')
self.assertEqual(details.use_cases[0].notes, 'First Note')
self.assertEqual(details.use_cases[1].notes, 'Another Note')
self.assertEqual(details.use_cases[0].configuration, 'This Processor has no configuration.')
self.assertEqual(details.use_cases[0].keywords, ['A', 'B'])
self.assertEqual(len(details.multi_processor_use_cases), 1)
self.assertEqual(details.multi_processor_use_cases[0].description, 'Multi Processor Use Case')
self.assertEqual(details.multi_processor_use_cases[0].notes, 'Note #1')
self.assertEqual(details.multi_processor_use_cases[0].keywords, ['D', 'E'])
self.assertEqual(len(details.multi_processor_use_cases[0].configurations), 2)
self.assertEqual(details.multi_processor_use_cases[0].configurations[0].processor_type, 'OtherProcessor')
self.assertEqual(details.multi_processor_use_cases[0].configurations[0].configuration, 'No config necessary.')
self.assertEqual(details.multi_processor_use_cases[0].configurations[1].processor_type, 'DummyProcessor')
self.assertEqual(details.multi_processor_use_cases[0].configurations[1].configuration, 'None.')
if __name__ == '__main__':
unittest.main()

View File

@ -47,7 +47,7 @@ class PromptChatGPT(FlowFileTransform):
PROMPT = PropertyDescriptor(
name="Prompt",
description="The prompt to issue to ChatGPT. This may use FlowFile attributes via Expression Language and may also reference the FlowFile content by using the literal " +
f"{FLOWFILE_CONTENT_REFERENCE} (including braces) in the prompt. If the FlowFile's content is JSON formatted, a reference may also include JSONPath Expressions "
"{flowfile_content} (including braces) in the prompt. If the FlowFile's content is JSON formatted, a reference may also include JSONPath Expressions "
"to reference specific fields in the FlowFile content, such as {$.page_content}",
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,

View File

@ -18,6 +18,8 @@ import json
from langchain.text_splitter import Language
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, PropertyDependency, ExpressionLanguageScope
from nifiapi.documentation import use_case, multi_processor_use_case, ProcessorConfiguration
SPLIT_BY_CHARACTER = 'Split by Character'
SPLIT_CODE = 'Split Code'
@ -26,14 +28,86 @@ RECURSIVELY_SPLIT_BY_CHARACTER = 'Recursively Split by Character'
TEXT_KEY = "text"
METADATA_KEY = "metadata"
@use_case(
description="Create chunks of text from a single larger chunk.",
notes="The input for this use case is expected to be a FlowFile whose content is a JSON Lines document, with each line having a 'text' and a 'metadata' element.",
keywords=["embedding", "vector", "text", "rag", "retrieval augmented generation"],
configuration="""
Set "Input Format" to "Plain Text"
Set "Element Strategy" to "Single Document"
"""
)
@multi_processor_use_case(
description="""
Chunk Plaintext data in order to prepare it for storage in a vector store. The output is in "json-lines" format,
containing the chunked data as text, as well as metadata pertaining to the chunk.""",
notes="The input for this use case is expected to be a FlowFile whose content is a plaintext document.",
keywords=["embedding", "vector", "text", "rag", "retrieval augmented generation"],
configurations=[
ProcessorConfiguration(
processor_type="ParseDocument",
configuration="""
Set "Input Format" to "Plain Text"
Set "Element Strategy" to "Single Document"
Connect the 'success' Relationship to ChunkDocument.
"""
),
ProcessorConfiguration(
processor_type="ChunkDocument",
configuration="""
Set the following properties:
"Chunking Strategy" = "Recursively Split by Character"
"Separator" = "\\n\\n,\\n, ,"
"Separator Format" = "Plain Text"
"Chunk Size" = "4000"
"Chunk Overlap" = "200"
"Keep Separator" = "false"
Connect the 'success' Relationship to the appropriate destination to store data in the desired vector store.
"""
)
])
@multi_processor_use_case(
description="""
Parse and chunk the textual contents of a PDF document in order to prepare it for storage in a vector store. The output is in "json-lines" format,
containing the chunked data as text, as well as metadata pertaining to the chunk.""",
notes="The input for this use case is expected to be a FlowFile whose content is a PDF document.",
keywords=["pdf", "embedding", "vector", "text", "rag", "retrieval augmented generation"],
configurations=[
ProcessorConfiguration(
processor_type="ParseDocument",
configuration="""
Set "Input Format" to "PDF"
Set "Element Strategy" to "Single Document"
Set "Include Extracted Metadata" to "false"
Connect the 'success' Relationship to ChunkDocument.
"""
),
ProcessorConfiguration(
processor_type="ChunkDocument",
configuration="""
Set the following properties:
"Chunking Strategy" = "Recursively Split by Character"
"Separator" = "\\n\\n,\\n, ,"
"Separator Format" = "Plain Text"
"Chunk Size" = "4000"
"Chunk Overlap" = "200"
"Keep Separator" = "false"
Connect the 'success' Relationship to the appropriate destination to store data in the desired vector store.
"""
)
])
class ChunkDocument(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
version = '2.0.0-SNAPSHOT'
description = """Splits incoming documents into chunks that are appropriately sized for creating Text Embeddings. The input is expected to be in "json-lines" format, with
each line having a 'text' and a 'metadata' element. Each line will then be split into one or more lines in the output."""
description = """Chunks incoming documents that are formatted as JSON Lines into chunks that are appropriately sized for creating Text Embeddings.
The input is expected to be in "json-lines" format, with each line having a 'text' and a 'metadata' element.
Each line will then be split into one or more lines in the output."""
tags = ["text", "split", "chunk", "langchain", "embeddings", "vector", "machine learning", "ML", "artificial intelligence", "ai", "document"]
dependencies = ['langchain']

View File

@ -19,8 +19,34 @@ from nifiapi.properties import PropertyDescriptor, StandardValidators, Expressio
import pinecone
import json
from EmbeddingUtils import OPENAI, HUGGING_FACE, EMBEDDING_MODEL, create_embedding_service
from nifiapi.documentation import use_case, multi_processor_use_case, ProcessorConfiguration
@use_case(description="Create vectors/embeddings that represent text content and send the vectors to Pinecone",
notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in Pinecone provided in the 'text' field.",
keywords=["pinecone", "embedding", "vector", "text", "vectorstore", "insert"],
configuration="""
Configure the 'Pinecone API Key' to the appropriate authentication token for interacting with Pinecone.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
Set 'Pinecone Environment' to the name of your Pinecone environment
Set 'Index Name' to the name of your Pinecone Index.
Set 'Namespace' to appropriate namespace, or leave it empty to use the default Namespace.
If the documents to send to Pinecone contain a unique identifier, set the 'Document ID Field Name' property to the name of the field that contains the document ID.
This property can be left blank, in which case a unique ID will be generated based on the FlowFile's filename.
""")
@use_case(description="Update vectors/embeddings in Pinecone",
notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in Pinecone provided in the 'text' field.",
keywords=["pinecone", "embedding", "vector", "text", "vectorstore", "update", "upsert"],
configuration="""
Configure the 'Pinecone API Key' to the appropriate authentication token for interacting with Pinecone.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
Set 'Pinecone Environment' to the name of your Pinecone environment
Set 'Index Name' to the name of your Pinecone Index.
Set 'Namespace' to appropriate namespace, or leave it empty to use the default Namespace.
Set the 'Document ID Field Name' property to the name of the field that contains the identifier of the document in Pinecone to update.
""")
class PutPinecone(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
@ -31,6 +57,7 @@ class PutPinecone(FlowFileTransform):
The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
tags = ["pinecone", "vector", "vectordb", "vectorstore", "embeddings", "ai", "artificial intelligence", "ml", "machine learning", "text", "LLM"]
PINECONE_API_KEY = PropertyDescriptor(
name="Pinecone API Key",
description="The API Key to use in order to authentication with Pinecone",