From 711f2f11888b9ea9eace8816758ad1d0d2075aa5 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 28 Sep 2023 10:09:10 -0400 Subject: [PATCH] NIFI-12137: This closes #7808. Fixed name of Expression Language Scope from LIMITED to ENVIRONMENT, ; fixed issue in which flowfiles routed to failure by FlowFileTransform were also being routed to original; defined 'identifier' and 'logger' as member variables for FlowFileTransform and RecordTransform so that IDEs know about them Signed-off-by: Joseph Witt --- .../nifi/groups/StandardProcessGroup.java | 5 ++++ .../processor/FlowFileTransformProxy.java | 12 ++++++---- .../processor/PythonProcessorProxy.java | 19 ++++++++------- .../processor/RecordTransformProxy.java | 23 +++++++++---------- .../python/src/nifiapi/flowfiletransform.py | 4 ++++ .../src/main/python/src/nifiapi/properties.py | 7 +++--- .../python/src/nifiapi/recordtransform.py | 4 ++++ 7 files changed, 45 insertions(+), 29 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index c56a779e9e..a5d0d388ed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -1236,6 +1236,11 @@ public final class StandardProcessGroup implements ProcessGroup { conn.verifyCanDelete(); } + // Avoid performing any more validation on the processor, as it is no longer necessary and may + // cause issues with Python-based Processor, as validation may trigger, attempting to communicate + // with the Python process even after the Python process has been destroyed. + processor.pauseValidationTrigger(); + try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, processor.getProcessor().getClass(), processor.getIdentifier())) { final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, getStateManager(processor.getIdentifier()), () -> false, nodeTypeProvider); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java index 50bc4ae70d..8ebf00eb19 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java @@ -17,6 +17,7 @@ package org.apache.nifi.python.processor; +import java.util.Map; import org.apache.nifi.annotation.behavior.DefaultRunDuration; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -27,8 +28,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import py4j.Py4JNetworkException; -import java.util.Map; - @SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS) public class FlowFileTransformProxy extends PythonProcessorProxy { @@ -76,6 +75,13 @@ public class FlowFileTransformProxy extends PythonProcessorProxy { session.transfer(original, REL_FAILURE); return; } + final String relationshipName = result.getRelationship(); + final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + if (REL_FAILURE.getName().equals(relationshipName)) { + session.remove(transformed); + session.transfer(original, REL_FAILURE); + return; + } final Map attributes = result.getAttributes(); if (attributes != null) { @@ -87,8 +93,6 @@ public class FlowFileTransformProxy extends PythonProcessorProxy { transformed = session.write(transformed, out -> out.write(contents)); } - final String relationshipName = result.getRelationship(); - final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); session.transfer(transformed, relationship); session.transfer(original, REL_ORIGINAL); } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java index a2de5bebdd..987c75e074 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java @@ -17,15 +17,6 @@ package org.apache.nifi.python.processor; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.Relationship; - import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -34,6 +25,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; public abstract class PythonProcessorProxy extends AbstractProcessor { private final PythonProcessorBridge bridge; @@ -124,7 +123,7 @@ public abstract class PythonProcessorProxy extends AbstractProcessor { // We cache this to avoid having to call into the Python side while the Processor is running. However, once // it is stopped, its relationships may change due to properties, etc. final Set relationships = fetchRelationshipsFromPythonProcessor(); - this.cachedRelationships = Collections.unmodifiableSet(new HashSet<>(relationships)); + this.cachedRelationships = Set.copyOf(relationships); } @OnScheduled diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java index 3914f13e46..a9e400a004 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java @@ -17,6 +17,17 @@ package org.apache.nifi.python.processor; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import org.apache.nifi.NullSuppression; import org.apache.nifi.annotation.behavior.DefaultRunDuration; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -44,18 +55,6 @@ import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - @SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS) public class RecordTransformProxy extends PythonProcessorProxy { private final PythonProcessorBridge bridge; diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfiletransform.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfiletransform.py index 78fecf79d9..4b2f47d17e 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfiletransform.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfiletransform.py @@ -19,6 +19,10 @@ from nifiapi.properties import ProcessContext class FlowFileTransform(ABC): + # These will be set by the PythonProcessorAdapter when the component is created + identifier = None + logger = None + def __init__(self): self.arrayList = JvmHolder.jvm.java.util.ArrayList diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py index d5f3e153d9..885447914c 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py @@ -22,7 +22,7 @@ EMPTY_ALLOWABLE_VALUE_ARRAY = JvmHolder.gateway.new_array(JvmHolder.jvm.org.apac class ExpressionLanguageScope(Enum): NONE = 1 - LIMITED = 2 + ENVIRONMENT = 2 FLOWFILE_ATTRIBUTES = 3 @@ -133,8 +133,9 @@ class PropertyDescriptor: :param expression_language_scope: documents the scope in which Expression Language is valid. This value must be specified as one of the enum values in `nifiapi.properties.ExpressionLanguageScope`. A value of `NONE` indicates that Expression Language will not be evaluated for this property. This is the default. A value of `FLOWFILE_ATTRIBUTES` indicates that FlowFile attributes may be referenced when configuring the property value. - A value of `LIMITED` indicates that Expression Language may be used but may not reference FlowFile attributes. For example, a value of `${now()}` might - be used to reference the current date and time, or `${hostname(true)}` might be used to specify the hostname. + A value of `ENVIRONMENT` indicates that Expression Language may be used and may reference environment variables but may not reference FlowFile attributes. + For example, a value of `${now()}` might be used to reference the current date and time, or `${hostname(true)}` might be used to specify the hostname. + Or a value of `${ENV_VAR}` could be used to reference an environment variable named `ENV_VAR`. :param dynamic: whether or not this Property Descriptor represents a dynamic (aka user-defined) property. This is not necessary to specify, as the framework can determine this. However, it is available if there is a desire to explicitly set it for completeness' sake. :param validators: A list of property validators that can be used to ensure that the user-supplied value is valid. The standard validators can be referenced using the diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/recordtransform.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/recordtransform.py index 28036a1dbc..7685378f78 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/recordtransform.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/recordtransform.py @@ -19,6 +19,10 @@ from nifiapi.properties import ProcessContext from nifiapi.__jvm__ import JvmHolder class RecordTransform(ABC): + # These will be set by the PythonProcessorAdapter when the component is created + identifier = None + logger = None + def __init__(self): self.arrayList = JvmHolder.jvm.java.util.ArrayList