NIFI-12137: This closes #7808. Fixed name of Expression Language Scope from LIMITED to ENVIRONMENT, ; fixed issue in which flowfiles routed to failure by FlowFileTransform were also being routed to original; defined 'identifier' and 'logger' as member variables for FlowFileTransform and RecordTransform so that IDEs know about them

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2023-09-28 10:09:10 -04:00 committed by Joseph Witt
parent 1d9ccb3857
commit 711f2f1188
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
7 changed files with 45 additions and 29 deletions

View File

@ -1236,6 +1236,11 @@ public final class StandardProcessGroup implements ProcessGroup {
conn.verifyCanDelete();
}
// Avoid performing any more validation on the processor, as it is no longer necessary and may
// cause issues with Python-based Processor, as validation may trigger, attempting to communicate
// with the Python process even after the Python process has been destroyed.
processor.pauseValidationTrigger();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, processor.getProcessor().getClass(), processor.getIdentifier())) {
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider,
getStateManager(processor.getIdentifier()), () -> false, nodeTypeProvider);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.python.processor;
import java.util.Map;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -27,8 +28,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import py4j.Py4JNetworkException;
import java.util.Map;
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
public class FlowFileTransformProxy extends PythonProcessorProxy {
@ -76,6 +75,13 @@ public class FlowFileTransformProxy extends PythonProcessorProxy {
session.transfer(original, REL_FAILURE);
return;
}
final String relationshipName = result.getRelationship();
final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
if (REL_FAILURE.getName().equals(relationshipName)) {
session.remove(transformed);
session.transfer(original, REL_FAILURE);
return;
}
final Map<String, String> attributes = result.getAttributes();
if (attributes != null) {
@ -87,8 +93,6 @@ public class FlowFileTransformProxy extends PythonProcessorProxy {
transformed = session.write(transformed, out -> out.write(contents));
}
final String relationshipName = result.getRelationship();
final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
session.transfer(transformed, relationship);
session.transfer(original, REL_ORIGINAL);
}

View File

@ -17,15 +17,6 @@
package org.apache.nifi.python.processor;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -34,6 +25,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
public abstract class PythonProcessorProxy extends AbstractProcessor {
private final PythonProcessorBridge bridge;
@ -124,7 +123,7 @@ public abstract class PythonProcessorProxy extends AbstractProcessor {
// We cache this to avoid having to call into the Python side while the Processor is running. However, once
// it is stopped, its relationships may change due to properties, etc.
final Set<Relationship> relationships = fetchRelationshipsFromPythonProcessor();
this.cachedRelationships = Collections.unmodifiableSet(new HashSet<>(relationships));
this.cachedRelationships = Set.copyOf(relationships);
}
@OnScheduled

View File

@ -17,6 +17,17 @@
package org.apache.nifi.python.processor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.nifi.NullSuppression;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -44,18 +55,6 @@ import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
public class RecordTransformProxy extends PythonProcessorProxy {
private final PythonProcessorBridge bridge;

View File

@ -19,6 +19,10 @@ from nifiapi.properties import ProcessContext
class FlowFileTransform(ABC):
# These will be set by the PythonProcessorAdapter when the component is created
identifier = None
logger = None
def __init__(self):
self.arrayList = JvmHolder.jvm.java.util.ArrayList

View File

@ -22,7 +22,7 @@ EMPTY_ALLOWABLE_VALUE_ARRAY = JvmHolder.gateway.new_array(JvmHolder.jvm.org.apac
class ExpressionLanguageScope(Enum):
NONE = 1
LIMITED = 2
ENVIRONMENT = 2
FLOWFILE_ATTRIBUTES = 3
@ -133,8 +133,9 @@ class PropertyDescriptor:
:param expression_language_scope: documents the scope in which Expression Language is valid. This value must be specified as one of the enum values
in `nifiapi.properties.ExpressionLanguageScope`. A value of `NONE` indicates that Expression Language will not be evaluated for this property.
This is the default. A value of `FLOWFILE_ATTRIBUTES` indicates that FlowFile attributes may be referenced when configuring the property value.
A value of `LIMITED` indicates that Expression Language may be used but may not reference FlowFile attributes. For example, a value of `${now()}` might
be used to reference the current date and time, or `${hostname(true)}` might be used to specify the hostname.
A value of `ENVIRONMENT` indicates that Expression Language may be used and may reference environment variables but may not reference FlowFile attributes.
For example, a value of `${now()}` might be used to reference the current date and time, or `${hostname(true)}` might be used to specify the hostname.
Or a value of `${ENV_VAR}` could be used to reference an environment variable named `ENV_VAR`.
:param dynamic: whether or not this Property Descriptor represents a dynamic (aka user-defined) property. This is not necessary to specify, as the framework can determine this.
However, it is available if there is a desire to explicitly set it for completeness' sake.
:param validators: A list of property validators that can be used to ensure that the user-supplied value is valid. The standard validators can be referenced using the

View File

@ -19,6 +19,10 @@ from nifiapi.properties import ProcessContext
from nifiapi.__jvm__ import JvmHolder
class RecordTransform(ABC):
# These will be set by the PythonProcessorAdapter when the component is created
identifier = None
logger = None
def __init__(self):
self.arrayList = JvmHolder.jvm.java.util.ArrayList