NIFI-12310 Validate Python Component Type for Processes (#7972)

* NIFI-12310 Validated Python Component Type for Processes
- Updated StandardPythonBridge to validate requested Component Type and Version against available Python Processors
This commit is contained in:
exceptionfactory 2023-11-03 13:42:08 -05:00 committed by GitHub
parent dcc4b8590f
commit 309061ca2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 27 additions and 52 deletions

View File

@ -41,7 +41,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -57,7 +57,6 @@ public class StandardPythonBridge implements PythonBridge {
private final Map<ExtensionId, Integer> processorCountByType = new ConcurrentHashMap<>();
private final Map<ExtensionId, List<PythonProcess>> processesByProcessorType = new ConcurrentHashMap<>();
@Override
public void initialize(final PythonBridgeInitializationContext context) {
this.processConfig = context.getPythonProcessConfig();
@ -84,7 +83,6 @@ public class StandardPythonBridge implements PythonBridge {
}
}
@Override
public void discoverExtensions() {
ensureStarted();
@ -98,9 +96,10 @@ public class StandardPythonBridge implements PythonBridge {
private PythonProcessorBridge createProcessorBridge(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
ensureStarted();
logger.debug("Creating Python Processor of type {}", type);
final ExtensionId extensionId = getExtensionId(type, version);
logger.debug("Creating Python Processor Type [{}] Version [{}]", extensionId.type(), extensionId.version());
final PythonProcess pythonProcess = getProcessForNextComponent(type, identifier, version, preferIsolatedProcess);
final PythonProcess pythonProcess = getProcessForNextComponent(extensionId, identifier, preferIsolatedProcess);
final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
final PythonController controller = pythonProcess.getController();
@ -127,7 +126,6 @@ public class StandardPythonBridge implements PythonBridge {
.build();
pythonProcess.addProcessor(identifier, preferIsolatedProcess);
final ExtensionId extensionId = new ExtensionId(type, version);
processorCountByType.merge(extensionId, 1, Integer::sum);
return processorBridge;
}
@ -153,7 +151,7 @@ public class StandardPythonBridge implements PythonBridge {
@Override
public synchronized void onProcessorRemoved(final String identifier, final String type, final String version) {
final ExtensionId extensionId = new ExtensionId(type, version);
final ExtensionId extensionId = getExtensionId(type, version);
final List<PythonProcess> processes = processesByProcessorType.get(extensionId);
if (processes == null) {
return;
@ -161,7 +159,7 @@ public class StandardPythonBridge implements PythonBridge {
// Find the Python Process that has the Processor, if any, and remove it.
// If there are no additional Processors in the Python Process, remove it from our list and shut down the process.
final Iterator<PythonProcess> processItr = processes.iterator(); // Use iter so we can call remove()
final Iterator<PythonProcess> processItr = processes.iterator(); // Use iterator so we can call remove()
while (processItr.hasNext()) {
final PythonProcess process = processItr.next();
final boolean removed = process.removeProcessor(identifier);
@ -183,8 +181,7 @@ public class StandardPythonBridge implements PythonBridge {
return count;
}
private synchronized PythonProcess getProcessForNextComponent(final String type, final String componentId, final String version, final boolean preferIsolatedProcess) {
final ExtensionId extensionId = new ExtensionId(type, version);
private synchronized PythonProcess getProcessForNextComponent(final ExtensionId extensionId, final String componentId, final boolean preferIsolatedProcess) {
final int processorsOfThisType = processorCountByType.getOrDefault(extensionId, 0);
final int processIndex = processorsOfThisType % processConfig.getMaxPythonProcessesPerType();
@ -196,7 +193,7 @@ public class StandardPythonBridge implements PythonBridge {
final List<PythonProcess> processesForType = processesByProcessorType.computeIfAbsent(extensionId, key -> new ArrayList<>());
for (final PythonProcess pythonProcess : processesForType) {
if (!preferIsolatedProcess || !pythonProcess.containsIsolatedProcessor()) {
logger.debug("Using {} to create Processor of type {}", pythonProcess, type);
logger.debug("Using {} to create Processor of type {}", pythonProcess, extensionId.type());
return pythonProcess;
}
}
@ -211,12 +208,12 @@ public class StandardPythonBridge implements PythonBridge {
}
logger.info("In order to create Python Processor of type {}, launching a new Python Process because there are currently {} Python Processors of this type and {} Python Processes",
type, processorsOfThisType, processesByProcessorType.size());
extensionId.type(), processorsOfThisType, processesByProcessorType.size());
final File extensionsWorkDir = new File(processConfig.getPythonWorkingDirectory(), "extensions");
final File componentTypeHome = new File(extensionsWorkDir, type);
final File envHome = new File(componentTypeHome, version);
final PythonProcess pythonProcess = new PythonProcess(processConfig, serviceTypeLookup, envHome, type, componentId);
final File componentTypeHome = new File(extensionsWorkDir, extensionId.type());
final File envHome = new File(componentTypeHome, extensionId.version());
final PythonProcess pythonProcess = new PythonProcess(processConfig, serviceTypeLookup, envHome, extensionId.type(), componentId);
pythonProcess.start();
final List<String> extensionsDirs = processConfig.getPythonExtensionsDirectories().stream()
@ -230,13 +227,14 @@ public class StandardPythonBridge implements PythonBridge {
return pythonProcess;
} catch (final IOException ioe) {
throw new RuntimeException("Failed launch Python Process in order to create new Processor of type " + type, ioe);
final String message = String.format("Failed to launch Process for Python Processor [%s] Version [%s]", extensionId.type(), extensionId.version());
throw new RuntimeException(message, ioe);
}
} else {
final PythonProcess pythonProcess = processesForType.get(processIndex);
logger.warn("Using existing process {} to create Processor of type {} because configuration indicates that no more than {} processes " +
"should be created for any Processor Type. This may result in slower performance for Processors of this type",
pythonProcess, type, processConfig.getMaxPythonProcessesPerType());
pythonProcess, extensionId.type(), processConfig.getMaxPythonProcessesPerType());
return pythonProcess;
}
@ -253,7 +251,7 @@ public class StandardPythonBridge implements PythonBridge {
final Map<String, Integer> counts = new HashMap<>(processesByProcessorType.size());
for (final Map.Entry<ExtensionId, List<PythonProcess>> entry : processesByProcessorType.entrySet()) {
counts.put(entry.getKey().getType() + " version " + entry.getKey().getVersion(), entry.getValue().size());
counts.put(entry.getKey().type() + " version " + entry.getKey().version(), entry.getValue().size());
}
return counts;
@ -269,7 +267,7 @@ public class StandardPythonBridge implements PythonBridge {
for (final PythonProcess process : processes) {
final Map<String, Integer> counts = process.getJavaObjectBindingCounts();
final BoundObjectCounts boundObjectCounts = new StandardBoundObjectCounts(process.toString(), extensionId.getType(), extensionId.getVersion(), counts);
final BoundObjectCounts boundObjectCounts = new StandardBoundObjectCounts(process.toString(), extensionId.type(), extensionId.version(), counts);
list.add(boundObjectCounts);
}
}
@ -312,40 +310,17 @@ public class StandardPythonBridge implements PythonBridge {
return "StandardPythonBridge";
}
private ExtensionId getExtensionId(final String type, final String version) {
final List<PythonProcessorDetails> processorTypes = controllerProcess.getController().getProcessorTypes();
final Optional<PythonProcessorDetails> processorTypeFound = processorTypes.stream()
.filter(details -> details.getProcessorType().equals(type))
.filter(details -> details.getProcessorVersion().equals(version))
.findFirst();
private static class ExtensionId {
private final String type;
private final String version;
return processorTypeFound.map(details -> new ExtensionId(details.getProcessorType(), details.getProcessorVersion()))
.orElseThrow(() -> new IllegalArgumentException(String.format("Processor Type [%s] Version [%s] not found", type, version)));
}
public ExtensionId(final String type, final String version) {
this.type = type;
this.version = version;
}
public String getType() {
return type;
}
public String getVersion() {
return version;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ExtensionId that = (ExtensionId) o;
return Objects.equals(type, that.type) && Objects.equals(version, that.version);
}
@Override
public int hashCode() {
return Objects.hash(type, version);
}
private record ExtensionId(String type, String version) {
}
}