NIFI-13395 Added Python Processors to MiNiFi

This closes #8962

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Ferenc Kis 2024-06-13 09:41:03 +02:00 committed by exceptionfactory
parent 57e07c080f
commit 55d2f39d55
No known key found for this signature in database
7 changed files with 222 additions and 13 deletions

View File

@ -179,11 +179,6 @@ limitations under the License.
<artifactId>nifi-stateless-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-python-framework-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
@ -299,6 +294,38 @@ limitations under the License.
<artifactId>nifi-property-utils</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-py4j-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-python-extensions-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>zip</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-python-framework</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-python-extension-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-python-framework-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>

View File

@ -151,6 +151,90 @@
</excludes>
</unpackOptions>
</dependencySet>
<!-- Write out all dependency artifacts to lib directory -->
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>python/</outputDirectory>
<useStrictFiltering>true</useStrictFiltering>
<useTransitiveDependencies>false</useTransitiveDependencies>
<includes>
<include>*:nifi-python-framework</include>
</includes>
<unpack>true</unpack>
<unpackOptions>
<filtered>false</filtered>
<excludes>
<!-- LICENSE and NOTICE both covered by top-level -->
<exclude>LICENSE</exclude>
<exclude>NOTICE</exclude>
<exclude>META-INF</exclude>
<exclude>META-INF/**/*</exclude>
</excludes>
</unpackOptions>
</dependencySet>
<!-- Write out the python api contents -->
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>python/api</outputDirectory>
<useStrictFiltering>true</useStrictFiltering>
<useTransitiveDependencies>false</useTransitiveDependencies>
<includes>
<include>*:nifi-python-extension-api</include>
</includes>
<unpack>true</unpack>
<unpackOptions>
<filtered>false</filtered>
<excludes>
<!-- LICENSE and NOTICE both covered by top-level -->
<exclude>LICENSE</exclude>
<exclude>NOTICE</exclude>
<exclude>META-INF</exclude>
<exclude>META-INF/**/*</exclude>
</excludes>
</unpackOptions>
</dependencySet>
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>./python/extensions</outputDirectory>
<directoryMode>0770</directoryMode>
<fileMode>0664</fileMode>
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>*:nifi-python-extensions-bundle</include>
</includes>
<unpack>true</unpack>
<unpackOptions>
<excludes>
<exclude>META-INF/</exclude>
<exclude>META-INF/**</exclude>
</excludes>
</unpackOptions>
</dependencySet>
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>./python/extensions</outputDirectory>
<directoryMode>0770</directoryMode>
<fileMode>0664</fileMode>
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>*:nifi-python-extensions-bundle</include>
</includes>
<unpack>true</unpack>
<unpackOptions>
<excludes>
<exclude>META-INF/</exclude>
<exclude>META-INF/**</exclude>
</excludes>
</unpackOptions>
</dependencySet>
</dependencySets>
<fileSets>
<fileSet>

View File

@ -153,6 +153,71 @@
</excludes>
</unpackOptions>
</dependencySet>
<!-- Write out all dependency artifacts to lib directory -->
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>python/</outputDirectory>
<useStrictFiltering>true</useStrictFiltering>
<useTransitiveDependencies>false</useTransitiveDependencies>
<includes>
<include>*:nifi-python-framework</include>
</includes>
<unpack>true</unpack>
<unpackOptions>
<filtered>false</filtered>
<excludes>
<!-- LICENSE and NOTICE both covered by top-level -->
<exclude>LICENSE</exclude>
<exclude>NOTICE</exclude>
<exclude>META-INF</exclude>
<exclude>META-INF/**/*</exclude>
</excludes>
</unpackOptions>
</dependencySet>
<!-- Write out the python api contents -->
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>python/api</outputDirectory>
<useStrictFiltering>true</useStrictFiltering>
<useTransitiveDependencies>false</useTransitiveDependencies>
<includes>
<include>*:nifi-python-extension-api</include>
</includes>
<unpack>true</unpack>
<unpackOptions>
<filtered>false</filtered>
<excludes>
<!-- LICENSE and NOTICE both covered by top-level -->
<exclude>LICENSE</exclude>
<exclude>NOTICE</exclude>
<exclude>META-INF</exclude>
<exclude>META-INF/**/*</exclude>
</excludes>
</unpackOptions>
</dependencySet>
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>./python/extensions</outputDirectory>
<directoryMode>0770</directoryMode>
<fileMode>0664</fileMode>
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>*:nifi-python-extensions-bundle</include>
</includes>
<unpack>true</unpack>
<unpackOptions>
<excludes>
<exclude>META-INF/</exclude>
<exclude>META-INF/**</exclude>
</excludes>
</unpackOptions>
</dependencySet>
</dependencySets>
<fileSets>
<fileSet>

View File

@ -20,6 +20,11 @@ package org.apache.nifi.minifi.c2.command;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.UUID.randomUUID;
import static java.util.function.Predicate.not;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
import static org.apache.nifi.components.validation.ValidationStatus.INVALID;
import static org.apache.nifi.components.validation.ValidationStatus.VALIDATING;
import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup;
@ -39,10 +44,9 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
@ -61,6 +65,11 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
private static final Set<AsyncLoadedProcessor.LoadState> INITIALIZING_ASYNC_PROCESSOR_STATES =
Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES, LOADING_PROCESSOR_CODE);
private static int ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS = 5000;
private static int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60;
private static int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000;
private static int VALIDATION_MAX_RETRIES = 5;
private static int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000;
@ -194,7 +203,14 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
private List<ValidationResult> validate(FlowManager flowManager) {
List<? extends ComponentNode> componentNodes = extractComponentNodes(flowManager);
retry(() -> componentIsInValidatingState(componentNodes), List::isEmpty, VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS)
retry(() -> initializingAsyncLoadingComponents(componentNodes), List::isEmpty,
ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES, ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS)
.ifPresent(components -> {
LOGGER.error("The following components are async loading components and are still initializing: {}", components);
throw new IllegalStateException("Maximum retry number exceeded while waiting for async loading components to be initialized");
});
retry(() -> componentsInValidatingState(componentNodes), List::isEmpty, VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS)
.ifPresent(components -> {
LOGGER.error("The following components are still in VALIDATING state: {}", components);
throw new IllegalStateException("Maximum retry number exceeded while waiting for components to be validated");
@ -215,11 +231,17 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
.toList();
}
private List<ComponentNode> componentIsInValidatingState(List<? extends ComponentNode> componentNodes) {
private List<? extends ComponentNode> componentsInValidatingState(List<? extends ComponentNode> componentNodes) {
return componentNodes.stream()
.map(componentNode -> Pair.of((ComponentNode) componentNode, componentNode.performValidation()))
.filter(pair -> pair.getRight() == ValidationStatus.VALIDATING)
.map(Pair::getLeft)
.filter(componentNode -> componentNode.performValidation() == VALIDATING)
.toList();
}
private List<? extends ComponentNode> initializingAsyncLoadingComponents(List<? extends ComponentNode> componentNodes) {
return componentNodes.stream()
.filter(componentNode -> componentNode.performValidation() == INVALID)
.filter(componentNode -> componentNode.getComponent() instanceof AsyncLoadedProcessor asyncLoadedProcessor
&& INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncLoadedProcessor.getState()))
.toList();
}

View File

@ -197,3 +197,11 @@ nifi.components.status.snapshot.frequency=1 min
# Swap Manager
nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager
nifi.queue.swap.threshold=20000
# Uncomment in order to enable Python Extensions.
#nifi.python.command=python3
nifi.python.framework.source.directory=./python/framework
nifi.python.extensions.source.directory.default=./python/extensions
nifi.python.working.directory=./work/python
nifi.python.max.processes=100
nifi.python.max.processes.per.extension.type=10

View File

@ -90,6 +90,9 @@
<!-- Suppress non-error messages due to known warning about redundant path annotation (NIFI-574) -->
<logger name="com.sun.jersey.spi.inject.Errors" level="ERROR"/>
<!-- Py4J set to WARN to avoid verbose socket communication messages -->
<logger name="py4j" level="WARN" />
<!--
Logger for capturing Bootstrap logs and MiNiFi's standard error and standard out.
-->

View File

@ -85,7 +85,7 @@ class PutQdrant(FlowFileTransform):
name="Force Recreate Collection",
description="Specifies whether to recreate the collection if it already exists. Essentially clearing the existing data.",
required=True,
default_value=False,
default_value="False",
allowable_values=["True", "False"],
validators=[StandardValidators.BOOLEAN_VALIDATOR],
)