diff --git a/minifi/minifi-assembly/pom.xml b/minifi/minifi-assembly/pom.xml index 69c7a2ce6b..3762920ddc 100644 --- a/minifi/minifi-assembly/pom.xml +++ b/minifi/minifi-assembly/pom.xml @@ -179,11 +179,6 @@ limitations under the License. nifi-stateless-api compile - - org.apache.nifi - nifi-python-framework-api - compile - org.apache.nifi nifi-framework-api @@ -299,6 +294,38 @@ limitations under the License. nifi-property-utils compile + + + org.apache.nifi + nifi-py4j-nar + 2.0.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-python-extensions-bundle + 2.0.0-SNAPSHOT + zip + + + org.apache.nifi + nifi-python-framework + 2.0.0-SNAPSHOT + compile + + + org.apache.nifi + nifi-python-extension-api + 2.0.0-SNAPSHOT + compile + + + org.apache.nifi + nifi-python-framework-api + 2.0.0-SNAPSHOT + compile + + commons-io commons-io diff --git a/minifi/minifi-assembly/src/main/assembly/dependencies-windows-service.xml b/minifi/minifi-assembly/src/main/assembly/dependencies-windows-service.xml index 234472e544..c798d21be7 100644 --- a/minifi/minifi-assembly/src/main/assembly/dependencies-windows-service.xml +++ b/minifi/minifi-assembly/src/main/assembly/dependencies-windows-service.xml @@ -151,6 +151,90 @@ + + + + runtime + false + python/ + true + false + + *:nifi-python-framework + + true + + false + + + LICENSE + NOTICE + META-INF + META-INF/**/* + + + + + + + runtime + false + python/api + true + false + + *:nifi-python-extension-api + + true + + false + + + LICENSE + NOTICE + META-INF + META-INF/**/* + + + + + + runtime + false + ./python/extensions + 0770 + 0664 + true + + *:nifi-python-extensions-bundle + + true + + + META-INF/ + META-INF/** + + + + + + runtime + false + ./python/extensions + 0770 + 0664 + true + + *:nifi-python-extensions-bundle + + true + + + META-INF/ + META-INF/** + + + diff --git a/minifi/minifi-assembly/src/main/assembly/dependencies.xml b/minifi/minifi-assembly/src/main/assembly/dependencies.xml index 19d9e7203e..41051e3ac0 100644 --- a/minifi/minifi-assembly/src/main/assembly/dependencies.xml +++ b/minifi/minifi-assembly/src/main/assembly/dependencies.xml @@ -153,6 +153,71 @@ + + + + runtime + false + python/ + true + false + + *:nifi-python-framework + + true + + false + + + LICENSE + NOTICE + META-INF + META-INF/**/* + + + + + + + runtime + false + python/api + true + false + + *:nifi-python-extension-api + + true + + false + + + LICENSE + NOTICE + META-INF + META-INF/**/* + + + + + + runtime + false + ./python/extensions + 0770 + 0664 + true + + *:nifi-python-extensions-bundle + + true + + + META-INF/ + META-INF/** + + + diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java index a46122fa90..678ab912b7 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java @@ -20,6 +20,11 @@ package org.apache.nifi.minifi.c2.command; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.UUID.randomUUID; import static java.util.function.Predicate.not; +import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES; +import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT; +import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE; +import static org.apache.nifi.components.validation.ValidationStatus.INVALID; +import static org.apache.nifi.components.validation.ValidationStatus.VALIDATING; import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION; import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION; import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup; @@ -39,10 +44,9 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.commons.io.FilenameUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy; +import org.apache.nifi.components.AsyncLoadedProcessor; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; @@ -61,6 +65,11 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class); + private static final Set INITIALIZING_ASYNC_PROCESSOR_STATES = + Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES, LOADING_PROCESSOR_CODE); + + private static int ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS = 5000; + private static int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60; private static int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000; private static int VALIDATION_MAX_RETRIES = 5; private static int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000; @@ -194,7 +203,14 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt private List validate(FlowManager flowManager) { List componentNodes = extractComponentNodes(flowManager); - retry(() -> componentIsInValidatingState(componentNodes), List::isEmpty, VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS) + retry(() -> initializingAsyncLoadingComponents(componentNodes), List::isEmpty, + ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES, ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS) + .ifPresent(components -> { + LOGGER.error("The following components are async loading components and are still initializing: {}", components); + throw new IllegalStateException("Maximum retry number exceeded while waiting for async loading components to be initialized"); + }); + + retry(() -> componentsInValidatingState(componentNodes), List::isEmpty, VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS) .ifPresent(components -> { LOGGER.error("The following components are still in VALIDATING state: {}", components); throw new IllegalStateException("Maximum retry number exceeded while waiting for components to be validated"); @@ -215,11 +231,17 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt .toList(); } - private List componentIsInValidatingState(List componentNodes) { + private List componentsInValidatingState(List componentNodes) { return componentNodes.stream() - .map(componentNode -> Pair.of((ComponentNode) componentNode, componentNode.performValidation())) - .filter(pair -> pair.getRight() == ValidationStatus.VALIDATING) - .map(Pair::getLeft) + .filter(componentNode -> componentNode.performValidation() == VALIDATING) + .toList(); + } + + private List initializingAsyncLoadingComponents(List componentNodes) { + return componentNodes.stream() + .filter(componentNode -> componentNode.performValidation() == INVALID) + .filter(componentNode -> componentNode.getComponent() instanceof AsyncLoadedProcessor asyncLoadedProcessor + && INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncLoadedProcessor.getState())) .toList(); } diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf index 680465a5d2..273a96ebe0 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf @@ -197,3 +197,11 @@ nifi.components.status.snapshot.frequency=1 min # Swap Manager nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager nifi.queue.swap.threshold=20000 + +# Uncomment in order to enable Python Extensions. +#nifi.python.command=python3 +nifi.python.framework.source.directory=./python/framework +nifi.python.extensions.source.directory.default=./python/extensions +nifi.python.working.directory=./work/python +nifi.python.max.processes=100 +nifi.python.max.processes.per.extension.type=10 diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml index 4911a5f7a1..2f1cab15d9 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml @@ -90,6 +90,9 @@ + + + diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py index f7e2de1ea5..2d3e7e8575 100644 --- a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py +++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py @@ -85,7 +85,7 @@ class PutQdrant(FlowFileTransform): name="Force Recreate Collection", description="Specifies whether to recreate the collection if it already exists. Essentially clearing the existing data.", required=True, - default_value=False, + default_value="False", allowable_values=["True", "False"], validators=[StandardValidators.BOOLEAN_VALIDATOR], )