NIFI-12321 Avoid Exceptions when removing Python Processors (#7984)

- Updated StandardPythonBridge.onProcessorRemoved to avoid throwing exceptions when called with a Processor Type and Version that is not registered
- Updated system-tests workflow to run on changes in the nifi-py4j-bundle
This commit is contained in:
exceptionfactory 2023-11-06 15:17:41 -06:00 committed by GitHub
parent b24249c850
commit b50557b854
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 23 deletions

View File

@ -26,6 +26,7 @@ on:
- 'nifi-api/**'
- 'nifi-framework-api/**'
- 'nifi-nar-bundles/nifi-framework-bundle/**'
- 'nifi-nar-bundles/nifi-py4j-bundle/**'
- 'nifi-stateless/**'
pull_request:
paths:
@ -34,6 +35,7 @@ on:
- 'nifi-api/**'
- 'nifi-framework-api/**'
- 'nifi-nar-bundles/nifi-framework-bundle/**'
- 'nifi-nar-bundles/nifi-py4j-bundle/**'
- 'nifi-stateless/**'
env:

View File

@ -96,7 +96,8 @@ public class StandardPythonBridge implements PythonBridge {
private PythonProcessorBridge createProcessorBridge(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
ensureStarted();
final ExtensionId extensionId = getExtensionId(type, version);
final Optional<ExtensionId> extensionIdFound = findExtensionId(type, version);
final ExtensionId extensionId = extensionIdFound.orElseThrow(() -> new IllegalArgumentException("Processor Type [%s] Version [%s] not found".formatted(type, version)));
logger.debug("Creating Python Processor Type [{}] Version [{}]", extensionId.type(), extensionId.version());
final PythonProcess pythonProcess = getProcessForNextComponent(extensionId, identifier, preferIsolatedProcess);
@ -151,26 +152,32 @@ public class StandardPythonBridge implements PythonBridge {
@Override
public synchronized void onProcessorRemoved(final String identifier, final String type, final String version) {
final ExtensionId extensionId = getExtensionId(type, version);
final List<PythonProcess> processes = processesByProcessorType.get(extensionId);
if (processes == null) {
return;
}
final Optional<ExtensionId> extensionIdFound = findExtensionId(type, version);
// Find the Python Process that has the Processor, if any, and remove it.
// If there are no additional Processors in the Python Process, remove it from our list and shut down the process.
final Iterator<PythonProcess> processItr = processes.iterator(); // Use iterator so we can call remove()
while (processItr.hasNext()) {
final PythonProcess process = processItr.next();
final boolean removed = process.removeProcessor(identifier);
if (removed && process.getProcessorCount() == 0) {
processItr.remove();
process.shutdown();
break;
if (extensionIdFound.isPresent()) {
final ExtensionId extensionId = extensionIdFound.get();
final List<PythonProcess> processes = processesByProcessorType.get(extensionId);
if (processes == null) {
return;
}
}
processorCountByType.merge(extensionId, -1, Integer::sum);
// Find the Python Process that has the Processor, if any, and remove it.
// If there are no additional Processors in the Python Process, remove it from our list and shut down the process.
final Iterator<PythonProcess> processItr = processes.iterator(); // Use iterator so we can call remove()
while (processItr.hasNext()) {
final PythonProcess process = processItr.next();
final boolean removed = process.removeProcessor(identifier);
if (removed && process.getProcessorCount() == 0) {
processItr.remove();
process.shutdown();
break;
}
}
processorCountByType.merge(extensionId, -1, Integer::sum);
} else {
logger.debug("Processor Type [{}] Version [{}] not found", type, version);
}
}
public int getTotalProcessCount() {
@ -310,15 +317,13 @@ public class StandardPythonBridge implements PythonBridge {
return "StandardPythonBridge";
}
private ExtensionId getExtensionId(final String type, final String version) {
private Optional<ExtensionId> findExtensionId(final String type, final String version) {
final List<PythonProcessorDetails> processorTypes = controllerProcess.getController().getProcessorTypes();
final Optional<PythonProcessorDetails> processorTypeFound = processorTypes.stream()
return processorTypes.stream()
.filter(details -> details.getProcessorType().equals(type))
.filter(details -> details.getProcessorVersion().equals(version))
.map(details -> new ExtensionId(details.getProcessorType(), details.getProcessorVersion()))
.findFirst();
return processorTypeFound.map(details -> new ExtensionId(details.getProcessorType(), details.getProcessorVersion()))
.orElseThrow(() -> new IllegalArgumentException(String.format("Processor Type [%s] Version [%s] not found", type, version)));
}
private record ExtensionId(String type, String version) {