NIFI-14100: Add option to sequential download of dependencies for Python processors

This commit is contained in:
lehelb 2024-12-20 14:41:59 +01:00
parent df793ce14e
commit 81ba02f785
8 changed files with 43 additions and 3 deletions

View File

@ -308,6 +308,7 @@ public class NiFiProperties extends ApplicationProperties {
public static final String PYTHON_WORKING_DIRECTORY = "nifi.python.working.directory"; public static final String PYTHON_WORKING_DIRECTORY = "nifi.python.working.directory";
public static final String PYTHON_MAX_PROCESSES = "nifi.python.max.processes"; public static final String PYTHON_MAX_PROCESSES = "nifi.python.max.processes";
public static final String PYTHON_MAX_PROCESSES_PER_TYPE = "nifi.python.max.processes.per.extension.type"; public static final String PYTHON_MAX_PROCESSES_PER_TYPE = "nifi.python.max.processes.per.extension.type";
public static final String PYTHON_DEPENDENCY_INSTALL_SEQUENTIAL = "nifi.python.dependency.install.sequential";
public static final String PYTHON_COMMS_TIMEOUT = "nifi.python.comms.timeout"; public static final String PYTHON_COMMS_TIMEOUT = "nifi.python.comms.timeout";
public static final String PYTHON_CONTROLLER_DEBUGPY_ENABLED = "nifi.python.controller.debugpy.enabled"; public static final String PYTHON_CONTROLLER_DEBUGPY_ENABLED = "nifi.python.controller.debugpy.enabled";

View File

@ -251,6 +251,7 @@ that can be allocated for each individual type of Processor.
| nifi.python.max.processes | 100 | The maximum number of Python processes to spawn for all Processors combined. Once this limit is reached, if another Processor is added to the NiFi canvas, | nifi.python.max.processes | 100 | The maximum number of Python processes to spawn for all Processors combined. Once this limit is reached, if another Processor is added to the NiFi canvas,
the newly added Processor will be added to one of the existing Python processes that was allocated for other Processors of the same type. If there are no other Python processes allocated for the newly added Processor will be added to one of the existing Python processes that was allocated for other Processors of the same type. If there are no other Python processes allocated for
the same type, an Exception will be thrown and the Processor will not be added to the canvas. the same type, an Exception will be thrown and the Processor will not be added to the canvas.
| nifi.python.dependency.install.sequential | false | The value controls whether dependencies are downloaded and installed sequentially or in parallel when initializing multiple Python processor
|================================================================================================================================================== |==================================================================================================================================================

View File

@ -482,6 +482,7 @@ public class PythonProcess {
.processorType(processorType) .processorType(processorType)
.processorVersion(processorVersion) .processorVersion(processorVersion)
.workingDirectory(processConfig.getPythonWorkingDirectory()) .workingDirectory(processConfig.getPythonWorkingDirectory())
.installDependenciesSequential(processConfig.getDependencyInstallSequential())
.moduleFile(new File(controller.getModuleFile(type, version))) .moduleFile(new File(controller.getModuleFile(type, version)))
.build(); .build();

View File

@ -40,12 +40,14 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
private volatile PythonProcessorAdapter adapter; private volatile PythonProcessorAdapter adapter;
private final File workingDir; private final File workingDir;
private final File moduleFile; private final File moduleFile;
private final boolean installDependenciesSequential;
private volatile long lastModified; private volatile long lastModified;
private volatile LoadState loadState = LoadState.DOWNLOADING_DEPENDENCIES; private volatile LoadState loadState = LoadState.DOWNLOADING_DEPENDENCIES;
private volatile PythonProcessorInitializationContext initializationContext; private volatile PythonProcessorInitializationContext initializationContext;
private volatile String identifier; private volatile String identifier;
private volatile PythonController controller; private volatile PythonController controller;
private volatile CompletableFuture<Void> initializationFuture; private volatile CompletableFuture<Void> initializationFuture;
private static final Object DEPENDENCY_DOWNLOAD_LOCK = new Object();
private StandardPythonProcessorBridge(final Builder builder) { private StandardPythonProcessorBridge(final Builder builder) {
@ -55,6 +57,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
this.processorVersion = builder.processorVersion; this.processorVersion = builder.processorVersion;
this.workingDir = builder.workDir; this.workingDir = builder.workDir;
this.moduleFile = builder.moduleFile; this.moduleFile = builder.moduleFile;
this.installDependenciesSequential = builder.installDependenciesSequential;
this.lastModified = this.moduleFile.lastModified(); this.lastModified = this.moduleFile.lastModified();
} }
@ -115,8 +118,14 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
loadState = LoadState.DOWNLOADING_DEPENDENCIES; loadState = LoadState.DOWNLOADING_DEPENDENCIES;
try { try {
creationWorkflow.downloadDependencies(); if (installDependenciesSequential) {
logger.info("Successfully downloaded dependencies for Python Processor {} ({})", identifier, getProcessorType()); synchronized (DEPENDENCY_DOWNLOAD_LOCK) {
logger.debug("Installing dependencies sequentially.");
downloadDependencies();
}
} else {
downloadDependencies();
}
break; break;
} catch (final Exception e) { } catch (final Exception e) {
@ -171,6 +180,11 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
future.complete(null); future.complete(null);
} }
private void downloadDependencies() {
creationWorkflow.downloadDependencies();
logger.info("Successfully downloaded dependencies for Python Processor {} ({})", identifier, getProcessorType());
}
@Override @Override
public String getProcessorType() { public String getProcessorType() {
return processorType; return processorType;
@ -198,6 +212,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
private File moduleFile; private File moduleFile;
private String processorType; private String processorType;
private String processorVersion; private String processorVersion;
private boolean installDependenciesSequential;
public Builder controller(final PythonController controller) { public Builder controller(final PythonController controller) {
this.controller = controller; this.controller = controller;
@ -229,6 +244,11 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
return this; return this;
} }
public Builder installDependenciesSequential(final boolean installDependenciesSequential) {
this.installDependenciesSequential = installDependenciesSequential;
return this;
}
public StandardPythonProcessorBridge build() { public StandardPythonProcessorBridge build() {
if (controller == null) { if (controller == null) {
throw new IllegalStateException("Must specify the PythonController"); throw new IllegalStateException("Must specify the PythonController");

View File

@ -35,6 +35,7 @@ public class PythonProcessConfig {
private final boolean debugController; private final boolean debugController;
private final String debugHost; private final String debugHost;
private final int debugPort; private final int debugPort;
private final boolean dependencyInstallSequential;
private PythonProcessConfig(final Builder builder) { private PythonProcessConfig(final Builder builder) {
this.pythonCommand = builder.pythonCommand; this.pythonCommand = builder.pythonCommand;
@ -47,6 +48,7 @@ public class PythonProcessConfig {
this.debugController = builder.debugController; this.debugController = builder.debugController;
this.debugPort = builder.debugPort; this.debugPort = builder.debugPort;
this.debugHost = builder.debugHost; this.debugHost = builder.debugHost;
this.dependencyInstallSequential = builder.dependencyInstallSequential;
} }
public String getPythonCommand() { public String getPythonCommand() {
@ -89,6 +91,10 @@ public class PythonProcessConfig {
return debugPort; return debugPort;
} }
public boolean getDependencyInstallSequential() {
return dependencyInstallSequential;
}
public static class Builder { public static class Builder {
private String pythonCommand = "python3"; private String pythonCommand = "python3";
private File pythonFrameworkDirectory = new File("python/framework"); private File pythonFrameworkDirectory = new File("python/framework");
@ -100,6 +106,7 @@ public class PythonProcessConfig {
private boolean debugController = false; private boolean debugController = false;
private String debugHost = "localhost"; private String debugHost = "localhost";
private int debugPort = 5678; private int debugPort = 5678;
private boolean dependencyInstallSequential = false;
public Builder pythonCommand(final String command) { public Builder pythonCommand(final String command) {
@ -164,6 +171,11 @@ public class PythonProcessConfig {
return this; return this;
} }
public Builder dependencyInstallSequential(final boolean dependencyInstallSequential) {
this.dependencyInstallSequential = dependencyInstallSequential;
return this;
}
public PythonProcessConfig build() { public PythonProcessConfig build() {
return new PythonProcessConfig(this); return new PythonProcessConfig(this);
} }

View File

@ -254,6 +254,7 @@ import javax.management.NotificationEmitter;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.apache.nifi.util.NiFiProperties.PYTHON_DEPENDENCY_INSTALL_SEQUENTIAL;
public class FlowController implements ReportingTaskProvider, FlowAnalysisRuleProvider, Authorizable, NodeTypeProvider { public class FlowController implements ReportingTaskProvider, FlowAnalysisRuleProvider, Authorizable, NodeTypeProvider {
private static final String STANDARD_PYTHON_BRIDGE_IMPLEMENTATION_CLASS = "org.apache.nifi.py4j.StandardPythonBridge"; private static final String STANDARD_PYTHON_BRIDGE_IMPLEMENTATION_CLASS = "org.apache.nifi.py4j.StandardPythonBridge";
@ -875,6 +876,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
int maxProcesses = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_MAX_PROCESSES, 20); int maxProcesses = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_MAX_PROCESSES, 20);
int maxProcessesPerType = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_MAX_PROCESSES_PER_TYPE, 2); int maxProcessesPerType = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_MAX_PROCESSES_PER_TYPE, 2);
boolean isdependencyInstallSequential = Boolean.parseBoolean(nifiProperties.getProperty(PYTHON_DEPENDENCY_INSTALL_SEQUENTIAL, "false"));
final boolean enableControllerDebug = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_ENABLED, "false")); final boolean enableControllerDebug = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_ENABLED, "false"));
final int debugPort = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_PORT, 5678); final int debugPort = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_PORT, 5678);
@ -909,6 +911,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
.commsTimeout(commsTimeout == null ? null : Duration.ofMillis(FormatUtils.getTimeDuration(commsTimeout, TimeUnit.MILLISECONDS))) .commsTimeout(commsTimeout == null ? null : Duration.ofMillis(FormatUtils.getTimeDuration(commsTimeout, TimeUnit.MILLISECONDS)))
.maxPythonProcesses(maxProcesses) .maxPythonProcesses(maxProcesses)
.maxPythonProcessesPerType(maxProcessesPerType) .maxPythonProcessesPerType(maxProcessesPerType)
.dependencyInstallSequential(isdependencyInstallSequential)
.enableControllerDebug(enableControllerDebug) .enableControllerDebug(enableControllerDebug)
.debugPort(debugPort) .debugPort(debugPort)
.debugHost(debugHost) .debugHost(debugHost)

View File

@ -264,6 +264,7 @@
<nifi.python.working.directory>./work/python</nifi.python.working.directory> <nifi.python.working.directory>./work/python</nifi.python.working.directory>
<nifi.python.max.processes>100</nifi.python.max.processes> <nifi.python.max.processes>100</nifi.python.max.processes>
<nifi.python.max.processes.per.extension.type>10</nifi.python.max.processes.per.extension.type> <nifi.python.max.processes.per.extension.type>10</nifi.python.max.processes.per.extension.type>
<nifi.python.dependency.install.sequential>false</nifi.python.dependency.install.sequential>
<nifi.cluster.leader.election.kubernetes.lease.prefix /> <nifi.cluster.leader.election.kubernetes.lease.prefix />

View File

@ -48,6 +48,7 @@ nifi.python.extensions.source.directory.default=${nifi.python.extensions.source.
nifi.python.working.directory=${nifi.python.working.directory} nifi.python.working.directory=${nifi.python.working.directory}
nifi.python.max.processes=${nifi.python.max.processes} nifi.python.max.processes=${nifi.python.max.processes}
nifi.python.max.processes.per.extension.type=${nifi.python.max.processes.per.extension.type} nifi.python.max.processes.per.extension.type=${nifi.python.max.processes.per.extension.type}
nifi.python.dependency.install.sequential=${nifi.python.dependency.install.sequential}
#################### ####################
# State Management # # State Management #