From bd110317256043ec1ac2aeabc75a4f01f1a9dbf0 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 5 Feb 2024 16:56:52 -0500 Subject: [PATCH] NIFI-12659 Respawn Python Processes on Unexpected Termination Refactored so that when a Python Process dies, NiFi will detect this, respawn the process, recreate the Processors that exist in the process, re-initialize them, and restart them. In testing, found the PythonControllerInteractionIT had bugs that were causing Python Processors to be re-initialized many times; this resulted in threading issues that caused processors to be invalid, indicating that property descriptors didn't exist, etc. Addressed these concerns in the same commit, since they were necessary to properly run tests Ensure that ClassLoader is consistently established for python processor proxies; ensure that if we re-initialize python processor the old initialization thread is stopped This closes #8363 Signed-off-by: David Handermann --- .../apache/nifi/util/MockProcessContext.java | 2 +- .../py4j/CachingPythonProcessorDetails.java | 133 +++++++++++++++++ .../org/apache/nifi/py4j/PythonProcess.java | 137 +++++++++++++++++- .../nifi/py4j/StandardPythonBridge.java | 37 +---- .../py4j/StandardPythonProcessorBridge.java | 57 ++++++-- .../py4j/server/NiFiGatewayConnection.java | 21 ++- .../nifi/py4j/server/NiFiGatewayServer.java | 18 ++- .../processor/FlowFileTransformProxy.java | 18 +-- .../processor/PythonProcessorProxy.java | 28 +++- .../processor/RecordTransformProxy.java | 18 +-- .../PythonControllerInteractionIT.java | 106 ++++++++------ .../processor/PythonProcessorBridge.java | 3 + .../extensions/ExitAfterFourInvocations.py | 44 ++++++ 13 files changed, 482 insertions(+), 140 deletions(-) create mode 100644 nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/CachingPythonProcessorDetails.java create mode 100644 nifi-nar-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/ExitAfterFourInvocations.py diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index fe31628fce..1c09c436ed 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -406,7 +406,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements P for (final ValidationResult result : validate()) { if (!result.isValid()) { - sb.append(result.toString()).append("\n"); + sb.append(result).append("\n"); failureCount++; } } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/CachingPythonProcessorDetails.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/CachingPythonProcessorDetails.java new file mode 100644 index 0000000000..6a9b36a532 --- /dev/null +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/CachingPythonProcessorDetails.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.py4j; + +import org.apache.nifi.python.PythonProcessorDetails; +import org.apache.nifi.python.processor.documentation.MultiProcessorUseCaseDetails; +import org.apache.nifi.python.processor.documentation.PropertyDescription; +import org.apache.nifi.python.processor.documentation.UseCaseDetails; + +import java.util.List; + +/** + * A wrapper around a PythonProcessorDetails that caches the results of the delegate's methods. + * Making calls to the Python side is relatively expensive, and we make many calls to {@link #getProcessorType()}, + * {@link #getProcessorVersion()}, etc. This simple wrapper allows us to make the calls only once. + */ +public class CachingPythonProcessorDetails implements PythonProcessorDetails { + private final PythonProcessorDetails delegate; + private volatile String processorType; + private volatile String processorVersion; + private volatile String capabilityDescription; + private volatile String sourceLocation; + private volatile List tags; + private volatile List dependencies; + private volatile String processorInterface; + private volatile List useCases; + private volatile List multiProcessorUseCases; + private volatile List propertyDescriptions; + + + public CachingPythonProcessorDetails(final PythonProcessorDetails delegate) { + this.delegate = delegate; + } + + @Override + public String getProcessorType() { + if (processorType == null) { + processorType = delegate.getProcessorType(); + } + return processorType; + } + + @Override + public String getProcessorVersion() { + if (processorVersion == null) { + processorVersion = delegate.getProcessorVersion(); + } + return processorVersion; + } + + @Override + public String getSourceLocation() { + if (sourceLocation == null) { + sourceLocation = delegate.getSourceLocation(); + } + return sourceLocation; + } + + @Override + public List getTags() { + if (tags == null) { + tags = delegate.getTags(); + } + return tags; + } + + @Override + public List getDependencies() { + if (dependencies == null) { + dependencies = delegate.getDependencies(); + } + return dependencies; + } + + @Override + public String getCapabilityDescription() { + if (capabilityDescription == null) { + capabilityDescription = delegate.getCapabilityDescription(); + } + return capabilityDescription; + } + + @Override + public String getInterface() { + if (processorInterface == null) { + processorInterface = delegate.getInterface(); + } + return processorInterface; + } + + @Override + public List getUseCases() { + if (useCases == null) { + useCases = delegate.getUseCases(); + } + return useCases; + } + + @Override + public List getMultiProcessorUseCases() { + if (multiProcessorUseCases == null) { + multiProcessorUseCases = delegate.getMultiProcessorUseCases(); + } + return multiProcessorUseCases; + } + + @Override + public List getPropertyDescriptions() { + if (propertyDescriptions == null) { + propertyDescriptions = delegate.getPropertyDescriptions(); + } + return propertyDescriptions; + } + + @Override + public void free() { + } +} diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java index c540c4cda4..10ca232516 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java @@ -24,6 +24,9 @@ import org.apache.nifi.py4j.server.NiFiGatewayServer; import org.apache.nifi.python.ControllerServiceTypeLookup; import org.apache.nifi.python.PythonController; import org.apache.nifi.python.PythonProcessConfig; +import org.apache.nifi.python.PythonProcessorDetails; +import org.apache.nifi.python.processor.PythonProcessorAdapter; +import org.apache.nifi.python.processor.PythonProcessorBridge; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import py4j.CallbackClient; @@ -40,7 +43,9 @@ import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; public class PythonProcess { @@ -57,6 +62,10 @@ public class PythonProcess { private Process process; private NiFiPythonGateway gateway; private final Map processorPrefersIsolation = new ConcurrentHashMap<>(); + private final Set createdProcessors = new CopyOnWriteArraySet<>(); + private volatile boolean shutdown = false; + private volatile List extensionDirs; + private volatile String workDir; public PythonProcess(final PythonProcessConfig processConfig, final ControllerServiceTypeLookup controllerServiceTypeLookup, final File virtualEnvHome, @@ -68,11 +77,17 @@ public class PythonProcess { this.componentId = componentId; } - public PythonController getController() { + /** + * Returns the current Controller for the Python Process. Note that the Controller + * may change if the Python Process dies and is restarted. As a result, the value should never be + * cached and reused later. + * @return the current Controller for the Python Process + */ + PythonController getCurrentController() { return controller; } - public void start() throws IOException { + public synchronized void start() throws IOException { final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault(); final SocketFactory socketFactory = SocketFactory.getDefault(); @@ -101,6 +116,7 @@ public class PythonProcess { setupEnvironment(); this.process = launchPythonProcess(listeningPort, authToken); + this.process.onExit().thenAccept(this::handlePythonProcessDied); final StandardPythonClient pythonClient = new StandardPythonClient(gateway); controller = pythonClient.getController(); @@ -112,6 +128,7 @@ public class PythonProcess { try { final String pingResponse = controller.ping(); pingSuccessful = "pong".equals(pingResponse); + if (pingSuccessful) { break; } else { @@ -138,6 +155,52 @@ public class PythonProcess { logger.info("Successfully started and pinged Python Server. Python Process = {}", process); } + private void handlePythonProcessDied(final Process process) { + if (isShutdown()) { + // If shutdown, don't try to restart the Process + logger.info("Python Process {} exited with code {}", process, process.exitValue()); + return; + } + + final List processorsInvolved = this.createdProcessors.stream() + .map(coordinates -> "%s (%s)".formatted(coordinates.identifier(), coordinates.type())) + .toList(); + + logger.error("Python Process {} with Processors {} died unexpectedly with exit code {}. Restarting...", process, processorsInvolved, process.exitValue()); + long backoff = 1000L; + while (!isShutdown()) { + try { + // Ensure that we clean up any resources + killProcess(); + + // Restart the Process and establish new communications + start(); + + // Ensure that we re-discover any extensions, as this is necessary in order to create Processors + if (extensionDirs != null && workDir != null) { + discoverExtensions(extensionDirs, workDir); + recreateProcessors(); + } + + return; + } catch (final Exception e) { + // If we fail to restart the Python Process, we'll keep trying, as long as the Process isn't intentionally shutdown. + logger.error("Failed to restart Python Process with Processors {}; will keep trying", processorsInvolved, e); + + try { + // Sleep to avoid constantly hitting resources that are potentially already constrained + Thread.sleep(backoff); + + // Exponentially backoff, but cap at 60 seconds + backoff = Math.min(60000L, backoff * 2); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ex); + } + } + } + } + private String generateAuthToken() { final SecureRandom random = new SecureRandom(); final byte[] bytes = new byte[20]; @@ -255,15 +318,25 @@ public class PythonProcess { } } - public void shutdown() { - logger.info("Shutting down Python Process {}", process); + public boolean isShutdown() { + return shutdown; + } + public void shutdown() { + shutdown = true; + logger.info("Shutting down Python Process {}", process); + killProcess(); + } + + private synchronized void killProcess() { if (server != null) { try { server.shutdown(); } catch (final Exception e) { logger.error("Failed to cleanly shutdown Py4J server", e); } + + server = null; } if (gateway != null) { @@ -272,6 +345,8 @@ public class PythonProcess { } catch (final Exception e) { logger.error("Failed to cleanly shutdown Py4J Gateway", e); } + + gateway = null; } if (process != null) { @@ -280,13 +355,61 @@ public class PythonProcess { } catch (final Exception e) { logger.error("Failed to cleanly shutdown Py4J process", e); } + + process = null; } } - void addProcessor(final String identifier, final boolean prefersIsolation) { - processorPrefersIsolation.put(identifier, prefersIsolation); + public void discoverExtensions(final List directories, final String workDirectory) { + extensionDirs = new ArrayList<>(directories); + workDir = workDirectory; + controller.discoverExtensions(directories, workDirectory); } + public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final String workDirPath) { + final ProcessorCreationWorkflow creationWorkflow = new ProcessorCreationWorkflow() { + @Override + public void downloadDependencies() { + controller.downloadDependencies(type, version, workDirPath); + } + + @Override + public PythonProcessorAdapter createProcessor() { + return controller.createProcessor(type, version, workDirPath); + } + }; + + // Create a PythonProcessorDetails and then call getProcessorType and getProcessorVersion to ensure that the details are cached + final PythonProcessorDetails processorDetails = new CachingPythonProcessorDetails(controller.getProcessorDetails(type, version)); + processorDetails.getProcessorType(); + processorDetails.getProcessorVersion(); + + final PythonProcessorBridge processorBridge = new StandardPythonProcessorBridge.Builder() + .controller(controller) + .creationWorkflow(creationWorkflow) + .processorDetails(processorDetails) + .workingDirectory(processConfig.getPythonWorkingDirectory()) + .moduleFile(new File(controller.getModuleFile(type, version))) + .build(); + + final CreatedProcessor createdProcessor = new CreatedProcessor(identifier, type, processorBridge); + createdProcessors.add(createdProcessor); + + return processorBridge; + } + + /** + * Updates all Processor Bridges to use the new Controller. This will cause the Processor Bridges to re-initialize + * themselves and recreate the Python Processors that they interact with. + */ + private void recreateProcessors() { + for (final CreatedProcessor createdProcessor : createdProcessors) { + createdProcessor.processorBridge().replaceController(controller); + logger.info("Recreated Processor {} ({}) in Python Process {}", createdProcessor.identifier(), createdProcessor.type(), process); + } + } + + public boolean containsIsolatedProcessor() { return processorPrefersIsolation.containsValue(Boolean.TRUE); } @@ -303,4 +426,6 @@ public class PythonProcess { public Map getJavaObjectBindingCounts() { return gateway.getObjectBindings().getCountsPerClass(); } + + private record CreatedProcessor(String identifier, String type, PythonProcessorBridge processorBridge) {} } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java index e3f735bb01..bb3cbe496a 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java @@ -22,12 +22,10 @@ import org.apache.nifi.python.BoundObjectCounts; import org.apache.nifi.python.ControllerServiceTypeLookup; import org.apache.nifi.python.PythonBridge; import org.apache.nifi.python.PythonBridgeInitializationContext; -import org.apache.nifi.python.PythonController; import org.apache.nifi.python.PythonProcessConfig; import org.apache.nifi.python.PythonProcessorDetails; import org.apache.nifi.python.processor.FlowFileTransform; import org.apache.nifi.python.processor.FlowFileTransformProxy; -import org.apache.nifi.python.processor.PythonProcessorAdapter; import org.apache.nifi.python.processor.PythonProcessorBridge; import org.apache.nifi.python.processor.RecordTransform; import org.apache.nifi.python.processor.RecordTransformProxy; @@ -90,7 +88,7 @@ public class StandardPythonBridge implements PythonBridge { .map(File::getAbsolutePath) .collect(Collectors.toList()); final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath(); - controllerProcess.getController().discoverExtensions(extensionsDirs, workDirPath); + controllerProcess.discoverExtensions(extensionsDirs, workDirPath); } private PythonProcessorBridge createProcessorBridge(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) { @@ -103,30 +101,7 @@ public class StandardPythonBridge implements PythonBridge { final PythonProcess pythonProcess = getProcessForNextComponent(extensionId, identifier, preferIsolatedProcess); final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath(); - final PythonController controller = pythonProcess.getController(); - - final ProcessorCreationWorkflow creationWorkflow = new ProcessorCreationWorkflow() { - @Override - public void downloadDependencies() { - controller.downloadDependencies(type, version, workDirPath); - } - - @Override - public PythonProcessorAdapter createProcessor() { - return controller.createProcessor(type, version, workDirPath); - } - }; - - final PythonProcessorDetails processorDetails = controller.getProcessorDetails(type, version); - final PythonProcessorBridge processorBridge = new StandardPythonProcessorBridge.Builder() - .controller(controller) - .creationWorkflow(creationWorkflow) - .processorDetails(processorDetails) - .workingDirectory(processConfig.getPythonWorkingDirectory()) - .moduleFile(new File(controller.getModuleFile(type, version))) - .build(); - - pythonProcess.addProcessor(identifier, preferIsolatedProcess); + final PythonProcessorBridge processorBridge = pythonProcess.createProcessor(identifier, type, version, workDirPath); processorCountByType.merge(extensionId, 1, Integer::sum); return processorBridge; } @@ -239,7 +214,7 @@ public class StandardPythonBridge implements PythonBridge { .map(File::getAbsolutePath) .collect(Collectors.toList()); final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath(); - pythonProcess.getController().discoverExtensions(extensionsDirs, workDirPath); + pythonProcess.discoverExtensions(extensionsDirs, workDirPath); // Add the newly create process to the processes for the given type of processor. processesForType.add(pythonProcess); @@ -262,7 +237,7 @@ public class StandardPythonBridge implements PythonBridge { @Override public List getProcessorTypes() { ensureStarted(); - return controllerProcess.getController().getProcessorTypes(); + return controllerProcess.getCurrentController().getProcessorTypes(); } @Override @@ -321,7 +296,7 @@ public class StandardPythonBridge implements PythonBridge { @Override public void ping() { - controllerProcess.getController().ping(); + controllerProcess.getCurrentController().ping(); } @Override @@ -330,7 +305,7 @@ public class StandardPythonBridge implements PythonBridge { } private Optional findExtensionId(final String type, final String version) { - final List processorTypes = controllerProcess.getController().getProcessorTypes(); + final List processorTypes = controllerProcess.getCurrentController().getProcessorTypes(); return processorTypes.stream() .filter(details -> details.getProcessorType().equals(type)) .filter(details -> details.getProcessorVersion().equals(version)) diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java index 848f2c3d0f..9546b1e0e2 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState; @@ -34,7 +35,6 @@ import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState; public class StandardPythonProcessorBridge implements PythonProcessorBridge { private static final Logger logger = LoggerFactory.getLogger(StandardPythonProcessorBridge.class); - private final PythonController controller; private final ProcessorCreationWorkflow creationWorkflow; private final PythonProcessorDetails processorDetails; private volatile PythonProcessorAdapter adapter; @@ -43,6 +43,9 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { private volatile long lastModified; private volatile LoadState loadState = LoadState.DOWNLOADING_DEPENDENCIES; private volatile PythonProcessorInitializationContext initializationContext; + private volatile String identifier; + private volatile PythonController controller; + private volatile CompletableFuture initializationFuture; private StandardPythonProcessorBridge(final Builder builder) { @@ -61,24 +64,52 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { @Override public void initialize(final PythonProcessorInitializationContext context) { - this.initializationContext = context; + if (initializationFuture != null) { + initializationFuture.cancel(true); + } - final String threadName = "Initialize Python Processor %s (%s)".formatted(initializationContext.getIdentifier(), getProcessorType()); - Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true)); + this.initializationContext = context; + this.identifier = context.getIdentifier(); + + final CompletableFuture future = new CompletableFuture<>(); + this.initializationFuture = future; + final String threadName = "Initialize Python Processor %s (%s)".formatted(identifier, getProcessorType()); + Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true, future)); + } + + @Override + public void replaceController(final PythonController controller) { + if (initializationFuture != null) { + initializationFuture.cancel(true); + } + + this.controller = controller; + this.adapter = null; + + final CompletableFuture future = new CompletableFuture<>(); + this.initializationFuture = future; + + final String threadName = "Re-Initialize Python Processor %s (%s)".formatted(identifier, getProcessorType()); + Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true, future)); } public LoadState getLoadState() { return loadState; } - private void initializePythonSide(final boolean continualRetry) { + private void initializePythonSide(final boolean continualRetry, final CompletableFuture future) { + if (initializationContext == null) { + future.complete(null); + return; + } + long sleepMillis = 1_000L; - while (true) { + while (!future.isCancelled()) { loadState = LoadState.DOWNLOADING_DEPENDENCIES; try { creationWorkflow.downloadDependencies(); - logger.info("Successfully downloaded dependencies for Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType()); + logger.info("Successfully downloaded dependencies for Python Processor {} ({})", identifier, getProcessorType()); break; } catch (final Exception e) { loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED; @@ -87,7 +118,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { } sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10)); - logger.error("Failed to download dependencies for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis); + logger.error("Failed to download dependencies for Python Processor {} ({}). Will try again in {} millis", identifier, getProcessorType(), sleepMillis, e); try { Thread.sleep(sleepMillis); @@ -99,7 +130,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { } } - while (true) { + while (!future.isCancelled()) { loadState = LoadState.LOADING_PROCESSOR_CODE; try { @@ -107,7 +138,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { pythonProcessorAdapter.initialize(initializationContext); this.adapter = pythonProcessorAdapter; loadState = LoadState.FINISHED_LOADING; - logger.info("Successfully loaded Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType()); + logger.info("Successfully loaded Python Processor {} ({})", identifier, getProcessorType()); break; } catch (final Exception e) { loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED; @@ -117,7 +148,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { } sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10)); - logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis, e); + logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", identifier, getProcessorType(), sleepMillis, e); try { Thread.sleep(sleepMillis); @@ -128,6 +159,8 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { } } } + + future.complete(null); } @Override @@ -143,7 +176,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { } controller.reloadProcessor(getProcessorType(), processorDetails.getProcessorVersion(), workingDir.getAbsolutePath()); - initializePythonSide(false); + initializePythonSide(false, new CompletableFuture<>()); lastModified = moduleFile.lastModified(); return true; diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/server/NiFiGatewayConnection.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/server/NiFiGatewayConnection.java index 434715e094..7254663f07 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/server/NiFiGatewayConnection.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/server/NiFiGatewayConnection.java @@ -70,6 +70,7 @@ public class NiFiGatewayConnection extends GatewayConnection { private final NiFiGatewayServer gatewayServer; private volatile boolean poisoned = false; + private final ClassLoader contextClassLoader; public NiFiGatewayConnection(final NiFiGatewayServer gatewayServer, final Gateway gateway, @@ -79,6 +80,7 @@ public class NiFiGatewayConnection extends GatewayConnection { final List listeners) throws IOException { super(gateway, socket, authToken, customCommands, listeners); this.gatewayServer = gatewayServer; + this.contextClassLoader = getClass().getClassLoader(); } private boolean isContinue() { @@ -87,12 +89,21 @@ public class NiFiGatewayConnection extends GatewayConnection { @Override public void run() { - Thread.currentThread().setName(String.format("NiFiGatewayConnection Thread for %s %s", gatewayServer.getComponentType(), gatewayServer.getComponentId())); - while (isContinue()) { - super.run(); - } + final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(this.contextClassLoader); - shutdown(false); + Thread.currentThread().setName(String.format("NiFiGatewayConnection Thread for %s %s", gatewayServer.getComponentType(), gatewayServer.getComponentId())); + while (isContinue()) { + super.run(); + } + + shutdown(false); + } finally { + if (originalClassLoader != null) { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } } protected void quietSendFatalError(final BufferedWriter writer, final Throwable exception) { diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/server/NiFiGatewayServer.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/server/NiFiGatewayServer.java index 1869cb4bc0..ae24d499e0 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/server/NiFiGatewayServer.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/server/NiFiGatewayServer.java @@ -41,6 +41,7 @@ public class NiFiGatewayServer extends GatewayServer { private final String componentType; private final String componentId; private final String authToken; + private final ClassLoader contextClassLoader; public NiFiGatewayServer(final Gateway gateway, final int port, @@ -56,6 +57,7 @@ public class NiFiGatewayServer extends GatewayServer { this.componentType = componentType; this.componentId = componentId; this.authToken = authToken; + this.contextClassLoader = getClass().getClassLoader(); } protected Py4JServerConnection createConnection(final Gateway gateway, final Socket socket) throws IOException { @@ -84,11 +86,19 @@ public class NiFiGatewayServer extends GatewayServer { @Override public void run() { - if (startProcessForked) { - Thread.currentThread().setName("NiFiGatewayServer Thread for " + componentType + " " + componentId); - } + final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(this.contextClassLoader); + if (startProcessForked) { + Thread.currentThread().setName("NiFiGatewayServer Thread for " + componentType + " " + componentId); + } - super.run(); + super.run(); + } finally { + if (originalClassLoader != null) { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } } @Override diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java index 2c692cf5e5..073158f59b 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java @@ -19,7 +19,6 @@ package org.apache.nifi.python.processor; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -28,29 +27,16 @@ import org.apache.nifi.processor.exception.ProcessException; import py4j.Py4JNetworkException; import java.util.Map; -import java.util.Optional; import java.util.function.Supplier; @InputRequirement(Requirement.INPUT_REQUIRED) -public class FlowFileTransformProxy extends PythonProcessorProxy { +public class FlowFileTransformProxy extends PythonProcessorProxy { - private volatile FlowFileTransform transform; public FlowFileTransformProxy(final String processorType, final Supplier bridgeFactory, final boolean initialize) { super(processorType, bridgeFactory, initialize); } - @OnScheduled - public void setContext(final ProcessContext context) { - final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing")); - final Optional optionalAdapter = bridge.getProcessorAdapter(); - if (optionalAdapter.isEmpty()) { - throw new IllegalStateException(this + " is not finished initializing"); - } - - this.transform = (FlowFileTransform) optionalAdapter.get().getProcessor(); - this.transform.setContext(context); - } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { @@ -63,7 +49,7 @@ public class FlowFileTransformProxy extends PythonProcessorProxy { final FlowFileTransformResult result; try (final StandardInputFlowFile inputFlowFile = new StandardInputFlowFile(session, original)) { - result = transform.transformFlowFile(inputFlowFile); + result = getTransform().transformFlowFile(inputFlowFile); } catch (final Py4JNetworkException e) { throw new ProcessException("Failed to communicate with Python Process", e); } catch (final Exception e) { diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java index a20ad583c2..a370d3d1bb 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java @@ -44,7 +44,7 @@ import java.util.function.Supplier; @SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS) @SupportsSensitiveDynamicProperties -public abstract class PythonProcessorProxy extends AbstractProcessor implements AsyncLoadedProcessor { +public abstract class PythonProcessorProxy extends AbstractProcessor implements AsyncLoadedProcessor { private final String processorType; private volatile PythonProcessorInitializationContext initContext; private volatile PythonProcessorBridge bridge; @@ -53,6 +53,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements private volatile Map cachedDynamicDescriptors = null; private volatile Boolean supportsDynamicProperties; + private volatile T currentTransform; + private volatile ProcessContext currentProcessContext; + + protected static final Relationship REL_ORIGINAL = new Relationship.Builder() .name("original") .description("The original FlowFile will be routed to this relationship when it has been successfully transformed") @@ -107,6 +111,28 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements } } + + @OnScheduled + public void setContext(final ProcessContext context) { + this.currentProcessContext = context; + } + + protected T getTransform() { + final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing")); + final Optional optionalAdapter = bridge.getProcessorAdapter(); + if (optionalAdapter.isEmpty()) { + throw new IllegalStateException(this + " is not finished initializing"); + } + + final T transform = (T) optionalAdapter.get().getProcessor(); + if (transform != currentTransform) { + transform.setContext(currentProcessContext); + currentTransform = transform; + } + + return transform; + } + protected Optional getBridge() { return Optional.ofNullable(this.bridge); } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java index 1b95ae3326..00389f5b61 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java @@ -20,7 +20,6 @@ package org.apache.nifi.python.processor; import org.apache.nifi.NullSuppression; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.json.JsonRecordSource; @@ -55,12 +54,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.function.Supplier; @InputRequirement(Requirement.INPUT_REQUIRED) -public class RecordTransformProxy extends PythonProcessorProxy { - private volatile RecordTransform transform; +public class RecordTransformProxy extends PythonProcessorProxy { static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() .name("Record Reader") @@ -91,18 +88,6 @@ public class RecordTransformProxy extends PythonProcessorProxy { return properties; } - @OnScheduled - public void setProcessContext(final ProcessContext context) { - final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing")); - - final Optional adapterOptional = bridge.getProcessorAdapter(); - if (adapterOptional.isEmpty()) { - throw new IllegalStateException(this + " is not finished initializing"); - } - - this.transform = (RecordTransform) adapterOptional.get().getProcessor(); - this.transform.setContext(context); - } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { @@ -120,6 +105,7 @@ public class RecordTransformProxy extends PythonProcessorProxy { long recordsRead = 0L; long recordsWritten = 0L; + final RecordTransform transform = getTransform(); Map> flowFilesPerRelationship; try (final InputStream in = session.read(flowFile); final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java index 2f03504119..c41d459a17 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java @@ -23,16 +23,12 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonTreeReader; -import org.apache.nifi.mock.MockProcessorInitializationContext; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.python.ControllerServiceTypeLookup; import org.apache.nifi.python.PythonBridge; import org.apache.nifi.python.PythonBridgeInitializationContext; import org.apache.nifi.python.PythonProcessConfig; import org.apache.nifi.python.PythonProcessorDetails; -import org.apache.nifi.python.processor.FlowFileTransformProxy; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.RecordField; @@ -46,6 +42,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.DisabledOnOs; import java.io.File; import java.io.FileOutputStream; @@ -55,7 +53,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -67,6 +64,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class PythonControllerInteractionIT { @@ -170,8 +168,7 @@ public class PythonControllerInteractionIT { // Create a PrettyPrintJson Processor final byte[] jsonContent = Files.readAllBytes(Paths.get("src/test/resources/json/input/simple-person.json")); for (int i=0; i < 3; i++) { - final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform(PRETTY_PRINT_JSON); runner.enqueue(jsonContent); runner.run(); @@ -184,8 +181,7 @@ public class PythonControllerInteractionIT { @Disabled("Just for manual testing...") public void runPrettyPrintJsonManyThreads() throws IOException { // Create a PrettyPrintJson Processor - final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform(PRETTY_PRINT_JSON); final int flowFileCount = 100_000; final int threadCount = 12; @@ -204,8 +200,7 @@ public class PythonControllerInteractionIT { @Test public void testSimplePrettyPrint() throws IOException { // Setup - final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform(PRETTY_PRINT_JSON); runner.enqueue(Paths.get("src/test/resources/json/input/simple-person.json")); runner.setProperty("Indentation", "2"); @@ -223,8 +218,7 @@ public class PythonControllerInteractionIT { @Test public void testValidator() { - final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform(PRETTY_PRINT_JSON); runner.setProperty("Indentation", "-1"); runner.assertNotValid(); @@ -245,8 +239,7 @@ public class PythonControllerInteractionIT { @Test public void testCsvToExcel() { // Create a PrettyPrintJson Processor - final FlowFileTransformProxy wrapper = createFlowFileTransform("ConvertCsvToExcel"); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform("ConvertCsvToExcel"); runner.enqueue("name, number\nJohn Doe, 500"); // Trigger the processor @@ -259,8 +252,7 @@ public class PythonControllerInteractionIT { @Test public void testExpressionLanguageWithAttributes() { // Setup - final FlowFileTransformProxy wrapper = createFlowFileTransform("WritePropertyToFlowFile"); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform("WritePropertyToFlowFile"); runner.setProperty("Message", "Hola Mundo"); runner.enqueue("Hello World"); @@ -274,8 +266,7 @@ public class PythonControllerInteractionIT { @Test public void testPythonPackage() { // Create a WriteNumber Processor - final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteNumber"); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform("WriteNumber"); runner.enqueue(""); // Trigger the processor @@ -289,12 +280,8 @@ public class PythonControllerInteractionIT { assertTrue(resultNum <= 1000); } - private FlowFileTransformProxy createFlowFileTransform(final String type) { - final Processor processor = createProcessor(type); - assertNotNull(processor); - - processor.initialize(new MockProcessorInitializationContext()); - return (FlowFileTransformProxy) processor; + private TestRunner createFlowFileTransform(final String type) { + return createProcessor(type); } @Test @@ -312,8 +299,7 @@ public class PythonControllerInteractionIT { assertEquals("numpy==1.25.0", dependencies.get(0)); // Setup - final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteNumpyVersion"); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform("WriteNumpyVersion"); runner.enqueue("Hello World"); // Trigger the processor @@ -328,8 +314,7 @@ public class PythonControllerInteractionIT { public void testControllerService() throws InitializationException { // Setup controllerServiceMap.put("StringLookupService", TestLookupService.class); - final FlowFileTransformProxy wrapper = createFlowFileTransform("LookupAddress"); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform("LookupAddress"); final StringLookupService lookupService = new TestLookupService((Collections.singletonMap("John Doe", "123 My Street"))); runner.addControllerService("lookup", lookupService); runner.enableControllerService(lookupService); @@ -356,8 +341,7 @@ public class PythonControllerInteractionIT { replaceFileText(sourceFile, replacement, originalMessage); // Setup - final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteMessage"); - final TestRunner runner = TestRunners.newTestRunner(wrapper); + final TestRunner runner = createFlowFileTransform("WriteMessage"); runner.enqueue(""); // Trigger the processor @@ -424,8 +408,7 @@ public class PythonControllerInteractionIT { assertEquals(1, v2Count); // Create a WriteMessage Processor, version 0.0.1-SNAPSHOT - final FlowFileTransformProxy wrapperV1 = createFlowFileTransform("WriteMessage"); - final TestRunner runnerV1 = TestRunners.newTestRunner(wrapperV1); + final TestRunner runnerV1 = createFlowFileTransform("WriteMessage"); runnerV1.enqueue(""); // Trigger the processor @@ -436,8 +419,7 @@ public class PythonControllerInteractionIT { runnerV1.getFlowFilesForRelationship("success").get(0).assertContentEquals("Hello, World"); // Create an instance of WriteMessage V2 - final Processor procV2 = createProcessor("WriteMessage", "0.0.2-SNAPSHOT"); - final TestRunner runnerV2 = TestRunners.newTestRunner(procV2); + final TestRunner runnerV2 = createProcessor("WriteMessage", "0.0.2-SNAPSHOT"); runnerV2.enqueue(""); // Trigger the processor @@ -486,13 +468,11 @@ public class PythonControllerInteractionIT { private TestRunner createRecordTransformRunner(final String type) throws InitializationException { - final Processor processor = createProcessor("SetRecordField"); - assertNotNull(processor); + final TestRunner runner = createProcessor("SetRecordField"); final JsonTreeReader reader = new JsonTreeReader(); final JsonRecordSetWriter writer = new JsonRecordSetWriter(); - final TestRunner runner = TestRunners.newTestRunner(processor); runner.addControllerService("reader", reader); runner.addControllerService("writer", writer); runner.enableControllerService(reader); @@ -525,8 +505,7 @@ public class PythonControllerInteractionIT { @Test public void testCustomRelationships() { - final FlowFileTransformProxy processor = createFlowFileTransform("RouteFlowFile"); - final TestRunner runner = TestRunners.newTestRunner(processor); + final TestRunner runner = createFlowFileTransform("RouteFlowFile"); final Set relationships = runner.getProcessor().getRelationships(); assertEquals(4, relationships.size()); @@ -545,9 +524,41 @@ public class PythonControllerInteractionIT { runner.assertTransferCount("failure", 0); } + @Test + @Timeout(45) + @DisabledOnOs(org.junit.jupiter.api.condition.OS.WINDOWS) // Cannot run on windows because ExitAfterFourInvocations uses `kill -9` command + public void testProcessRestarted() { + final TestRunner runner = createFlowFileTransform("ExitAfterFourInvocations"); - private RecordSchema createSimpleRecordSchema(final String... fieldNames) { - return createSimpleRecordSchema(Arrays.asList(fieldNames)); + for (int i=0; i < 10; i++) { + runner.enqueue(Integer.toString(i)); + } + + runner.run(4); + assertThrows(Throwable.class, runner::run); + + // Run 2 additional times. Because the Python Process will have to be restarted, it may take a bit, + // so we keep trying until we succeed, relying on the 15 second timeout for the test to fail us if + // the Process doesn't get restarted in time. + for (int i=0; i < 2; i++) { + while(true) { + try { + runner.run(1, false, i == 0); + break; + } catch (final Throwable t) { + try { + Thread.sleep(1000L); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + // Trigger stop on finish + runner.run(1, true, false); + + runner.assertTransferCount("success", 7); } private RecordSchema createSimpleRecordSchema(final List fieldNames) { @@ -583,16 +594,15 @@ public class PythonControllerInteractionIT { return UUID.randomUUID().toString(); } - private Processor createProcessor(final String type) { + private TestRunner createProcessor(final String type) { return createProcessor(type, VERSION); } - private Processor createProcessor(final String type, final String version) { + private TestRunner createProcessor(final String type, final String version) { bridge.discoverExtensions(); final AsyncLoadedProcessor processor = bridge.createProcessor(createId(), type, version, true, true); - final ProcessorInitializationContext initContext = new MockProcessorInitializationContext(); - processor.initialize(initContext); + final TestRunner runner = TestRunners.newTestRunner(processor); final long maxInitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L); while (true) { @@ -615,8 +625,8 @@ public class PythonControllerInteractionIT { throw new RuntimeException("Interrupted while initializing processor of type %s version %s".formatted(type, version)); } } - processor.initialize(new MockProcessorInitializationContext()); - return processor; + + return runner; } } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java index 0a5b1af2c9..fbf891e8e0 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java @@ -18,6 +18,7 @@ package org.apache.nifi.python.processor; import org.apache.nifi.components.AsyncLoadedProcessor.LoadState; +import org.apache.nifi.python.PythonController; import java.util.Optional; @@ -32,6 +33,8 @@ public interface PythonProcessorBridge { */ Optional getProcessorAdapter(); + void replaceController(PythonController controller); + /** * @return the name of the Processor implementation. This will not contain a 'python.' prefix. */ diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/ExitAfterFourInvocations.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/ExitAfterFourInvocations.py new file mode 100644 index 0000000000..4e24c5de01 --- /dev/null +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/ExitAfterFourInvocations.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +import os +import subprocess + +class ExitAfterFourInvocations(FlowFileTransform): + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileTransform'] + class ProcessorDetails: + version = '0.0.1-SNAPSHOT' + description = 'Transfers the first 4 FlowFiles to success, then kills the process on subsequent invocations by invoking system command kill -9 ' + dependencies = ['pandas'] # Just to have some dependency that must be downloaded to test functionality on restart + + invocations = 0 + + def __init__(self, **kwargs): + pass + + def transform(self, context, flowFile): + self.invocations += 1 + if self.invocations > 4: + # Issue a `kill -9` to the current process + pid = os.getpid() + subprocess.run(['kill', '-9', str(pid)]) + + return FlowFileTransformResult(relationship = "success") + + + def getPropertyDescriptors(self): + return [] \ No newline at end of file