NIFI-13824 Installed Python Processor Dependencies with one command

If a Python processor defines dependencies both inline and in a
requirements.txt file, then we need to install the two groups of
dependencies in a single `pip install` command, otherwise pip is
not able to resolve the web of dependencies correctly.

- Added setup-python step with Python 3.12 to ci-workflow for consistent version behavior

This closes #9429

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Ferenc Gerlits 2024-10-21 17:22:01 +02:00 committed by exceptionfactory
parent 5be2cd73e6
commit 3b3e74d46b
No known key found for this signature in database
8 changed files with 180 additions and 28 deletions

View File

@ -162,6 +162,10 @@ jobs:
distribution: 'corretto'
java-version: '21'
cache: 'maven'
- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Evaluate Changed Paths
uses: dorny/paths-filter@v3
id: changes
@ -227,6 +231,10 @@ jobs:
distribution: 'zulu'
java-version: '21'
cache: 'maven'
- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Evaluate Changed Paths
uses: dorny/paths-filter@v3
id: changes

View File

@ -285,37 +285,31 @@ class ExtensionManager:
logger.info("All dependencies have already been imported for {0}".format(class_name))
return True
python_cmd = os.getenv("PYTHON_CMD")
dependency_references = []
if processor_details.source_location is not None:
package_dir = os.path.dirname(processor_details.source_location)
requirements_file = os.path.join(package_dir, 'requirements.txt')
if os.path.exists(requirements_file):
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir, '-r', requirements_file]
dependency_references.append('-r')
dependency_references.append(requirements_file)
logger.info(f"Importing dependencies from requirements file for package {package_dir} to {target_dir} using command {args}")
result = subprocess.run(args)
inline_dependencies = processor_details.getDependencies()
for dependency in inline_dependencies:
dependency_references.append(dependency)
if result.returncode == 0:
logger.info(f"Successfully imported requirements for package {package_dir} to {target_dir}")
else:
raise RuntimeError(f"Failed to import requirements for package {package_dir} from requirements.txt file: process exited with status code {result}")
dependencies = processor_details.getDependencies()
if len(dependencies) > 0:
if len(dependency_references) > 0:
python_cmd = os.getenv("PYTHON_CMD")
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir]
for dep in dependencies:
args.append(dep)
logger.info(f"Importing dependencies {dependencies} for {class_name} to {target_dir} using command {args}")
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir] + dependency_references
logger.info(f"Installing dependencies {dependency_references} for {class_name} to {target_dir} using command {args}")
result = subprocess.run(args)
if result.returncode == 0:
logger.info(f"Successfully imported requirements for {class_name} to {target_dir}")
logger.info(f"Successfully installed requirements for {class_name} to {target_dir}")
else:
raise RuntimeError(f"Failed to import requirements for {class_name}: process exited with status code {result}")
raise RuntimeError(f"Failed to install requirements for {class_name}: process exited with status code {result}")
else:
logger.info(f"No dependencies to import for {class_name}")
logger.info(f"No dependencies to install for {class_name}")
# Write a completion Marker File
with open(completion_marker_file, "w") as file:

View File

@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tempfile
import unittest
from unittest.mock import patch
from ExtensionManager import ExtensionManager
from testutils import set_up_env, get_processor_details
PROCESSOR_WITH_DEPENDENCIES_TEST_FILE = 'src/test/resources/python/framework/processor_with_dependencies/ProcessorWithDependencies.py'
class ReturncodeMocker:
def __init__(self, return_code):
self.returncode = return_code
class TestExtensionManager(unittest.TestCase):
def setUp(self):
set_up_env()
self.extension_manager = ExtensionManager(None)
@patch('subprocess.run')
def test_import_external_dependencies(self, mock_subprocess_run):
details = get_processor_details(self, 'ProcessorWithDependencies', PROCESSOR_WITH_DEPENDENCIES_TEST_FILE, '/extensions/processor_with_dependencies')
self.assertIsNotNone(details)
mock_subprocess_run.return_value = ReturncodeMocker(0)
with tempfile.TemporaryDirectory() as temp_dir:
packages_dir = os.path.join(temp_dir, 'packages')
self.extension_manager.import_external_dependencies(details, packages_dir)
mock_subprocess_run.assert_called_once()
if __name__ == '__main__':
unittest.main()

View File

@ -13,20 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ProcessorInspection
import unittest
from testutils import get_processor_details
DUMMY_PROCESSOR_FILE = 'src/test/python/framework/DummyProcessor.py'
DUMMY_PROCESSOR_FILE = 'src/test/resources/python/framework/dummy_processor/DummyProcessor.py'
class DetectProcessorUseCase(unittest.TestCase):
def test_get_processor_details(self):
class_nodes = ProcessorInspection.get_processor_class_nodes(DUMMY_PROCESSOR_FILE)
self.assertIsNotNone(class_nodes)
self.assertEqual(len(class_nodes), 1)
class_node = class_nodes[0]
self.assertEqual(class_node.name, 'DummyProcessor')
details = ProcessorInspection.get_processor_details(class_node, DUMMY_PROCESSOR_FILE, '/extensions/dummy_processor', False)
details = get_processor_details(self, 'DummyProcessor', DUMMY_PROCESSOR_FILE, '/extensions/dummy_processor')
self.assertIsNotNone(details)
self.assertEqual(details.description, 'Fake Processor')
self.assertEqual(details.tags, ['tag1', 'tag2'])

View File

@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from nifiapi.__jvm__ import JvmHolder
import ProcessorInspection
class FakeJvm:
def __init__(self):
self.java = FakeJava()
class FakeJava:
def __init__(self):
self.util = FakeJavaUtil()
class FakeJavaUtil:
def ArrayList(self):
return FakeArrayList([])
class FakeArrayList:
def __init__(self, my_list):
self.my_list = my_list
def __len__(self):
return len(self.my_list)
def __iter__(self):
return iter(self.my_list)
def add(self, element):
self.my_list.append(element)
def set_up_env():
python_command = sys.executable
os.environ["PYTHON_CMD"] = python_command
JvmHolder.jvm = FakeJvm()
def get_processor_details(test_fixture, processor_name, processor_file, extension_home):
class_nodes = ProcessorInspection.get_processor_class_nodes(processor_file)
test_fixture.assertIsNotNone(class_nodes)
test_fixture.assertEqual(len(class_nodes), 1)
class_node = class_nodes[0]
test_fixture.assertEqual(class_node.name, processor_name)
return ProcessorInspection.get_processor_details(class_node, processor_file, extension_home, False)

View File

@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
class ProcessorWithDependencies(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
description = "This processor depends on both google-cloud-vision and pymilvus"
version = '0.0.1'
tags = ['cloud', 'vision', 'milvus']
dependencies = ['pymilvus==2.4.4']
def __init__(self):
pass
def transform(self, context, flow_file):
self.logger.info("ProcessorWithDependencies is returning")
return FlowFileTransformResult('success', contents='foobar')

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
google-cloud-vision==3.7.4