NIFI-12326: When a Python Processor is created, it attempts to download dependencies in the background and then load the processor code. If that fails, it previously gave up; now it will log the exception and keep trying

This closes #7990

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-11-06 16:23:36 -05:00 committed by exceptionfactory
parent 940c9276c5
commit a978406c83
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 63 additions and 25 deletions

View File

@ -235,7 +235,7 @@ public class PythonProcess {
private void installDebugPy() throws IOException {
final String pythonCommand = processConfig.getPythonCommand();
final ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "pip", "install", "--upgrade", "debugpy", "--target",
final ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "pip", "install", "--no-cache-dir", "--upgrade", "debugpy", "--target",
virtualEnvHome.getAbsolutePath());
processBuilder.directory(virtualEnvHome);

View File

@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
@ -63,31 +64,69 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
this.initializationContext = context;
final String threadName = "Initialize Python Processor %s (%s)".formatted(initializationContext.getIdentifier(), getProcessorType());
Thread.ofVirtual().name(threadName).start(this::initializePythonSide);
Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true));
}
public LoadState getLoadState() {
return loadState;
}
private void initializePythonSide() {
try {
creationWorkflow.downloadDependencies();
loadState = LoadState.LOADING_PROCESSOR_CODE;
} catch (final Exception e) {
loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
throw e;
private void initializePythonSide(final boolean continualRetry) {
long sleepMillis = 1_000L;
while (true) {
loadState = LoadState.DOWNLOADING_DEPENDENCIES;
try {
creationWorkflow.downloadDependencies();
logger.info("Successfully downloaded dependencies for Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType());
break;
} catch (final Exception e) {
loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
if (!continualRetry) {
throw e;
}
sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10));
logger.error("Failed to download dependencies for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis);
try {
Thread.sleep(sleepMillis);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
e.addSuppressed(ex);
throw e;
}
}
}
final PythonProcessorAdapter pythonProcessorAdapter;
try {
pythonProcessorAdapter = creationWorkflow.createProcessor();
pythonProcessorAdapter.initialize(initializationContext);
this.adapter = pythonProcessorAdapter;
loadState = LoadState.FINISHED_LOADING;
} catch (final Exception e) {
loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
throw e;
while (true) {
loadState = LoadState.LOADING_PROCESSOR_CODE;
try {
final PythonProcessorAdapter pythonProcessorAdapter = creationWorkflow.createProcessor();
pythonProcessorAdapter.initialize(initializationContext);
this.adapter = pythonProcessorAdapter;
loadState = LoadState.FINISHED_LOADING;
logger.info("Successfully loaded Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType());
break;
} catch (final Exception e) {
loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
if (!continualRetry) {
throw e;
}
sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10));
logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis);
try {
Thread.sleep(sleepMillis);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
e.addSuppressed(ex);
throw e;
}
}
}
}
@ -104,7 +143,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
}
controller.reloadProcessor(getProcessorType(), processorDetails.getProcessorVersion(), workingDir.getAbsolutePath());
initializePythonSide();
initializePythonSide(false);
lastModified = moduleFile.lastModified();
return true;

View File

@ -485,7 +485,7 @@ class ExtensionManager:
package_dir = os.path.dirname(processor_details.source_location)
requirements_file = os.path.join(package_dir, 'requirements.txt')
if os.path.exists(requirements_file):
args = [python_cmd, '-m', 'pip', 'install', '--target', target_dir, '-r', requirements_file]
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir, '-r', requirements_file]
logger.info(f"Importing dependencies from requirements file for package {package_dir} to {target_dir} using command {args}")
result = subprocess.run(args)
@ -498,7 +498,7 @@ class ExtensionManager:
dependencies = processor_details.getDependencies()
if len(dependencies) > 0:
python_cmd = os.getenv("PYTHON_CMD")
args = [python_cmd, '-m', 'pip', 'install', '--target', target_dir]
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir]
for dep in dependencies:
args.append(dep)

View File

@ -130,7 +130,7 @@ import java.util.stream.Stream;
"Replacement Value Strategy" = "Literal Value"
A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to update.
The value is an Expression Language expression that references the `field.name` variable. For example, to change the date/time format of \
The value is an Expression Language expression that references the `field.value` variable. For example, to change the date/time format of \
a field named `txDate` from `year-month-day` format to `month/day/year` format, we add a property named `/txDate` with a value of \
`${field.value:toDate('yyyy-MM-dd'):format('MM/dd/yyyy')}`. We could also change the timezone of a timestamp field (and insert the timezone for clarity) by using a value of \
`${field.value:toDate('yyyy-MM-dd HH:mm:ss', 'UTC-0400'):format('yyyy-MM-dd HH:mm:ss Z', 'UTC')}`.

View File

@ -33,7 +33,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -155,7 +154,7 @@ public class PythonProcessorIT extends NiFiSystemIT {
final String firstRecordLine = lines[1];
final List<String> firstRecordValues = Stream.of(firstRecordLine.split(","))
.map(String::trim)
.collect(Collectors.toList());
.toList();
assertEquals("Jane Doe", firstRecordValues.get( headerIndices.get("name") ));
assertEquals("yellow", firstRecordValues.get( headerIndices.get("color") ));
assertEquals("3", firstRecordValues.get( headerIndices.get("age") ));
@ -165,7 +164,7 @@ public class PythonProcessorIT extends NiFiSystemIT {
final String secondRecordLine = lines[2];
final List<String> secondRecordValues = Stream.of(secondRecordLine.split(","))
.map(String::trim)
.collect(Collectors.toList());
.toList();
assertEquals("Jake Doe", secondRecordValues.get( headerIndices.get("name") ));
assertEquals("yellow", secondRecordValues.get( headerIndices.get("color") ));
assertEquals("3", secondRecordValues.get( headerIndices.get("age") ));