NIFI-13427 Added FlowFileSource interface for Python Processors

This closes #9000

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Peter Gyori 2024-06-19 10:29:14 +02:00 committed by exceptionfactory
parent b105b5bbef
commit 3efb0763da
No known key found for this signature in database
11 changed files with 413 additions and 8 deletions

View File

@ -122,7 +122,7 @@ Processors such as ExecuteScript) tend to be around data manipulation and/or com
- Calls from Python to Java (and vice versa) are far more expensive than native method calls. Having APIs that are more tailored toward
specific use cases allows for fewer interactions between the two processes, which greatly improves performance.
As a result, the Python API consists of two different Processor classes that can be implemented: `FlowFileTransform` and `RecordTransform`.
As a result, the Python API consists of three different Processor classes that can be implemented: `FlowFileTransform`, `RecordTransform` and `FlowFileSource`.
Others may emerge in the future.
@ -283,6 +283,48 @@ If the partition has more than one field in the dictionary, all fields in the di
the Records to be written to the same output FlowFile.
[[flowfile-source]]
=== FlowFileSource
The `FlowFileSource` API provides a mechanism for creating a FlowFile and routing it based on its textual or binary contents.
In order to implement the `FlowFileSource` API, a Python class must extend from the `nifiapi.FlowFileSource` class
and implement the `create(ProcessContext)` method, which returns a `FlowFileSourceResult`. Notice, that the difference between
`FlowFileSource's create(ProcessContext)` and `FlowFileTransform's transform(ProcessContext, InputFlowFile)` methods is
that the former does not expect an InputFlowFile object. That is because processors based on the `FlowFileSource` API
are "source" processors that do not accept incoming connections but are capable of creating FlowFiles themselves.
Implementing a Processor based on `FlowFileSource` is very similar to implementing one based on `FlowFileTransform`.
A simple implementation looks like this:
----
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
class CreateFlowFile(FlowFileSource):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileSource']
class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
description = '''A Python processor that creates FlowFiles.'''
def __init__(self, **kwargs):
pass
def create(self, context):
return FlowFileSourceResult(relationship = 'success', attributes = {'greeting': 'hello'}, contents = 'Hello World!')
----
As mentioned above, the `create` method only takes one argument: the context (of type `nifiapi.properties.ProcessContext`).
The return type is a `FlowFileSourceResult` that indicates which Relationship the FlowFile should be transferred to,
any attributes that should be added to the FlowFile and the contents of the FlowFile. The `relationship` is a required argument.
Each processor based on the `FlowFileSource` API has a `success` relationship and additional relationships can be
created in the Processor's Python code. `attributes` and `contents` are both optional. If `attributes` is not provided,
the FlowFile will still have the usual `filename`, `path` and `uuid` attributes, but no additional ones.
If `contents` is not provided, a FlowFile with no contents (only attributes) will be created.
[[property-descriptors]]
=== PropertyDescriptors
@ -295,7 +337,7 @@ A `PropertyDescriptor` is created using the `nifiapi.properties.PropertyDescript
arguments: `name` and `description`. All other arguments are optional.
Typically, a Processor will have multiple Property Descriptors. These descriptors are then returned to the NiFi framework by implementing the following
method in the Processor (regardless of whether it is a `FlowFileTransform` or a `RecordTransform`):
method in the Processor (regardless of whether it is a `FlowFileTransform`, a `RecordTransform` or a `FlowFileSource`):
----
def getPropertyDescriptors(self)
----
@ -351,7 +393,7 @@ invalid. Of course, we might also specify explicit validators that can be used,
Each Processor in NiFi must route its outgoing data to some destination. In NiFi, those destinations are called "Relationships."
Each Processor is responsible for declaring its Relationships.
Both the FlowFileTransform an RecordTransform Processors already have a Relationship named `original` and one named `failure.`
Both the FlowFileTransform and RecordTransform Processors already have a Relationship named `original` and one named `failure.`
The `original` relationship should not be used by implementations. This is used only by the framework and allows the input FlowFile
to be passed on without modification. If the Processor cannot transform its input (because the data is not valid, for example),
the Processor may route the data to the `failure` relationship.
@ -365,6 +407,11 @@ This method returns a list or a set of `nifiapi.relationship.Relationship` objec
Relationship will not automatically be made available. It will need to be created and returned within this list, if it is to be used.
Regardless of which Relationships are exposed by the implementation, the `failure` and `original` will always be made available.
Unlike FlowFileTransform and RecordTransform Processors, FlowFileSource Processors only have a `success` relationship by default.
Implementations can use this relationship to route the created FlowFiles. Additional relationships can be exposed by implementing
the `getRelationships` method. In the case of FlowFileSource implementing `getRelationships` does not remove the `success` relationship.
Any relationship returned by `getRelationships` appears besides the `success` relationship.
[[inner-classes]]
=== ProcessorDetails and Java inner classes
@ -630,8 +677,9 @@ For example, by default, `nifi.python.extensions.source.directory.default` is se
in the property name with some other value.
Any `.py` file found in the directory will be parsed and examined in order to determine whether or not it is a valid NiFi Processor.
In order to be found, the Processor must have a valid parent (`FlowFileTransform` or `RecordTransform`) and must have an inner class named `Java`
with a `implements = ['org.apache.nifi.python.processor.FlowFileTransform']` or `implements = ['org.apache.nifi.python.processor.RecordFileTransform']`.
In order to be found, the Processor must have a valid parent (`FlowFileTransform`, `RecordTransform` or `FlowFileSource`) and must have an inner class named `Java`
with a `implements = ['org.apache.nifi.python.processor.FlowFileTransform']` or `implements = ['org.apache.nifi.python.processor.RecordFileTransform']`
or `implements = ['org.apache.nifi.python.processor.FlowFileSource']`.
This will allow NiFi to automatically discover the Processor.
Note, however, that if the Processor implementation is broken into multiple Python modules, those modules will not be made available by default. In order

View File

@ -27,6 +27,8 @@ import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.FlowFileSource;
import org.apache.nifi.python.processor.FlowFileSourceProxy;
import org.apache.nifi.python.processor.FlowFileTransform;
import org.apache.nifi.python.processor.FlowFileTransformProxy;
import org.apache.nifi.python.processor.PythonProcessorBridge;
@ -145,6 +147,9 @@ public class StandardPythonBridge implements PythonBridge {
if (RecordTransform.class.getName().equals(implementedInterface)) {
return new RecordTransformProxy(type, processorBridgeFactory, initialize);
}
if (FlowFileSource.class.getName().equals(implementedInterface)) {
return new FlowFileSourceProxy(type, processorBridgeFactory, initialize);
}
return null;
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.python.processor;
public interface FlowFileSource extends PythonProcessor {
FlowFileSourceResult createFlowFile();
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.python.processor;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import py4j.Py4JNetworkException;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
@InputRequirement(Requirement.INPUT_FORBIDDEN)
public class FlowFileSourceProxy extends PythonProcessorProxy<FlowFileSource> {
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles created by this processor can be routed to this relationship.")
.build();
private static final Set<Relationship> implicitRelationships = Set.of(REL_SUCCESS);
public FlowFileSourceProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory, final boolean initialize) {
super(processorType, bridgeFactory, initialize);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFileSourceResult result;
try {
result = getTransform().createFlowFile();
} catch (final Py4JNetworkException e) {
throw new ProcessException("Failed to communicate with Python Process", e);
} catch (final Exception e) {
getLogger().error("Failed to create FlowFile {}", e);
return;
}
try {
final String relationshipName = result.getRelationship();
final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
final Map<String, String> attributes = result.getAttributes();
final byte[] contents = result.getContents();
FlowFile output = createFlowFile(session, attributes, contents);
if (REL_SUCCESS.getName().equals(relationshipName)) {
session.transfer(output, REL_SUCCESS);
} else {
session.transfer(output, relationship);
}
} finally {
result.free();
}
}
protected FlowFile createFlowFile(final ProcessSession session, final Map<String, String> attributes, final byte[] contents) {
FlowFile flowFile = session.create();
if (attributes != null) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
if (contents != null) {
flowFile = session.write(flowFile, out -> out.write(contents));
}
return flowFile;
}
@Override
protected Set<Relationship> getImplicitRelationships() {
return implicitRelationships;
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.python.processor;
import org.apache.nifi.python.PythonObjectProxy;
import java.util.Map;
public interface FlowFileSourceResult extends PythonObjectProxy {
String getRelationship();
byte[] getContents();
Map<String, String> getAttributes();
}

View File

@ -567,6 +567,28 @@ public class PythonControllerInteractionIT {
out.assertAttributeEquals("failureReason", "Intentional failure of unit test");
}
@Test
public void testCreateFlowFile() throws IOException {
final String processorName = "CreateFlowFile";
final String propertyName = "FlowFile Contents";
final String relationshipSuccess = "success";
final String relationshipMultiline = "multiline";
final String singleLineContent = "Hello World!";
testSourceProcessor(processorName,
Map.of(propertyName, singleLineContent),
Map.of(relationshipSuccess, 1, relationshipMultiline, 0),
relationshipSuccess,
singleLineContent.getBytes(StandardCharsets.UTF_8));
final String multiLineContent = "Hello\nWorld!";
testSourceProcessor(processorName,
Map.of(propertyName, multiLineContent),
Map.of(relationshipSuccess, 0, relationshipMultiline, 1),
relationshipMultiline,
multiLineContent.getBytes(StandardCharsets.UTF_8));
}
public interface StringLookupService extends ControllerService {
Optional<String> lookup(Map<String, String> coordinates);
}
@ -624,4 +646,27 @@ public class PythonControllerInteractionIT {
return runner;
}
private void testSourceProcessor(final String processorName,
final Map<String, String> propertiesWithValues,
final Map<String, Integer> relationshipsWithFlowFileCounts,
final String expectedOuputRelationship,
final byte[] expectedContent) throws IOException {
final TestRunner runner = createProcessor(processorName);
propertiesWithValues.forEach((propertyName, propertyValue) -> {
runner.setProperty(propertyName, propertyValue);
});
waitForValid(runner);
runner.run();
relationshipsWithFlowFileCounts.forEach((relationship, count) -> {
runner.assertTransferCount(relationship, count);
});
final MockFlowFile output = runner.getFlowFilesForRelationship(expectedOuputRelationship).get(0);
output.assertContentEquals(expectedContent);
}
}

View File

@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from nifiapi.__jvm__ import JvmHolder
from nifiapi.properties import ProcessContext
class FlowFileSource(ABC):
# These will be set by the PythonProcessorAdapter when the component is created
identifier = None
logger = None
def __init__(self):
self.arrayList = JvmHolder.jvm.java.util.ArrayList
def setContext(self, context):
self.process_context = ProcessContext(context)
def createFlowFile(self):
return self.create(self.process_context)
@abstractmethod
def create(self, context):
pass
class FlowFileSourceResult:
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileSourceResult']
def __init__(self, relationship, attributes = None, contents = None):
self.relationship = relationship
self.attributes = attributes
if contents is not None and isinstance(contents, str):
self.contents = str.encode(contents)
else:
self.contents = contents
def getRelationship(self):
return self.relationship
def getContents(self):
return self.contents
def getAttributes(self):
if self.attributes is None:
return None
map = JvmHolder.jvm.java.util.HashMap()
for key, value in self.attributes.items():
map.put(key, value)
return map

View File

@ -53,7 +53,9 @@ class ExtensionManager:
third-party dependencies have been imported.
"""
processor_interfaces = ['org.apache.nifi.python.processor.FlowFileTransform', 'org.apache.nifi.python.processor.RecordTransform']
processor_interfaces = ['org.apache.nifi.python.processor.FlowFileTransform',
'org.apache.nifi.python.processor.RecordTransform',
'org.apache.nifi.python.processor.FlowFileSource']
processor_details = {}
processor_class_by_name = {}
module_files_by_extension_type = {}

View File

@ -20,7 +20,9 @@ from nifiapi.documentation import UseCaseDetails, MultiProcessorUseCaseDetails,
import ExtensionDetails
PROCESSOR_INTERFACES = ['org.apache.nifi.python.processor.FlowFileTransform', 'org.apache.nifi.python.processor.RecordTransform']
PROCESSOR_INTERFACES = ['org.apache.nifi.python.processor.FlowFileTransform',
'org.apache.nifi.python.processor.RecordTransform',
'org.apache.nifi.python.processor.FlowFileSource']
logger = logging.getLogger("python.ProcessorInspection")

View File

@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, PropertyDependency
from nifiapi.relationship import Relationship
class CreateFlowFile(FlowFileSource):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileSource']
class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
description = '''A Python processor that creates FlowFiles with given contents.'''
tags = ['text', 'test', 'python', 'source']
FF_CONTENTS = PropertyDescriptor(
name='FlowFile Contents',
description='''The contents of the FlowFile.''',
required=True,
default_value='Hello World!'
)
property_descriptors = [FF_CONTENTS]
REL_MULTILINE = Relationship(name='multiline', description='FlowFiles that contain multiline text.')
def __init__(self, **kwargs):
pass
def getPropertyDescriptors(self):
return self.property_descriptors
def getRelationships(self):
return [self.REL_MULTILINE]
def create(self, context):
contents = context.getProperty(self.FF_CONTENTS).getValue()
if contents is not None and isinstance(contents, str):
contents_str = str.encode(contents)
if b'\n' in contents_str:
return FlowFileSourceResult(relationship='multiline', attributes=None, contents=contents_str)
return FlowFileSourceResult(relationship='success', attributes=None, contents=contents)

View File

@ -84,7 +84,6 @@ public class PythonProcessorIT extends NiFiSystemIT {
assertEquals(messageContents, contents);
}
@Test
public void testRecordTransform() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
@ -171,4 +170,39 @@ public class PythonProcessorIT extends NiFiSystemIT {
assertEquals("Ball", secondRecordValues.get( headerIndices.get("sport") ));
assertEquals("HELLO", secondRecordValues.get( headerIndices.get("greeting") ));
}
@Test
public void testFlowFileSource() throws NiFiClientException, IOException, InterruptedException {
final String messageContents = "Hello World";
final ProcessorEntity createFlowFilePython = getClientUtil().createPythonProcessor("CreateFlowFile");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
// Config CreateFlowFile with "Hello World" as the value of "FlowFile Contents" attribute
final ProcessorConfigDTO generateConfig = createFlowFilePython.getComponent().getConfig();
generateConfig.setProperties(Collections.singletonMap("FlowFile Contents", messageContents));
getClientUtil().updateProcessorConfig(createFlowFilePython, generateConfig);
// Connect the processors
final ConnectionEntity outputConnection = getClientUtil().createConnection(createFlowFilePython, terminate, "success");
getClientUtil().setAutoTerminatedRelationships(createFlowFilePython, "multiline");
// Wait for processor validation to complete
getClientUtil().waitForValidProcessor(createFlowFilePython.getId());
// Run the flow
runProcessorOnce(createFlowFilePython);
// Wait for output to be queued up
waitForQueueCount(outputConnection.getId(), 1);
// Validate the output
final String contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 0);
assertEquals(messageContents, contents);
}
private void runProcessorOnce(final ProcessorEntity processorEntity) throws NiFiClientException, IOException, InterruptedException {
getNifiClient().getProcessorClient().runProcessorOnce(processorEntity);
getClientUtil().waitForStoppedProcessor(processorEntity.getId());
}
}