From 81ba02f785103ba61edf895ae6d3ec0cc763bcbb Mon Sep 17 00:00:00 2001 From: lehelb Date: Fri, 20 Dec 2024 14:41:59 +0100 Subject: [PATCH] NIFI-14100: Add option to sequential download of dependencies for Python processors --- .../org/apache/nifi/util/NiFiProperties.java | 1 + .../main/asciidoc/administration-guide.adoc | 1 + .../org/apache/nifi/py4j/PythonProcess.java | 1 + .../py4j/StandardPythonProcessorBridge.java | 24 +++++++++++++++++-- .../nifi/python/PythonProcessConfig.java | 12 ++++++++++ .../nifi/controller/FlowController.java | 3 +++ .../nifi-framework/nifi-resources/pom.xml | 3 ++- .../src/main/resources/conf/nifi.properties | 1 + 8 files changed, 43 insertions(+), 3 deletions(-) diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 5b4753b00d..ca563f052c 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -308,6 +308,7 @@ public class NiFiProperties extends ApplicationProperties { public static final String PYTHON_WORKING_DIRECTORY = "nifi.python.working.directory"; public static final String PYTHON_MAX_PROCESSES = "nifi.python.max.processes"; public static final String PYTHON_MAX_PROCESSES_PER_TYPE = "nifi.python.max.processes.per.extension.type"; + public static final String PYTHON_DEPENDENCY_INSTALL_SEQUENTIAL = "nifi.python.dependency.install.sequential"; public static final String PYTHON_COMMS_TIMEOUT = "nifi.python.comms.timeout"; public static final String PYTHON_CONTROLLER_DEBUGPY_ENABLED = "nifi.python.controller.debugpy.enabled"; diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 0e92332123..f70bc545d5 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -251,6 +251,7 @@ that can be allocated for each individual type of Processor. | nifi.python.max.processes | 100 | The maximum number of Python processes to spawn for all Processors combined. Once this limit is reached, if another Processor is added to the NiFi canvas, the newly added Processor will be added to one of the existing Python processes that was allocated for other Processors of the same type. If there are no other Python processes allocated for the same type, an Exception will be thrown and the Processor will not be added to the canvas. +| nifi.python.dependency.install.sequential | false | The value controls whether dependencies are downloaded and installed sequentially or in parallel when initializing multiple Python processor |================================================================================================================================================== diff --git a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java index 4b5b6cc8cc..9795d3207a 100644 --- a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java +++ b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java @@ -482,6 +482,7 @@ public class PythonProcess { .processorType(processorType) .processorVersion(processorVersion) .workingDirectory(processConfig.getPythonWorkingDirectory()) + .installDependenciesSequential(processConfig.getDependencyInstallSequential()) .moduleFile(new File(controller.getModuleFile(type, version))) .build(); diff --git a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java index 713d12addb..673a73618c 100644 --- a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java +++ b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java @@ -40,12 +40,14 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { private volatile PythonProcessorAdapter adapter; private final File workingDir; private final File moduleFile; + private final boolean installDependenciesSequential; private volatile long lastModified; private volatile LoadState loadState = LoadState.DOWNLOADING_DEPENDENCIES; private volatile PythonProcessorInitializationContext initializationContext; private volatile String identifier; private volatile PythonController controller; private volatile CompletableFuture initializationFuture; + private static final Object DEPENDENCY_DOWNLOAD_LOCK = new Object(); private StandardPythonProcessorBridge(final Builder builder) { @@ -55,6 +57,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { this.processorVersion = builder.processorVersion; this.workingDir = builder.workDir; this.moduleFile = builder.moduleFile; + this.installDependenciesSequential = builder.installDependenciesSequential; this.lastModified = this.moduleFile.lastModified(); } @@ -115,8 +118,14 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { loadState = LoadState.DOWNLOADING_DEPENDENCIES; try { - creationWorkflow.downloadDependencies(); - logger.info("Successfully downloaded dependencies for Python Processor {} ({})", identifier, getProcessorType()); + if (installDependenciesSequential) { + synchronized (DEPENDENCY_DOWNLOAD_LOCK) { + logger.debug("Installing dependencies sequentially."); + downloadDependencies(); + } + } else { + downloadDependencies(); + } break; } catch (final Exception e) { @@ -171,6 +180,11 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { future.complete(null); } + private void downloadDependencies() { + creationWorkflow.downloadDependencies(); + logger.info("Successfully downloaded dependencies for Python Processor {} ({})", identifier, getProcessorType()); + } + @Override public String getProcessorType() { return processorType; @@ -198,6 +212,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { private File moduleFile; private String processorType; private String processorVersion; + private boolean installDependenciesSequential; public Builder controller(final PythonController controller) { this.controller = controller; @@ -229,6 +244,11 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { return this; } + public Builder installDependenciesSequential(final boolean installDependenciesSequential) { + this.installDependenciesSequential = installDependenciesSequential; + return this; + } + public StandardPythonProcessorBridge build() { if (controller == null) { throw new IllegalStateException("Must specify the PythonController"); diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-py4j-framework-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-py4j-framework-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java index 042a1b36be..adbac04aaf 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-py4j-framework-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-py4j-framework-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java @@ -35,6 +35,7 @@ public class PythonProcessConfig { private final boolean debugController; private final String debugHost; private final int debugPort; + private final boolean dependencyInstallSequential; private PythonProcessConfig(final Builder builder) { this.pythonCommand = builder.pythonCommand; @@ -47,6 +48,7 @@ public class PythonProcessConfig { this.debugController = builder.debugController; this.debugPort = builder.debugPort; this.debugHost = builder.debugHost; + this.dependencyInstallSequential = builder.dependencyInstallSequential; } public String getPythonCommand() { @@ -89,6 +91,10 @@ public class PythonProcessConfig { return debugPort; } + public boolean getDependencyInstallSequential() { + return dependencyInstallSequential; + } + public static class Builder { private String pythonCommand = "python3"; private File pythonFrameworkDirectory = new File("python/framework"); @@ -100,6 +106,7 @@ public class PythonProcessConfig { private boolean debugController = false; private String debugHost = "localhost"; private int debugPort = 5678; + private boolean dependencyInstallSequential = false; public Builder pythonCommand(final String command) { @@ -164,6 +171,11 @@ public class PythonProcessConfig { return this; } + public Builder dependencyInstallSequential(final boolean dependencyInstallSequential) { + this.dependencyInstallSequential = dependencyInstallSequential; + return this; + } + public PythonProcessConfig build() { return new PythonProcessConfig(this); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ee50266061..243ec6e0d5 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -254,6 +254,7 @@ import javax.management.NotificationEmitter; import javax.net.ssl.SSLContext; import static java.util.Objects.requireNonNull; +import static org.apache.nifi.util.NiFiProperties.PYTHON_DEPENDENCY_INSTALL_SEQUENTIAL; public class FlowController implements ReportingTaskProvider, FlowAnalysisRuleProvider, Authorizable, NodeTypeProvider { private static final String STANDARD_PYTHON_BRIDGE_IMPLEMENTATION_CLASS = "org.apache.nifi.py4j.StandardPythonBridge"; @@ -875,6 +876,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr int maxProcesses = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_MAX_PROCESSES, 20); int maxProcessesPerType = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_MAX_PROCESSES_PER_TYPE, 2); + boolean isdependencyInstallSequential = Boolean.parseBoolean(nifiProperties.getProperty(PYTHON_DEPENDENCY_INSTALL_SEQUENTIAL, "false")); final boolean enableControllerDebug = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_ENABLED, "false")); final int debugPort = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_PORT, 5678); @@ -909,6 +911,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr .commsTimeout(commsTimeout == null ? null : Duration.ofMillis(FormatUtils.getTimeDuration(commsTimeout, TimeUnit.MILLISECONDS))) .maxPythonProcesses(maxProcesses) .maxPythonProcessesPerType(maxProcessesPerType) + .dependencyInstallSequential(isdependencyInstallSequential) .enableControllerDebug(enableControllerDebug) .debugPort(debugPort) .debugHost(debugHost) diff --git a/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 40d115880c..a36133685d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -264,7 +264,8 @@ ./work/python 100 10 - + false + 0 diff --git a/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 5e6136628d..fe2587daec 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -48,6 +48,7 @@ nifi.python.extensions.source.directory.default=${nifi.python.extensions.source. nifi.python.working.directory=${nifi.python.working.directory} nifi.python.max.processes=${nifi.python.max.processes} nifi.python.max.processes.per.extension.type=${nifi.python.max.processes.per.extension.type} +nifi.python.dependency.install.sequential=${nifi.python.dependency.install.sequential} #################### # State Management #