NIFI-13745 Added StateManager Support to Python Processor API (#9419)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Peter Gyori 2024-11-12 16:03:57 +01:00 committed by GitHub
parent f15b7caaa4
commit 2f3dd015e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 419 additions and 1 deletions

View File

@ -475,6 +475,64 @@ that there are no longer any invocations of the `transform` method running when
[[state-manager]]
=== State Manager
The `context` object that is the parameter of the `transform()`, `create()`, `onScheduled()` and `onStopped()` methods
provides access to State Manager through the `getStateManager()` method. State Manager is responsible for providing
a simple API for storing and retrieving state. For information on State Manager, refer to the NiFi Developer's Guide.
The Python StateManager object returned by `context.getStateManager()` provides access to the underlying Java StateManager.
With the Python StateManager, the state is handled in the form of Python dictionaries (as opposed to Java Maps).
Just like in Java, StateManager can handle both `LOCAL` and `CLUSTER` state.
Should an error occur accessing the state using the StateManager's methods, a StateException is thrown that can be caught and
handled in the Python Processor's code. This enables the Python developer to implement the Processor in such a way that
when a state-related error occurs, the Processor can continue its operation without disruption.
If a StateException is thrown by StateManager but not caught in the Processor's code, the ProcessSession is rolled back
and an error is logged.
Below is an example Processor that uses StateManager. This example assumes that the Processor is executed on
primary node only, and only on one thread.
----
from nifiapi.componentstate import Scope, StateManager, StateException
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
class CreateFlowFile(FlowFileSource):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileSource']
class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
description = '''A Python processor that creates FlowFiles and uses StateManager.'''
tags = ['test', 'python', 'source', 'state']
def __init__(self, **kwargs):
pass
def onScheduled(self, context):
try:
self.state = context.getStateManager().getState(Scope.CLUSTER).toMap()
except StateException as e:
self.logger.warn('Failed to read processor state. ' + str(e))
self.state = dict()
def create(self, context):
old_value = int(self.state.get('FlowFileNumber', '0'))
new_value = old_value + 1
new_state = {'FlowFileNumber': str(new_value)}
try:
context.getStateManager().setState(new_state, Scope.CLUSTER)
self.state = new_state
except StateException as e:
self.logger.warn('Failed to save state. ' + str(e))
return FlowFileSourceResult(relationship='success', attributes=new_state, contents=None)
----
[[documenting_use_cases]]
== Documenting Use Cases

View File

@ -61,7 +61,7 @@ public class FlowFileSourceProxy extends PythonProcessorProxy<FlowFileSource> {
} catch (final Py4JNetworkException e) {
throw new ProcessException("Failed to communicate with Python Process", e);
} catch (final Exception e) {
getLogger().error("Failed to create FlowFile {}", e);
getLogger().error("Failed to create FlowFile", e);
return;
}

View File

@ -18,6 +18,7 @@
package org.apache.nifi.python.processor;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -26,6 +27,7 @@ import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -44,6 +46,8 @@ import java.util.function.Supplier;
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@SupportsSensitiveDynamicProperties
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER},
description = "Python processors can store and retrieve state using the State Management APIs. Consult the State Manager section of the Developer's Guide for more details.")
public abstract class PythonProcessorProxy<T extends PythonProcessor> extends AbstractProcessor implements AsyncLoadedProcessor {
private final String processorType;
private volatile PythonProcessorInitializationContext initContext;

View File

@ -19,6 +19,7 @@ package org.apache.nifi.py4j;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
@ -31,6 +32,7 @@ import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -610,6 +612,76 @@ public class PythonControllerInteractionIT {
runner.assertTransferCount("success", 0);
}
@Test
public void testStateManagerSetState() {
final TestRunner runner = createStateManagerTesterProcessor("setState");
final MockStateManager stateManager = runner.getStateManager();
stateManager.assertStateNotSet();
runner.run();
stateManager.assertStateSet(Scope.CLUSTER);
stateManager.assertStateEquals(Map.of("state_key_1", "state_value_1"), Scope.CLUSTER);
runner.assertTransferCount("success", 1);
}
@Test
public void testStateManagerGetState() throws IOException {
final TestRunner runner = createStateManagerTesterProcessor("getState");
final MockStateManager stateManager = initializeStateManager(runner);
runner.run();
// The processor reads the state and adds the key-value pairs
// to the FlowFile as attributes.
stateManager.assertStateEquals(Map.of("state_key_1", "state_value_1"), Scope.CLUSTER);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship("success").get(0);
flowFile.assertAttributeEquals("state_key_1", "state_value_1");
}
@Test
public void testStateManagerReplace() throws IOException {
final TestRunner runner = createStateManagerTesterProcessor("replace");
final MockStateManager stateManager = initializeStateManager(runner);
runner.run();
final Map finalState = Map.of("state_key_2", "state_value_2");
stateManager.assertStateEquals(finalState, Scope.CLUSTER);
runner.assertTransferCount("success", 1);
}
@Test
public void testStateManagerClear() throws IOException {
final TestRunner runner = createStateManagerTesterProcessor("clear");
final MockStateManager stateManager = initializeStateManager(runner);
runner.run();
stateManager.assertStateEquals(Map.of(), Scope.CLUSTER);
runner.assertTransferCount("success", 1);
}
@Test
public void testStateManagerExceptionHandling() {
// Use-case tested: StateManager's exception can be caught
// in the Python code and execution continued without producing errors.
final TestRunner runner = createProcessor("TestStateManagerException");
waitForValid(runner);
final MockStateManager stateManager = runner.getStateManager();
stateManager.setFailOnStateSet(Scope.CLUSTER, true);
runner.run();
runner.assertTransferCount("success", 1);
runner.getFlowFilesForRelationship("success").get(0).assertAttributeEquals("exception_msg", "Set state failed");
}
public interface StringLookupService extends ControllerService {
Optional<String> lookup(Map<String, String> coordinates);
}
@ -690,4 +762,18 @@ public class PythonControllerInteractionIT {
output.assertContentEquals(expectedContent);
}
private TestRunner createStateManagerTesterProcessor(String methodToTest) {
final TestRunner runner = createProcessor("TestStateManager");
runner.setProperty("StateManager Method To Test", methodToTest);
waitForValid(runner);
return runner;
}
private MockStateManager initializeStateManager(TestRunner runner) throws IOException {
final MockStateManager stateManager = runner.getStateManager();
final Map initialState = Map.of("state_key_1", "state_value_1");
stateManager.setState(initialState, Scope.CLUSTER);
return stateManager;
}
}

View File

@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.componentstate import Scope, StateManager, StateException
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, PropertyDependency
class TestStateManager(FlowFileSource):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileSource']
class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
description = '''A Python source processor that uses StateManager.'''
tags = ['text', 'test', 'python', 'source']
METHOD_TO_TEST = PropertyDescriptor(
name='StateManager Method To Test',
description='''The name of StateManager's method that should be tested.''',
required=True
)
property_descriptors = [METHOD_TO_TEST]
def __init__(self, **kwargs):
pass
def getPropertyDescriptors(self):
return self.property_descriptors
def create(self, context):
method_to_test = context.getProperty(self.METHOD_TO_TEST).getValue()
flowfile_attributes = None
state_manager = context.getStateManager()
match method_to_test.lower():
case 'setstate':
new_state = {'state_key_1': 'state_value_1'}
state_manager.setState(new_state, Scope.CLUSTER)
case 'getstate':
flowfile_attributes = state_manager.getState(Scope.CLUSTER).toMap()
case 'replace':
old_state = state_manager.getState(Scope.CLUSTER)
new_state = {'state_key_2': 'state_value_2'}
state_manager.replace(old_state, new_state, Scope.CLUSTER)
case 'clear':
state_manager.clear(Scope.CLUSTER)
case _:
pass
return FlowFileSourceResult(relationship='success', attributes=flowfile_attributes, contents='Output FlowFile Contents')

View File

@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.componentstate import Scope, StateManager, StateException
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
class TestStateManagerException(FlowFileSource):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileSource']
class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
description = '''A Python source processor that uses StateManager.'''
tags = ['text', 'test', 'python', 'source']
def __init__(self, **kwargs):
pass
def create(self, context):
state_manager = context.getStateManager()
flowfile_attributes = None
try:
new_state = {'state_key_1': 'state_value_1'}
state_manager.setState(new_state, Scope.CLUSTER)
except StateException as state_exception:
flowfile_attributes = {'exception_msg': str(state_exception)}
return FlowFileSourceResult(relationship='success', attributes=flowfile_attributes, contents='Output FlowFile Contents')

View File

@ -0,0 +1,163 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from nifiapi.__jvm__ import JvmHolder
from py4j.protocol import Py4JJavaError
CLUSTER_SCOPE = JvmHolder.jvm.org.apache.nifi.components.state.Scope.CLUSTER
LOCAL_SCOPE = JvmHolder.jvm.org.apache.nifi.components.state.Scope.LOCAL
def convert_pyscope_to_jscope(scope):
"""Converts Python Scope to Java Scope."""
if scope == Scope.LOCAL:
return LOCAL_SCOPE
else:
return CLUSTER_SCOPE
def convert_pydict_to_jmap(dictionary):
"""Converts a Python Dictionary to a Java Map"""
java_map = JvmHolder.jvm.java.util.HashMap()
for key, value in dictionary.items():
java_map.put(key, value)
return java_map
class Scope(Enum):
CLUSTER = 1
LOCAL = 2
class StateManager:
"""
Python class wrapping the StateManager Java implementation.
The constructor receives a Java object implementing the Java StateManager interface.
The methods in Java StateManager have corresponding methods in this class.
Each method in this class propagates the method call to the Java StateManager object.
Each method may raise StateException in case the Java object fails to execute the particular operation.
When an operation fails on the Java side, a Py4JJavaError is produced on the Python side by the Py4J bridge.
The Py4JJavaError is caught in the Python StateManager and a new StateException instance is raised,
indicating that the exception carries information regarding an error when accessing the state.
"""
_java_state_manager = None
def __init__(self, java_state_manager):
"""
:param java_state_manager: Java StateManager object that performs all the StateManager-related operations.
"""
self._java_state_manager = java_state_manager
def setState(self, state, scope):
"""
:param state: (dict) Python dictionary containing the state to be saved.
:param scope: Scope.CLUSTER or Scope.LOCAL
"""
try:
self._java_state_manager.setState(convert_pydict_to_jmap(state), convert_pyscope_to_jscope(scope))
except Py4JJavaError as exception:
raise StateException("Set state failed") from exception
def getState(self, scope):
"""
:param scope: Scope.CLUSTER or Scope.LOCAL
:return: Python StateMap object representing the state.
"""
try:
java_statemap = self._java_state_manager.getState(convert_pyscope_to_jscope(scope))
return StateMap(java_statemap)
except Py4JJavaError as exception:
raise StateException('Get state failed') from exception
def replace(self, oldValue, newValue, scope):
"""
:param oldValue: (StateMap) Python StateMap object with the state to be replaced.
:param newValue: (dict) Python dictionary containing the state to be saved.
:param scope: Scope.CLUSTER or Scope.LOCAL
"""
try:
self._java_state_manager.replace(oldValue.get_java_statemap(), convert_pydict_to_jmap(newValue), convert_pyscope_to_jscope(scope))
except Py4JJavaError as exception:
raise StateException('Replace state failed') from exception
def clear(self, scope):
"""
:param scope: Scope.CLUSTER or Scope.LOCAL
"""
try:
self._java_state_manager.clear(convert_pyscope_to_jscope(scope))
except Py4JJavaError as exception:
raise StateException('Clear state failed') from exception
class StateMap:
"""
Python class wrapping the StateMap Java implementation.
The constructor receives a Java object implementing the Java StateMap interface.
The methods in Java StateMap have corresponding methods in this class.
Methods in this class propagate the method calls to the Java StateMap object.
"""
_java_statemap = None
_state_version = None
def __init__(self, java_statemap):
"""
:param java_statemap: Java StateMap object.
"""
self._java_statemap = java_statemap
if not java_statemap.getStateVersion().isEmpty():
self._state_version = java_statemap.getStateVersion().get()
def getStateVersion(self):
"""
:return: (int) Integer value of the state version, or None if it is not set.
"""
return self._state_version
def get(self, key):
"""
:param key: (str) String key indicating which state value should be returned.
:return: (str) String state value.
"""
return self._java_statemap.get(key)
def toMap(self):
"""
:return: (dict) Python dictionary representation of the state.
"""
result = dict()
java_iterator = self._java_statemap.toMap().entrySet().iterator()
while java_iterator.hasNext():
java_map_entry = java_iterator.next()
result[java_map_entry.getKey()] = java_map_entry.getValue()
return result
def get_java_statemap(self):
"""
:return: The Java StateMap object behind this wrapper class.
"""
return self._java_statemap
class StateException(Exception):
"""
Exception class for any exception produced by the operations in StateManager.
"""

View File

@ -14,6 +14,7 @@
# limitations under the License.
from enum import Enum
from nifiapi.componentstate import StateManager
from nifiapi.__jvm__ import JvmHolder
import re
@ -349,6 +350,9 @@ class ProcessContext(PropertyContext):
def yield_resources(self):
JvmHolder.java_gateway.get_method(self.java_context, "yield")()
def getStateManager(self):
return StateManager(self.java_context.getStateManager())
class ValidationContext(PropertyContext):