NIFI-11793 Added Python debugging properties and documentation

Added documentation to indicate how to debug Python side of nifi framework, as well as debugging Python processors themselves using VSCode's Remote debugger.
This also provides the ability to launch the Controller process in such a way that it will listen to incoming remote debug connections.

This closes #7469

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-07-11 11:26:07 -04:00 committed by exceptionfactory
parent e0d6b49cd5
commit c68f5ec9a2
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
8 changed files with 271 additions and 4 deletions

View File

@ -341,6 +341,11 @@ public class NiFiProperties extends ApplicationProperties {
public static final String PYTHON_MAX_PROCESSES_PER_TYPE = "nifi.python.max.processes.per.extension.type";
public static final String PYTHON_COMMS_TIMEOUT = "nifi.python.comms.timeout";
public static final String PYTHON_CONTROLLER_DEBUGPY_ENABLED = "nifi.python.controller.debugpy.enabled";
public static final String PYTHON_CONTROLLER_DEBUGPY_PORT = "nifi.python.controller.debugpy.port";
public static final String PYTHON_CONTROLLER_DEBUGPY_HOST = "nifi.python.controller.debugpy.host";
public static final String PYTHON_CONTROLLER_DEBUGPY_LOGS_DIR = "nifi.python.controller.debugpy.logs.directory";
public static final String DEFAULT_PYTHON_WORKING_DIRECTORY = "./work/python";
// automatic diagnostic defaults

View File

@ -541,3 +541,83 @@ appropriate environment directory, and restarting NiFi.
While you may delete the entire `work` directory while NiFi is stopped, doing so may result in NiFi taking significantly longer to startup
the next time, as it must source all extensions' dependencies from PyPI, as well as expand all Java extensions' NAR files.
[[debugging]]
== Debugging
It's often helpful to attach a remote debugger to the Python process so that we can step through the code, examine variables,
and execute code snippets, etc.
The method used will vary based on the IDE used. However, here we will look at how to enable remote debugging using VSCode.
VSCode comes with a debugger named DebugPy.
It is important to note that every instance of a Processor and even each concurrent task in a Processor may end up
using a separate Python process. Because of this, it is difficult to enable the Python process to listen for incoming
connections because there may be many different processes. Instead, it is recommended to enable listening in VSCode
and then allow the Processor itself to connect to the debugger.
To enable listening in VSCode, we must first create a `launch.json` launch configuration. The `launch.json` should have a
`listen` section to tell it to listen on a particular port. Additionally, the `pathMappings` must be setup to indicate the
local directory in which VSCode should find the Python code, and the `remoteRoot` which is the directory in which NiFi should find
the Python code.
For example:
[source]
----
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Remote Attach",
"type": "python",
"request": "attach",
"listen": {
"host": "localhost",
"port": 5678
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}/nifi-python-test-extensions/src/main/resources/extensions",
"remoteRoot": "./python/extensions"
}
],
"justMyCode": true
}
]
}
----
We can then launch this using VSCode.
Next, we must tell the Processor to connect to the debugger. This requires updating the Processor's code.
Firstly, the DebugPy module must be added as a dependency. Then the Processor needs to connect to the debugger.
To enable remote debugging on Processors themselves, it is necessary to update the Processor's code to implement something
akin to:
[source]
----
class MyProcessor(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
dependencies = ['debugpy']
def onScheduled(context):
try:
import debugpy
debugpy.connect(6688)
except e:
self.logger.error("Failed to connect to python debug listener")
----
It is important to note, however, that the code available to VSCode must exactly match the code that NiFi is using in order
to ensure that breakpoints line up correctly. As a result, the code should be updated in VSCode and then copied into NiFi's directory.
At that point, NiFi does not require a restart, but the Processor must be stopped and started again.
Now, when the Processor is scheduled, it will connect to the VSCode debugger, and you can set breakpoints in the VSCode
in order to debug the Processor.

View File

@ -855,6 +855,11 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
int maxProcesses = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_MAX_PROCESSES, 20);
int maxProcessesPerType = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_MAX_PROCESSES_PER_TYPE, 2);
final boolean enableControllerDebug = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_ENABLED, "false"));
final int debugPort = nifiProperties.getIntegerProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_PORT, 5678);
final String debugHost = nifiProperties.getProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_HOST, "localhost");
final String debugLogs = nifiProperties.getProperty(NiFiProperties.PYTHON_CONTROLLER_DEBUGPY_LOGS_DIR, "logs");
// Validate configuration for max numbers of processes.
if (maxProcessesPerType < 1) {
LOG.warn("Configured value for {} in nifi.properties is {}, which is invalid. Defaulting to 2.", NiFiProperties.PYTHON_MAX_PROCESSES_PER_TYPE, maxProcessesPerType);
@ -885,6 +890,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
.commsTimeout(commsTimeout == null ? null : Duration.ofMillis(FormatUtils.getTimeDuration(commsTimeout, TimeUnit.MILLISECONDS)))
.maxPythonProcesses(maxProcesses)
.maxPythonProcessesPerType(maxProcessesPerType)
.enableControllerDebug(enableControllerDebug)
.debugPort(debugPort)
.debugHost(debugHost)
.debugLogsDirectory(new File(debugLogs))
.build();
final ControllerServiceTypeLookup serviceTypeLookup = serviceProvider::getControllerServiceType;

View File

@ -0,0 +1,79 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# nifi-py4j-bundle module
The NiFi Py4J Bundle provides a linkage between NiFi's Java Process and Python.
Py4J is the library used in order to launch an RPG server that can be used to communicate between
the Java and Python Processes.
See the NiFi Python Developer's Guide for more information about how to build Processors in Python.
# Debugging
There are times when it's helpful to enable remote debugging of the Python code. Because NiFi is responsible
for launching the Python process, how to enable this may not be as straight-forward as when launching a Python
process yourself. However, NiFi can be told to enable remote debugging when launching the Python process.
The manner in which you connect to the Python Process differs by IDE. Here, we will examine how to use
VSCode's DebugPy.
## Debugging Framework
The method for debugging the framework and debugging Processors is different. Typically, when performing debugging
on the Framework itself, it is easiest to have NiFi enable a DebugPy listener when launching the Python process
that hosts the Controller.
To enable remote debugging, NiFi will use `pip` to install the `debugpy` module into the environment used by the main Python process.
This process is used to discover available Processors and to create Processors. It is not used by Processors themselves.
### Listen for Incoming Connections (Controller)
The following properties may be added to nifi.properties in order to enable remote debugging
of the Controller process:
`nifi.python.controller.debugpy.enable` : Indicates whether or not DebugPy should be used when launching hte Controller.
Defaults to `false`. If set to `true`, the Python process that is responsible for discovering and creating Processors
will be launched using DebugPy.
`nifi.python.controller.debugpy.port` : The local port to use. Defaults to `5678`.
`nifi.python.controller.debugpy.host` : The hostname to listen on. Defaults to `localhost`.
`nifi.python.controller.debugpy.logs.directory` : The directory to write DebugPy logs to. Defaults to `./logs`
Note that these properties do not exist in the nifi.properties by default.
This is intentional and is due to the fact that during any normal operations, this should not be used.
This should be used only by developers wanting to debug the NiFi application itself.
### Connecting to the Python Process
It is important, however, to note the host and port that the debugger is using.
When establishing a connection to the remote debugger, the VSCode may be configured with both the local
directory to use for Python source files, as well as the remote debugger.
Generally, the local directory should point to `${NIFI_SOURCE_DIRECTORY}/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework`.
The remote directory, which defaults to `.` should be specified as `./python/framework`.
## Debugging Processors
It is also important to enable remote debugging for Processors. We expect Processor developers to be able
to do this, not just those who are maintaining the NiFi codebase. As a result, instructions for enabling
remote debugging of Processors has been added to the NiFi Python Developer's Guide.

View File

@ -35,8 +35,10 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -153,7 +155,23 @@ public class PythonProcess {
final String pythonCommand = pythonCommandFile.getAbsolutePath();
final File controllerPyFile = new File(pythonFrameworkDirectory, PYTHON_CONTROLLER_FILENAME);
final ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, controllerPyFile.getAbsolutePath());
final ProcessBuilder processBuilder = new ProcessBuilder();
final List<String> commands = new ArrayList<>();
commands.add(pythonCommand);
if (processConfig.isDebugController() && "Controller".equals(componentId)) {
commands.add("-m");
commands.add("debugpy");
commands.add("--listen");
commands.add(processConfig.getDebugHost() + ":" + processConfig.getDebugPort());
commands.add("--log-to");
commands.add(processConfig.getDebugLogsDirectory().getAbsolutePath());
}
commands.add(controllerPyFile.getAbsolutePath());
processBuilder.command(commands);
processBuilder.environment().put("JAVA_PORT", String.valueOf(listeningPort));
processBuilder.environment().put("LOGS_DIR", pythonLogsDirectory.getAbsolutePath());
processBuilder.environment().put("ENV_HOME", virtualEnvHome.getAbsolutePath());
@ -200,11 +218,38 @@ public class PythonProcess {
throw new IOException("Failed to create Python Environment " + virtualEnvHome + ": process existed with code " + result);
}
if (processConfig.isDebugController() && "Controller".equals(componentId)) {
installDebugPy();
}
// Create file so that we don't keep trying to recreate the virtual environment
environmentCreationCompleteFile.createNewFile();
logger.info("Successfully created Python Virtual Environment {}", virtualEnvHome);
}
private void installDebugPy() throws IOException {
final String pythonCommand = processConfig.getPythonCommand();
final ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "pip", "install", "--upgrade", "debugpy", "--target",
processConfig.getPythonWorkingDirectory().getAbsolutePath());
processBuilder.directory(virtualEnvHome.getParentFile());
final String command = String.join(" ", processBuilder.command());
logger.debug("Installing DebugPy to Virtual Env {} using command {}", virtualEnvHome, command);
final Process process = processBuilder.start();
final int result;
try {
result = process.waitFor();
} catch (final InterruptedException e) {
throw new IOException("Interrupted while waiting for DebugPy to be installed");
}
if (result != 0) {
throw new IOException("Failed to install DebugPy for Python Environment " + virtualEnvHome + ": process existed with code " + result);
}
}
public void shutdown() {
logger.info("Shutting down Python Process {}", process);

View File

@ -337,7 +337,7 @@ public class PythonControllerInteractionIT {
final List<String> dependencies = writeNumpyVersionDetails.getDependencies();
assertEquals(1, dependencies.size());
assertEquals("numpy==1.20.0", dependencies.get(0));
assertEquals("numpy==1.25.0", dependencies.get(0));
// Create a PrettyPrintJson Processor
final PythonProcessorBridge writeNumPyVersion = createProcessor("WriteNumpyVersion");
@ -352,7 +352,7 @@ public class PythonControllerInteractionIT {
runner.run();
runner.assertTransferCount("original", 1);
runner.assertTransferCount("success", 1);
runner.getFlowFilesForRelationship("success").get(0).assertContentEquals("1.20.0");
runner.getFlowFilesForRelationship("success").get(0).assertContentEquals("1.25.0");
}

View File

@ -34,6 +34,10 @@ public class PythonProcessConfig {
private final Duration commsTimeout;
private final int maxPythonProcesses;
private final int maxPythonProcessesPerType;
private final boolean debugController;
private final String debugHost;
private final int debugPort;
private final File debugLogsDirectory;
private PythonProcessConfig(final Builder builder) {
this.pythonCommand = builder.pythonCommand;
@ -44,6 +48,10 @@ public class PythonProcessConfig {
this.commsTimeout = builder.commsTimeout;
this.maxPythonProcesses = builder.maxProcesses;
this.maxPythonProcessesPerType = builder.maxProcessesPerType;
this.debugController = builder.debugController;
this.debugPort = builder.debugPort;
this.debugHost = builder.debugHost;
this.debugLogsDirectory = builder.debugLogsDirectory;
}
public String getPythonCommand() {
@ -78,6 +86,22 @@ public class PythonProcessConfig {
return maxPythonProcessesPerType;
}
public boolean isDebugController() {
return debugController;
}
public String getDebugHost() {
return debugHost;
}
public int getDebugPort() {
return debugPort;
}
public File getDebugLogsDirectory() {
return debugLogsDirectory;
}
public static class Builder {
private String pythonCommand = "python3";
private File pythonFrameworkDirectory = new File("python/framework");
@ -87,6 +111,11 @@ public class PythonProcessConfig {
private Duration commsTimeout = Duration.ofSeconds(0);
private int maxProcesses;
private int maxProcessesPerType;
private boolean debugController = false;
private String debugHost = "localhost";
private int debugPort = 5678;
private File debugLogsDirectory = new File("logs/");
public Builder pythonCommand(final String command) {
this.pythonCommand = command;
@ -140,6 +169,26 @@ public class PythonProcessConfig {
return this;
}
public Builder enableControllerDebug(final boolean enableDebug) {
this.debugController = enableDebug;
return this;
}
public Builder debugPort(final int debugPort) {
this.debugPort = debugPort;
return this;
}
public Builder debugHost(final String debugHost) {
this.debugHost = debugHost;
return this;
}
public Builder debugLogsDirectory(final File debugLogsDirectory) {
this.debugLogsDirectory = debugLogsDirectory;
return this;
}
public PythonProcessConfig build() {
return new PythonProcessConfig(this);
}

View File

@ -21,7 +21,7 @@ class WriteNumpyVersion(FlowFileTransform):
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
dependencies = ['numpy==1.20.0']
dependencies = ['numpy==1.25.0']
version = '0.0.1-SNAPSHOT'
def __init__(self, jvm):