NIFI-12659 Respawn Python Processes on Unexpected Termination

Refactored so that when a Python Process dies, NiFi will detect this, respawn the process, recreate the Processors that exist in the process, re-initialize them, and restart them. In testing, found the PythonControllerInteractionIT had bugs that were causing Python Processors to be re-initialized many times; this resulted in threading issues that caused processors to be invalid, indicating that property descriptors didn't exist, etc. Addressed these concerns in the same commit, since they were necessary to properly run tests

Ensure that ClassLoader is consistently established for python processor proxies; ensure that if we re-initialize python processor the old initialization thread is stopped

This closes #8363

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-02-05 16:56:52 -05:00 committed by exceptionfactory
parent 4b4ca30fc0
commit bd11031725
No known key found for this signature in database
13 changed files with 482 additions and 140 deletions

View File

@ -406,7 +406,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
for (final ValidationResult result : validate()) {
if (!result.isValid()) {
sb.append(result.toString()).append("\n");
sb.append(result).append("\n");
failureCount++;
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.py4j;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.documentation.MultiProcessorUseCaseDetails;
import org.apache.nifi.python.processor.documentation.PropertyDescription;
import org.apache.nifi.python.processor.documentation.UseCaseDetails;
import java.util.List;
/**
* A wrapper around a PythonProcessorDetails that caches the results of the delegate's methods.
* Making calls to the Python side is relatively expensive, and we make many calls to {@link #getProcessorType()},
* {@link #getProcessorVersion()}, etc. This simple wrapper allows us to make the calls only once.
*/
public class CachingPythonProcessorDetails implements PythonProcessorDetails {
private final PythonProcessorDetails delegate;
private volatile String processorType;
private volatile String processorVersion;
private volatile String capabilityDescription;
private volatile String sourceLocation;
private volatile List<String> tags;
private volatile List<String> dependencies;
private volatile String processorInterface;
private volatile List<UseCaseDetails> useCases;
private volatile List<MultiProcessorUseCaseDetails> multiProcessorUseCases;
private volatile List<PropertyDescription> propertyDescriptions;
public CachingPythonProcessorDetails(final PythonProcessorDetails delegate) {
this.delegate = delegate;
}
@Override
public String getProcessorType() {
if (processorType == null) {
processorType = delegate.getProcessorType();
}
return processorType;
}
@Override
public String getProcessorVersion() {
if (processorVersion == null) {
processorVersion = delegate.getProcessorVersion();
}
return processorVersion;
}
@Override
public String getSourceLocation() {
if (sourceLocation == null) {
sourceLocation = delegate.getSourceLocation();
}
return sourceLocation;
}
@Override
public List<String> getTags() {
if (tags == null) {
tags = delegate.getTags();
}
return tags;
}
@Override
public List<String> getDependencies() {
if (dependencies == null) {
dependencies = delegate.getDependencies();
}
return dependencies;
}
@Override
public String getCapabilityDescription() {
if (capabilityDescription == null) {
capabilityDescription = delegate.getCapabilityDescription();
}
return capabilityDescription;
}
@Override
public String getInterface() {
if (processorInterface == null) {
processorInterface = delegate.getInterface();
}
return processorInterface;
}
@Override
public List<UseCaseDetails> getUseCases() {
if (useCases == null) {
useCases = delegate.getUseCases();
}
return useCases;
}
@Override
public List<MultiProcessorUseCaseDetails> getMultiProcessorUseCases() {
if (multiProcessorUseCases == null) {
multiProcessorUseCases = delegate.getMultiProcessorUseCases();
}
return multiProcessorUseCases;
}
@Override
public List<PropertyDescription> getPropertyDescriptions() {
if (propertyDescriptions == null) {
propertyDescriptions = delegate.getPropertyDescriptions();
}
return propertyDescriptions;
}
@Override
public void free() {
}
}

View File

@ -24,6 +24,9 @@ import org.apache.nifi.py4j.server.NiFiGatewayServer;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.CallbackClient;
@ -40,7 +43,9 @@ import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
public class PythonProcess {
@ -57,6 +62,10 @@ public class PythonProcess {
private Process process;
private NiFiPythonGateway gateway;
private final Map<String, Boolean> processorPrefersIsolation = new ConcurrentHashMap<>();
private final Set<CreatedProcessor> createdProcessors = new CopyOnWriteArraySet<>();
private volatile boolean shutdown = false;
private volatile List<String> extensionDirs;
private volatile String workDir;
public PythonProcess(final PythonProcessConfig processConfig, final ControllerServiceTypeLookup controllerServiceTypeLookup, final File virtualEnvHome,
@ -68,11 +77,17 @@ public class PythonProcess {
this.componentId = componentId;
}
public PythonController getController() {
/**
* Returns the current Controller for the Python Process. Note that the Controller
* may change if the Python Process dies and is restarted. As a result, the value should never be
* cached and reused later.
* @return the current Controller for the Python Process
*/
PythonController getCurrentController() {
return controller;
}
public void start() throws IOException {
public synchronized void start() throws IOException {
final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
final SocketFactory socketFactory = SocketFactory.getDefault();
@ -101,6 +116,7 @@ public class PythonProcess {
setupEnvironment();
this.process = launchPythonProcess(listeningPort, authToken);
this.process.onExit().thenAccept(this::handlePythonProcessDied);
final StandardPythonClient pythonClient = new StandardPythonClient(gateway);
controller = pythonClient.getController();
@ -112,6 +128,7 @@ public class PythonProcess {
try {
final String pingResponse = controller.ping();
pingSuccessful = "pong".equals(pingResponse);
if (pingSuccessful) {
break;
} else {
@ -138,6 +155,52 @@ public class PythonProcess {
logger.info("Successfully started and pinged Python Server. Python Process = {}", process);
}
private void handlePythonProcessDied(final Process process) {
if (isShutdown()) {
// If shutdown, don't try to restart the Process
logger.info("Python Process {} exited with code {}", process, process.exitValue());
return;
}
final List<String> processorsInvolved = this.createdProcessors.stream()
.map(coordinates -> "%s (%s)".formatted(coordinates.identifier(), coordinates.type()))
.toList();
logger.error("Python Process {} with Processors {} died unexpectedly with exit code {}. Restarting...", process, processorsInvolved, process.exitValue());
long backoff = 1000L;
while (!isShutdown()) {
try {
// Ensure that we clean up any resources
killProcess();
// Restart the Process and establish new communications
start();
// Ensure that we re-discover any extensions, as this is necessary in order to create Processors
if (extensionDirs != null && workDir != null) {
discoverExtensions(extensionDirs, workDir);
recreateProcessors();
}
return;
} catch (final Exception e) {
// If we fail to restart the Python Process, we'll keep trying, as long as the Process isn't intentionally shutdown.
logger.error("Failed to restart Python Process with Processors {}; will keep trying", processorsInvolved, e);
try {
// Sleep to avoid constantly hitting resources that are potentially already constrained
Thread.sleep(backoff);
// Exponentially backoff, but cap at 60 seconds
backoff = Math.min(60000L, backoff * 2);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
}
}
}
private String generateAuthToken() {
final SecureRandom random = new SecureRandom();
final byte[] bytes = new byte[20];
@ -255,15 +318,25 @@ public class PythonProcess {
}
}
public void shutdown() {
logger.info("Shutting down Python Process {}", process);
public boolean isShutdown() {
return shutdown;
}
public void shutdown() {
shutdown = true;
logger.info("Shutting down Python Process {}", process);
killProcess();
}
private synchronized void killProcess() {
if (server != null) {
try {
server.shutdown();
} catch (final Exception e) {
logger.error("Failed to cleanly shutdown Py4J server", e);
}
server = null;
}
if (gateway != null) {
@ -272,6 +345,8 @@ public class PythonProcess {
} catch (final Exception e) {
logger.error("Failed to cleanly shutdown Py4J Gateway", e);
}
gateway = null;
}
if (process != null) {
@ -280,13 +355,61 @@ public class PythonProcess {
} catch (final Exception e) {
logger.error("Failed to cleanly shutdown Py4J process", e);
}
process = null;
}
}
void addProcessor(final String identifier, final boolean prefersIsolation) {
processorPrefersIsolation.put(identifier, prefersIsolation);
public void discoverExtensions(final List<String> directories, final String workDirectory) {
extensionDirs = new ArrayList<>(directories);
workDir = workDirectory;
controller.discoverExtensions(directories, workDirectory);
}
public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final String workDirPath) {
final ProcessorCreationWorkflow creationWorkflow = new ProcessorCreationWorkflow() {
@Override
public void downloadDependencies() {
controller.downloadDependencies(type, version, workDirPath);
}
@Override
public PythonProcessorAdapter createProcessor() {
return controller.createProcessor(type, version, workDirPath);
}
};
// Create a PythonProcessorDetails and then call getProcessorType and getProcessorVersion to ensure that the details are cached
final PythonProcessorDetails processorDetails = new CachingPythonProcessorDetails(controller.getProcessorDetails(type, version));
processorDetails.getProcessorType();
processorDetails.getProcessorVersion();
final PythonProcessorBridge processorBridge = new StandardPythonProcessorBridge.Builder()
.controller(controller)
.creationWorkflow(creationWorkflow)
.processorDetails(processorDetails)
.workingDirectory(processConfig.getPythonWorkingDirectory())
.moduleFile(new File(controller.getModuleFile(type, version)))
.build();
final CreatedProcessor createdProcessor = new CreatedProcessor(identifier, type, processorBridge);
createdProcessors.add(createdProcessor);
return processorBridge;
}
/**
* Updates all Processor Bridges to use the new Controller. This will cause the Processor Bridges to re-initialize
* themselves and recreate the Python Processors that they interact with.
*/
private void recreateProcessors() {
for (final CreatedProcessor createdProcessor : createdProcessors) {
createdProcessor.processorBridge().replaceController(controller);
logger.info("Recreated Processor {} ({}) in Python Process {}", createdProcessor.identifier(), createdProcessor.type(), process);
}
}
public boolean containsIsolatedProcessor() {
return processorPrefersIsolation.containsValue(Boolean.TRUE);
}
@ -303,4 +426,6 @@ public class PythonProcess {
public Map<String, Integer> getJavaObjectBindingCounts() {
return gateway.getObjectBindings().getCountsPerClass();
}
private record CreatedProcessor(String identifier, String type, PythonProcessorBridge processorBridge) {}
}

View File

@ -22,12 +22,10 @@ import org.apache.nifi.python.BoundObjectCounts;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.FlowFileTransform;
import org.apache.nifi.python.processor.FlowFileTransformProxy;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.RecordTransform;
import org.apache.nifi.python.processor.RecordTransformProxy;
@ -90,7 +88,7 @@ public class StandardPythonBridge implements PythonBridge {
.map(File::getAbsolutePath)
.collect(Collectors.toList());
final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
controllerProcess.getController().discoverExtensions(extensionsDirs, workDirPath);
controllerProcess.discoverExtensions(extensionsDirs, workDirPath);
}
private PythonProcessorBridge createProcessorBridge(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
@ -103,30 +101,7 @@ public class StandardPythonBridge implements PythonBridge {
final PythonProcess pythonProcess = getProcessForNextComponent(extensionId, identifier, preferIsolatedProcess);
final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
final PythonController controller = pythonProcess.getController();
final ProcessorCreationWorkflow creationWorkflow = new ProcessorCreationWorkflow() {
@Override
public void downloadDependencies() {
controller.downloadDependencies(type, version, workDirPath);
}
@Override
public PythonProcessorAdapter createProcessor() {
return controller.createProcessor(type, version, workDirPath);
}
};
final PythonProcessorDetails processorDetails = controller.getProcessorDetails(type, version);
final PythonProcessorBridge processorBridge = new StandardPythonProcessorBridge.Builder()
.controller(controller)
.creationWorkflow(creationWorkflow)
.processorDetails(processorDetails)
.workingDirectory(processConfig.getPythonWorkingDirectory())
.moduleFile(new File(controller.getModuleFile(type, version)))
.build();
pythonProcess.addProcessor(identifier, preferIsolatedProcess);
final PythonProcessorBridge processorBridge = pythonProcess.createProcessor(identifier, type, version, workDirPath);
processorCountByType.merge(extensionId, 1, Integer::sum);
return processorBridge;
}
@ -239,7 +214,7 @@ public class StandardPythonBridge implements PythonBridge {
.map(File::getAbsolutePath)
.collect(Collectors.toList());
final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
pythonProcess.getController().discoverExtensions(extensionsDirs, workDirPath);
pythonProcess.discoverExtensions(extensionsDirs, workDirPath);
// Add the newly create process to the processes for the given type of processor.
processesForType.add(pythonProcess);
@ -262,7 +237,7 @@ public class StandardPythonBridge implements PythonBridge {
@Override
public List<PythonProcessorDetails> getProcessorTypes() {
ensureStarted();
return controllerProcess.getController().getProcessorTypes();
return controllerProcess.getCurrentController().getProcessorTypes();
}
@Override
@ -321,7 +296,7 @@ public class StandardPythonBridge implements PythonBridge {
@Override
public void ping() {
controllerProcess.getController().ping();
controllerProcess.getCurrentController().ping();
}
@Override
@ -330,7 +305,7 @@ public class StandardPythonBridge implements PythonBridge {
}
private Optional<ExtensionId> findExtensionId(final String type, final String version) {
final List<PythonProcessorDetails> processorTypes = controllerProcess.getController().getProcessorTypes();
final List<PythonProcessorDetails> processorTypes = controllerProcess.getCurrentController().getProcessorTypes();
return processorTypes.stream()
.filter(details -> details.getProcessorType().equals(type))
.filter(details -> details.getProcessorVersion().equals(version))

View File

@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
@ -34,7 +35,6 @@ import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
public class StandardPythonProcessorBridge implements PythonProcessorBridge {
private static final Logger logger = LoggerFactory.getLogger(StandardPythonProcessorBridge.class);
private final PythonController controller;
private final ProcessorCreationWorkflow creationWorkflow;
private final PythonProcessorDetails processorDetails;
private volatile PythonProcessorAdapter adapter;
@ -43,6 +43,9 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
private volatile long lastModified;
private volatile LoadState loadState = LoadState.DOWNLOADING_DEPENDENCIES;
private volatile PythonProcessorInitializationContext initializationContext;
private volatile String identifier;
private volatile PythonController controller;
private volatile CompletableFuture<Void> initializationFuture;
private StandardPythonProcessorBridge(final Builder builder) {
@ -61,24 +64,52 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
@Override
public void initialize(final PythonProcessorInitializationContext context) {
this.initializationContext = context;
if (initializationFuture != null) {
initializationFuture.cancel(true);
}
final String threadName = "Initialize Python Processor %s (%s)".formatted(initializationContext.getIdentifier(), getProcessorType());
Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true));
this.initializationContext = context;
this.identifier = context.getIdentifier();
final CompletableFuture<Void> future = new CompletableFuture<>();
this.initializationFuture = future;
final String threadName = "Initialize Python Processor %s (%s)".formatted(identifier, getProcessorType());
Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true, future));
}
@Override
public void replaceController(final PythonController controller) {
if (initializationFuture != null) {
initializationFuture.cancel(true);
}
this.controller = controller;
this.adapter = null;
final CompletableFuture<Void> future = new CompletableFuture<>();
this.initializationFuture = future;
final String threadName = "Re-Initialize Python Processor %s (%s)".formatted(identifier, getProcessorType());
Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true, future));
}
public LoadState getLoadState() {
return loadState;
}
private void initializePythonSide(final boolean continualRetry) {
private void initializePythonSide(final boolean continualRetry, final CompletableFuture<Void> future) {
if (initializationContext == null) {
future.complete(null);
return;
}
long sleepMillis = 1_000L;
while (true) {
while (!future.isCancelled()) {
loadState = LoadState.DOWNLOADING_DEPENDENCIES;
try {
creationWorkflow.downloadDependencies();
logger.info("Successfully downloaded dependencies for Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType());
logger.info("Successfully downloaded dependencies for Python Processor {} ({})", identifier, getProcessorType());
break;
} catch (final Exception e) {
loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
@ -87,7 +118,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
}
sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10));
logger.error("Failed to download dependencies for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis);
logger.error("Failed to download dependencies for Python Processor {} ({}). Will try again in {} millis", identifier, getProcessorType(), sleepMillis, e);
try {
Thread.sleep(sleepMillis);
@ -99,7 +130,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
}
}
while (true) {
while (!future.isCancelled()) {
loadState = LoadState.LOADING_PROCESSOR_CODE;
try {
@ -107,7 +138,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
pythonProcessorAdapter.initialize(initializationContext);
this.adapter = pythonProcessorAdapter;
loadState = LoadState.FINISHED_LOADING;
logger.info("Successfully loaded Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType());
logger.info("Successfully loaded Python Processor {} ({})", identifier, getProcessorType());
break;
} catch (final Exception e) {
loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
@ -117,7 +148,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
}
sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10));
logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis, e);
logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", identifier, getProcessorType(), sleepMillis, e);
try {
Thread.sleep(sleepMillis);
@ -128,6 +159,8 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
}
}
}
future.complete(null);
}
@Override
@ -143,7 +176,7 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge {
}
controller.reloadProcessor(getProcessorType(), processorDetails.getProcessorVersion(), workingDir.getAbsolutePath());
initializePythonSide(false);
initializePythonSide(false, new CompletableFuture<>());
lastModified = moduleFile.lastModified();
return true;

View File

@ -70,6 +70,7 @@ public class NiFiGatewayConnection extends GatewayConnection {
private final NiFiGatewayServer gatewayServer;
private volatile boolean poisoned = false;
private final ClassLoader contextClassLoader;
public NiFiGatewayConnection(final NiFiGatewayServer gatewayServer,
final Gateway gateway,
@ -79,6 +80,7 @@ public class NiFiGatewayConnection extends GatewayConnection {
final List<GatewayServerListener> listeners) throws IOException {
super(gateway, socket, authToken, customCommands, listeners);
this.gatewayServer = gatewayServer;
this.contextClassLoader = getClass().getClassLoader();
}
private boolean isContinue() {
@ -87,12 +89,21 @@ public class NiFiGatewayConnection extends GatewayConnection {
@Override
public void run() {
Thread.currentThread().setName(String.format("NiFiGatewayConnection Thread for %s %s", gatewayServer.getComponentType(), gatewayServer.getComponentId()));
while (isContinue()) {
super.run();
}
final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.contextClassLoader);
shutdown(false);
Thread.currentThread().setName(String.format("NiFiGatewayConnection Thread for %s %s", gatewayServer.getComponentType(), gatewayServer.getComponentId()));
while (isContinue()) {
super.run();
}
shutdown(false);
} finally {
if (originalClassLoader != null) {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}
}
protected void quietSendFatalError(final BufferedWriter writer, final Throwable exception) {

View File

@ -41,6 +41,7 @@ public class NiFiGatewayServer extends GatewayServer {
private final String componentType;
private final String componentId;
private final String authToken;
private final ClassLoader contextClassLoader;
public NiFiGatewayServer(final Gateway gateway,
final int port,
@ -56,6 +57,7 @@ public class NiFiGatewayServer extends GatewayServer {
this.componentType = componentType;
this.componentId = componentId;
this.authToken = authToken;
this.contextClassLoader = getClass().getClassLoader();
}
protected Py4JServerConnection createConnection(final Gateway gateway, final Socket socket) throws IOException {
@ -84,11 +86,19 @@ public class NiFiGatewayServer extends GatewayServer {
@Override
public void run() {
if (startProcessForked) {
Thread.currentThread().setName("NiFiGatewayServer Thread for " + componentType + " " + componentId);
}
final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.contextClassLoader);
if (startProcessForked) {
Thread.currentThread().setName("NiFiGatewayServer Thread for " + componentType + " " + componentId);
}
super.run();
super.run();
} finally {
if (originalClassLoader != null) {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}
}
@Override

View File

@ -19,7 +19,6 @@ package org.apache.nifi.python.processor;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -28,29 +27,16 @@ import org.apache.nifi.processor.exception.ProcessException;
import py4j.Py4JNetworkException;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
@InputRequirement(Requirement.INPUT_REQUIRED)
public class FlowFileTransformProxy extends PythonProcessorProxy {
public class FlowFileTransformProxy extends PythonProcessorProxy<FlowFileTransform> {
private volatile FlowFileTransform transform;
public FlowFileTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory, final boolean initialize) {
super(processorType, bridgeFactory, initialize);
}
@OnScheduled
public void setContext(final ProcessContext context) {
final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing"));
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
if (optionalAdapter.isEmpty()) {
throw new IllegalStateException(this + " is not finished initializing");
}
this.transform = (FlowFileTransform) optionalAdapter.get().getProcessor();
this.transform.setContext(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@ -63,7 +49,7 @@ public class FlowFileTransformProxy extends PythonProcessorProxy {
final FlowFileTransformResult result;
try (final StandardInputFlowFile inputFlowFile = new StandardInputFlowFile(session, original)) {
result = transform.transformFlowFile(inputFlowFile);
result = getTransform().transformFlowFile(inputFlowFile);
} catch (final Py4JNetworkException e) {
throw new ProcessException("Failed to communicate with Python Process", e);
} catch (final Exception e) {

View File

@ -44,7 +44,7 @@ import java.util.function.Supplier;
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@SupportsSensitiveDynamicProperties
public abstract class PythonProcessorProxy extends AbstractProcessor implements AsyncLoadedProcessor {
public abstract class PythonProcessorProxy<T extends PythonProcessor> extends AbstractProcessor implements AsyncLoadedProcessor {
private final String processorType;
private volatile PythonProcessorInitializationContext initContext;
private volatile PythonProcessorBridge bridge;
@ -53,6 +53,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
private volatile Map<String, PropertyDescriptor> cachedDynamicDescriptors = null;
private volatile Boolean supportsDynamicProperties;
private volatile T currentTransform;
private volatile ProcessContext currentProcessContext;
protected static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original FlowFile will be routed to this relationship when it has been successfully transformed")
@ -107,6 +111,28 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements
}
}
@OnScheduled
public void setContext(final ProcessContext context) {
this.currentProcessContext = context;
}
protected T getTransform() {
final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing"));
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
if (optionalAdapter.isEmpty()) {
throw new IllegalStateException(this + " is not finished initializing");
}
final T transform = (T) optionalAdapter.get().getProcessor();
if (transform != currentTransform) {
transform.setContext(currentProcessContext);
currentTransform = transform;
}
return transform;
}
protected Optional<PythonProcessorBridge> getBridge() {
return Optional.ofNullable(this.bridge);
}

View File

@ -20,7 +20,6 @@ package org.apache.nifi.python.processor;
import org.apache.nifi.NullSuppression;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.json.JsonRecordSource;
@ -55,12 +54,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
@InputRequirement(Requirement.INPUT_REQUIRED)
public class RecordTransformProxy extends PythonProcessorProxy {
private volatile RecordTransform transform;
public class RecordTransformProxy extends PythonProcessorProxy<RecordTransform> {
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("Record Reader")
@ -91,18 +88,6 @@ public class RecordTransformProxy extends PythonProcessorProxy {
return properties;
}
@OnScheduled
public void setProcessContext(final ProcessContext context) {
final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing"));
final Optional<PythonProcessorAdapter> adapterOptional = bridge.getProcessorAdapter();
if (adapterOptional.isEmpty()) {
throw new IllegalStateException(this + " is not finished initializing");
}
this.transform = (RecordTransform) adapterOptional.get().getProcessor();
this.transform.setContext(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@ -120,6 +105,7 @@ public class RecordTransformProxy extends PythonProcessorProxy {
long recordsRead = 0L;
long recordsWritten = 0L;
final RecordTransform transform = getTransform();
Map<Relationship, List<FlowFile>> flowFilesPerRelationship;
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {

View File

@ -23,16 +23,12 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.mock.MockProcessorInitializationContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.FlowFileTransformProxy;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
@ -46,6 +42,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledOnOs;
import java.io.File;
import java.io.FileOutputStream;
@ -55,7 +53,6 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -67,6 +64,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PythonControllerInteractionIT {
@ -170,8 +168,7 @@ public class PythonControllerInteractionIT {
// Create a PrettyPrintJson Processor
final byte[] jsonContent = Files.readAllBytes(Paths.get("src/test/resources/json/input/simple-person.json"));
for (int i=0; i < 3; i++) {
final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON);
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform(PRETTY_PRINT_JSON);
runner.enqueue(jsonContent);
runner.run();
@ -184,8 +181,7 @@ public class PythonControllerInteractionIT {
@Disabled("Just for manual testing...")
public void runPrettyPrintJsonManyThreads() throws IOException {
// Create a PrettyPrintJson Processor
final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON);
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform(PRETTY_PRINT_JSON);
final int flowFileCount = 100_000;
final int threadCount = 12;
@ -204,8 +200,7 @@ public class PythonControllerInteractionIT {
@Test
public void testSimplePrettyPrint() throws IOException {
// Setup
final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON);
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform(PRETTY_PRINT_JSON);
runner.enqueue(Paths.get("src/test/resources/json/input/simple-person.json"));
runner.setProperty("Indentation", "2");
@ -223,8 +218,7 @@ public class PythonControllerInteractionIT {
@Test
public void testValidator() {
final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON);
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform(PRETTY_PRINT_JSON);
runner.setProperty("Indentation", "-1");
runner.assertNotValid();
@ -245,8 +239,7 @@ public class PythonControllerInteractionIT {
@Test
public void testCsvToExcel() {
// Create a PrettyPrintJson Processor
final FlowFileTransformProxy wrapper = createFlowFileTransform("ConvertCsvToExcel");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform("ConvertCsvToExcel");
runner.enqueue("name, number\nJohn Doe, 500");
// Trigger the processor
@ -259,8 +252,7 @@ public class PythonControllerInteractionIT {
@Test
public void testExpressionLanguageWithAttributes() {
// Setup
final FlowFileTransformProxy wrapper = createFlowFileTransform("WritePropertyToFlowFile");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform("WritePropertyToFlowFile");
runner.setProperty("Message", "Hola Mundo");
runner.enqueue("Hello World");
@ -274,8 +266,7 @@ public class PythonControllerInteractionIT {
@Test
public void testPythonPackage() {
// Create a WriteNumber Processor
final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteNumber");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform("WriteNumber");
runner.enqueue("");
// Trigger the processor
@ -289,12 +280,8 @@ public class PythonControllerInteractionIT {
assertTrue(resultNum <= 1000);
}
private FlowFileTransformProxy createFlowFileTransform(final String type) {
final Processor processor = createProcessor(type);
assertNotNull(processor);
processor.initialize(new MockProcessorInitializationContext());
return (FlowFileTransformProxy) processor;
private TestRunner createFlowFileTransform(final String type) {
return createProcessor(type);
}
@Test
@ -312,8 +299,7 @@ public class PythonControllerInteractionIT {
assertEquals("numpy==1.25.0", dependencies.get(0));
// Setup
final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteNumpyVersion");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform("WriteNumpyVersion");
runner.enqueue("Hello World");
// Trigger the processor
@ -328,8 +314,7 @@ public class PythonControllerInteractionIT {
public void testControllerService() throws InitializationException {
// Setup
controllerServiceMap.put("StringLookupService", TestLookupService.class);
final FlowFileTransformProxy wrapper = createFlowFileTransform("LookupAddress");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform("LookupAddress");
final StringLookupService lookupService = new TestLookupService((Collections.singletonMap("John Doe", "123 My Street")));
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
@ -356,8 +341,7 @@ public class PythonControllerInteractionIT {
replaceFileText(sourceFile, replacement, originalMessage);
// Setup
final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteMessage");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final TestRunner runner = createFlowFileTransform("WriteMessage");
runner.enqueue("");
// Trigger the processor
@ -424,8 +408,7 @@ public class PythonControllerInteractionIT {
assertEquals(1, v2Count);
// Create a WriteMessage Processor, version 0.0.1-SNAPSHOT
final FlowFileTransformProxy wrapperV1 = createFlowFileTransform("WriteMessage");
final TestRunner runnerV1 = TestRunners.newTestRunner(wrapperV1);
final TestRunner runnerV1 = createFlowFileTransform("WriteMessage");
runnerV1.enqueue("");
// Trigger the processor
@ -436,8 +419,7 @@ public class PythonControllerInteractionIT {
runnerV1.getFlowFilesForRelationship("success").get(0).assertContentEquals("Hello, World");
// Create an instance of WriteMessage V2
final Processor procV2 = createProcessor("WriteMessage", "0.0.2-SNAPSHOT");
final TestRunner runnerV2 = TestRunners.newTestRunner(procV2);
final TestRunner runnerV2 = createProcessor("WriteMessage", "0.0.2-SNAPSHOT");
runnerV2.enqueue("");
// Trigger the processor
@ -486,13 +468,11 @@ public class PythonControllerInteractionIT {
private TestRunner createRecordTransformRunner(final String type) throws InitializationException {
final Processor processor = createProcessor("SetRecordField");
assertNotNull(processor);
final TestRunner runner = createProcessor("SetRecordField");
final JsonTreeReader reader = new JsonTreeReader();
final JsonRecordSetWriter writer = new JsonRecordSetWriter();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.addControllerService("reader", reader);
runner.addControllerService("writer", writer);
runner.enableControllerService(reader);
@ -525,8 +505,7 @@ public class PythonControllerInteractionIT {
@Test
public void testCustomRelationships() {
final FlowFileTransformProxy processor = createFlowFileTransform("RouteFlowFile");
final TestRunner runner = TestRunners.newTestRunner(processor);
final TestRunner runner = createFlowFileTransform("RouteFlowFile");
final Set<Relationship> relationships = runner.getProcessor().getRelationships();
assertEquals(4, relationships.size());
@ -545,9 +524,41 @@ public class PythonControllerInteractionIT {
runner.assertTransferCount("failure", 0);
}
@Test
@Timeout(45)
@DisabledOnOs(org.junit.jupiter.api.condition.OS.WINDOWS) // Cannot run on windows because ExitAfterFourInvocations uses `kill -9` command
public void testProcessRestarted() {
final TestRunner runner = createFlowFileTransform("ExitAfterFourInvocations");
private RecordSchema createSimpleRecordSchema(final String... fieldNames) {
return createSimpleRecordSchema(Arrays.asList(fieldNames));
for (int i=0; i < 10; i++) {
runner.enqueue(Integer.toString(i));
}
runner.run(4);
assertThrows(Throwable.class, runner::run);
// Run 2 additional times. Because the Python Process will have to be restarted, it may take a bit,
// so we keep trying until we succeed, relying on the 15 second timeout for the test to fail us if
// the Process doesn't get restarted in time.
for (int i=0; i < 2; i++) {
while(true) {
try {
runner.run(1, false, i == 0);
break;
} catch (final Throwable t) {
try {
Thread.sleep(1000L);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
// Trigger stop on finish
runner.run(1, true, false);
runner.assertTransferCount("success", 7);
}
private RecordSchema createSimpleRecordSchema(final List<String> fieldNames) {
@ -583,16 +594,15 @@ public class PythonControllerInteractionIT {
return UUID.randomUUID().toString();
}
private Processor createProcessor(final String type) {
private TestRunner createProcessor(final String type) {
return createProcessor(type, VERSION);
}
private Processor createProcessor(final String type, final String version) {
private TestRunner createProcessor(final String type, final String version) {
bridge.discoverExtensions();
final AsyncLoadedProcessor processor = bridge.createProcessor(createId(), type, version, true, true);
final ProcessorInitializationContext initContext = new MockProcessorInitializationContext();
processor.initialize(initContext);
final TestRunner runner = TestRunners.newTestRunner(processor);
final long maxInitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);
while (true) {
@ -615,8 +625,8 @@ public class PythonControllerInteractionIT {
throw new RuntimeException("Interrupted while initializing processor of type %s version %s".formatted(type, version));
}
}
processor.initialize(new MockProcessorInitializationContext());
return processor;
return runner;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.nifi.python.processor;
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
import org.apache.nifi.python.PythonController;
import java.util.Optional;
@ -32,6 +33,8 @@ public interface PythonProcessorBridge {
*/
Optional<PythonProcessorAdapter> getProcessorAdapter();
void replaceController(PythonController controller);
/**
* @return the name of the Processor implementation. This will not contain a 'python.' prefix.
*/

View File

@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
import os
import subprocess
class ExitAfterFourInvocations(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
description = 'Transfers the first 4 FlowFiles to success, then kills the process on subsequent invocations by invoking system command kill -9 <pid>'
dependencies = ['pandas'] # Just to have some dependency that must be downloaded to test functionality on restart
invocations = 0
def __init__(self, **kwargs):
pass
def transform(self, context, flowFile):
self.invocations += 1
if self.invocations > 4:
# Issue a `kill -9` to the current process
pid = os.getpid()
subprocess.run(['kill', '-9', str(pid)])
return FlowFileTransformResult(relationship = "success")
def getPropertyDescriptors(self):
return []