mirror of https://github.com/apache/nifi.git
NIFI-12446 Refactor FilterAttribute to align with code conventions
This closes #8161 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
16d170fdfd
commit
4f399c9bb9
|
@ -24,7 +24,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.documentation.UseCase;
|
import org.apache.nifi.annotation.documentation.UseCase;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.DescribedValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
@ -36,7 +36,6 @@ import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -52,87 +51,64 @@ import java.util.stream.Collectors;
|
||||||
@UseCase(
|
@UseCase(
|
||||||
description = "Retain all FlowFile attributes matching a regular expression",
|
description = "Retain all FlowFile attributes matching a regular expression",
|
||||||
configuration = """
|
configuration = """
|
||||||
Set "Filter mode" to "Retain".
|
Set "Filter Mode" to "Retain".
|
||||||
Set "Attribute matching strategy" to "Use regular expression".
|
Set "Attribute Matching Strategy" to "Use regular expression".
|
||||||
Specify the "Regular expression to filter attributes", e.g. "my-property|a-prefix[.].*".
|
Specify the "Filtered Attributes Pattern", e.g. "my-property|a-prefix[.].*".
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
@UseCase(
|
@UseCase(
|
||||||
description = "Remove only a specified set of FlowFile attributes",
|
description = "Remove only a specified set of FlowFile attributes",
|
||||||
configuration = """
|
configuration = """
|
||||||
Set "Filter mode" to "Remove".
|
Set "Filter Mode" to "Remove".
|
||||||
Set "Attribute matching strategy" to "Enumerate attributes".
|
Set "Attribute Matching Strategy" to "Enumerate attributes".
|
||||||
Specify the set of "Set of attributes to filter" using the delimiter comma ',', e.g. "my-property,other,filename".
|
Specify the set of "Filtered Attributes" using the delimiter comma ',', e.g. "my-property,other,filename".
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
public class FilterAttribute extends AbstractProcessor {
|
public class FilterAttribute extends AbstractProcessor {
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
.description("All successful FlowFiles are routed to this relationship").name("success").build();
|
.name("success")
|
||||||
|
.description("All successful FlowFiles are routed to this relationship")
|
||||||
|
.build();
|
||||||
|
|
||||||
private final static Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);
|
private final static Set<Relationship> relationships = Set.of(REL_SUCCESS);
|
||||||
|
|
||||||
|
|
||||||
public static final AllowableValue FILTER_MODE_VALUE_RETAIN = new AllowableValue(
|
|
||||||
"RETAIN",
|
|
||||||
"Retain",
|
|
||||||
"Retains only the attributes matching the filter, all other attributes are removed."
|
|
||||||
);
|
|
||||||
|
|
||||||
public static final AllowableValue FILTER_MODE_VALUE_REMOVE = new AllowableValue(
|
|
||||||
"REMOVE",
|
|
||||||
"Remove",
|
|
||||||
"Removes the attributes matching the filter, all other attributes are retained."
|
|
||||||
);
|
|
||||||
|
|
||||||
public static final PropertyDescriptor FILTER_MODE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor FILTER_MODE = new PropertyDescriptor.Builder()
|
||||||
.name("FILTER_MODE")
|
.name("Filter Mode")
|
||||||
.displayName("Filter mode")
|
.displayName("Filter Mode")
|
||||||
.description("Specifies the strategy to apply on filtered attributes. Either 'Remove' or 'Retain' only the matching attributes.")
|
.description("Specifies the strategy to apply on filtered attributes. Either 'Remove' or 'Retain' only the matching attributes.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.allowableValues(FILTER_MODE_VALUE_RETAIN, FILTER_MODE_VALUE_REMOVE)
|
.allowableValues(FilterMode.class)
|
||||||
|
.defaultValue(FilterMode.RETAIN)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||||
.defaultValue(FILTER_MODE_VALUE_RETAIN.getValue())
|
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final AllowableValue MATCHING_STRATEGY_VALUE_ENUMERATION = new AllowableValue(
|
|
||||||
"ENUMERATION",
|
|
||||||
"Enumerate attributes",
|
|
||||||
"Provides a set of attribute keys to filter for, separated by a comma delimiter ','."
|
|
||||||
);
|
|
||||||
|
|
||||||
public static final AllowableValue MATCHING_STRATEGY_VALUE_REGEX = new AllowableValue(
|
|
||||||
"REGEX",
|
|
||||||
"Use regular expression",
|
|
||||||
"Provides a regular expression to match keys of attributes to filter for."
|
|
||||||
);
|
|
||||||
|
|
||||||
public static final PropertyDescriptor MATCHING_STRATEGY = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor MATCHING_STRATEGY = new PropertyDescriptor.Builder()
|
||||||
.name("MATCHING_STRATEGY")
|
.name("Attribute Matching Strategy")
|
||||||
.displayName("Attribute matching strategy")
|
.displayName("Attribute Matching Strategy")
|
||||||
.description("Specifies the strategy to filter attributes by.")
|
.description("Specifies the strategy to filter attributes by.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.allowableValues(MATCHING_STRATEGY_VALUE_ENUMERATION, MATCHING_STRATEGY_VALUE_REGEX)
|
.allowableValues(MatchingStrategy.class)
|
||||||
|
.defaultValue(MatchingStrategy.ENUMERATION)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||||
.defaultValue(MATCHING_STRATEGY_VALUE_ENUMERATION.getValue())
|
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor ATTRIBUTE_SET = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor ATTRIBUTE_ENUMERATION = new PropertyDescriptor.Builder()
|
||||||
.name("ATTRIBUTE_SET")
|
.name("Filtered Attributes")
|
||||||
.displayName("Set of attributes to filter")
|
.displayName("Filtered Attributes")
|
||||||
.description("A set of attribute names to filter from FlowFiles. Each attribute name is separated by the comma delimiter ','.")
|
.description("A set of attribute names to filter from FlowFiles. Each attribute name is separated by the comma delimiter ','.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_ENUMERATION)
|
.dependsOn(MATCHING_STRATEGY, MatchingStrategy.ENUMERATION)
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor ATTRIBUTE_REGEX = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor ATTRIBUTE_PATTERN = new PropertyDescriptor.Builder()
|
||||||
.name("ATTRIBUTE_REGEX")
|
.name("Filtered Attributes Pattern")
|
||||||
.displayName("Regular expression to filter attributes")
|
.displayName("Filtered Attributes Pattern")
|
||||||
.description("A regular expression to match names of attributes to filter from FlowFiles.")
|
.description("A regular expression to match names of attributes to filter from FlowFiles.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_REGEX)
|
.dependsOn(MATCHING_STRATEGY, MatchingStrategy.PATTERN)
|
||||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
|
.addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
.build();
|
.build();
|
||||||
|
@ -140,7 +116,9 @@ public class FilterAttribute extends AbstractProcessor {
|
||||||
private final static String DELIMITER_VALUE = ",";
|
private final static String DELIMITER_VALUE = ",";
|
||||||
|
|
||||||
private final static List<PropertyDescriptor> properties =
|
private final static List<PropertyDescriptor> properties =
|
||||||
List.of(FILTER_MODE, MATCHING_STRATEGY, ATTRIBUTE_SET, ATTRIBUTE_REGEX);
|
List.of(FILTER_MODE, MATCHING_STRATEGY, ATTRIBUTE_ENUMERATION, ATTRIBUTE_PATTERN);
|
||||||
|
|
||||||
|
private volatile Predicate<String> cachedMatchingPredicate;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<Relationship> getRelationships() {
|
public Set<Relationship> getRelationships() {
|
||||||
|
@ -152,7 +130,6 @@ public class FilterAttribute extends AbstractProcessor {
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
private volatile Predicate<String> cachedMatchingPredicate;
|
|
||||||
|
|
||||||
@OnScheduled
|
@OnScheduled
|
||||||
public void onScheduled(final ProcessContext context) {
|
public void onScheduled(final ProcessContext context) {
|
||||||
|
@ -161,11 +138,11 @@ public class FilterAttribute extends AbstractProcessor {
|
||||||
cachedMatchingPredicate = null;
|
cachedMatchingPredicate = null;
|
||||||
|
|
||||||
if (matchingStrategy == MatchingStrategy.ENUMERATION
|
if (matchingStrategy == MatchingStrategy.ENUMERATION
|
||||||
&& !context.getProperty(ATTRIBUTE_SET).isExpressionLanguagePresent()) {
|
&& !context.getProperty(ATTRIBUTE_ENUMERATION).isExpressionLanguagePresent()) {
|
||||||
cachedMatchingPredicate = determineMatchingPredicateBasedOnEnumeration(context, null);
|
cachedMatchingPredicate = determineMatchingPredicateBasedOnEnumeration(context, null);
|
||||||
}
|
}
|
||||||
if (matchingStrategy == MatchingStrategy.REGEX
|
if (matchingStrategy == MatchingStrategy.PATTERN
|
||||||
&& !context.getProperty(ATTRIBUTE_REGEX).isExpressionLanguagePresent()) {
|
&& !context.getProperty(ATTRIBUTE_PATTERN).isExpressionLanguagePresent()) {
|
||||||
cachedMatchingPredicate = determineMatchingPredicateBasedOnRegex(context, null);
|
cachedMatchingPredicate = determineMatchingPredicateBasedOnRegex(context, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -200,7 +177,7 @@ public class FilterAttribute extends AbstractProcessor {
|
||||||
final MatchingStrategy matchingStrategy = getMatchingStrategy(context);
|
final MatchingStrategy matchingStrategy = getMatchingStrategy(context);
|
||||||
return switch (matchingStrategy) {
|
return switch (matchingStrategy) {
|
||||||
case ENUMERATION -> determineMatchingPredicateBasedOnEnumeration(context, flowFile);
|
case ENUMERATION -> determineMatchingPredicateBasedOnEnumeration(context, flowFile);
|
||||||
case REGEX -> determineMatchingPredicateBasedOnRegex(context, flowFile);
|
case PATTERN -> determineMatchingPredicateBasedOnRegex(context, flowFile);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,29 +202,19 @@ public class FilterAttribute extends AbstractProcessor {
|
||||||
/* properties */
|
/* properties */
|
||||||
|
|
||||||
private static FilterMode getFilterMode(ProcessContext context) {
|
private static FilterMode getFilterMode(ProcessContext context) {
|
||||||
final String rawFilterMode = context
|
return context
|
||||||
.getProperty(FILTER_MODE)
|
.getProperty(FILTER_MODE)
|
||||||
.getValue();
|
.asDescribedValue(FilterMode.class);
|
||||||
|
|
||||||
if (FILTER_MODE_VALUE_REMOVE.getValue().equals(rawFilterMode)) {
|
|
||||||
return FilterMode.REMOVE;
|
|
||||||
}
|
|
||||||
return FilterMode.RETAIN;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static MatchingStrategy getMatchingStrategy(ProcessContext context) {
|
private static MatchingStrategy getMatchingStrategy(ProcessContext context) {
|
||||||
final String rawMatchingStrategy = context
|
return context
|
||||||
.getProperty(MATCHING_STRATEGY)
|
.getProperty(MATCHING_STRATEGY)
|
||||||
.getValue();
|
.asDescribedValue(MatchingStrategy.class);
|
||||||
|
|
||||||
if (MATCHING_STRATEGY_VALUE_REGEX.getValue().equals(rawMatchingStrategy)) {
|
|
||||||
return MatchingStrategy.REGEX;
|
|
||||||
}
|
|
||||||
return MatchingStrategy.ENUMERATION;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getAttributeSet(ProcessContext context, FlowFile flowFile) {
|
private static String getAttributeSet(ProcessContext context, FlowFile flowFile) {
|
||||||
return context.getProperty(ATTRIBUTE_SET).evaluateAttributeExpressions(flowFile).getValue();
|
return context.getProperty(ATTRIBUTE_ENUMERATION).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getDelimiter() {
|
private static String getDelimiter() {
|
||||||
|
@ -256,17 +223,75 @@ public class FilterAttribute extends AbstractProcessor {
|
||||||
|
|
||||||
private static Pattern getAttributeRegex(ProcessContext context, FlowFile flowFile) {
|
private static Pattern getAttributeRegex(ProcessContext context, FlowFile flowFile) {
|
||||||
return Pattern.compile(
|
return Pattern.compile(
|
||||||
context.getProperty(ATTRIBUTE_REGEX).evaluateAttributeExpressions(flowFile).getValue()
|
context.getProperty(ATTRIBUTE_PATTERN).evaluateAttributeExpressions(flowFile).getValue()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private enum FilterMode {
|
enum FilterMode implements DescribedValue {
|
||||||
RETAIN,
|
RETAIN(
|
||||||
REMOVE,
|
"Retain",
|
||||||
|
"Retains only the attributes matching the filter, all other attributes are removed."
|
||||||
|
),
|
||||||
|
REMOVE(
|
||||||
|
"Remove",
|
||||||
|
"Removes the attributes matching the filter, all other attributes are retained."
|
||||||
|
);
|
||||||
|
|
||||||
|
private final String value;
|
||||||
|
private final String description;
|
||||||
|
|
||||||
|
FilterMode(final String value, final String description) {
|
||||||
|
this.value = value;
|
||||||
|
this.description = description;
|
||||||
}
|
}
|
||||||
|
|
||||||
private enum MatchingStrategy {
|
@Override
|
||||||
ENUMERATION,
|
public String getValue() {
|
||||||
REGEX,
|
return this.value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDisplayName() {
|
||||||
|
return this.value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return this.description;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum MatchingStrategy implements DescribedValue {
|
||||||
|
ENUMERATION(
|
||||||
|
"Enumerate attributes",
|
||||||
|
"Provides a set of attribute keys to filter for, separated by a comma delimiter ','."
|
||||||
|
),
|
||||||
|
PATTERN(
|
||||||
|
"Use regular expression",
|
||||||
|
"Provides a regular expression to match keys of attributes to filter for."
|
||||||
|
);
|
||||||
|
|
||||||
|
private final String value;
|
||||||
|
private final String description;
|
||||||
|
|
||||||
|
MatchingStrategy(final String value, final String description) {
|
||||||
|
this.value = value;
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getValue() {
|
||||||
|
return this.value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDisplayName() {
|
||||||
|
return this.value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return this.description;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,7 +89,7 @@ class TestFilterAttribute {
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
void setUp() {
|
||||||
runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FILTER_MODE_VALUE_REMOVE);
|
runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FilterMode.REMOVE.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -198,8 +198,8 @@ class TestFilterAttribute {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runTestWith(Map<String, String> attributes, String attributeSet, Set<String> expectedAttributes) {
|
private void runTestWith(Map<String, String> attributes, String attributeSet, Set<String> expectedAttributes) {
|
||||||
runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_ENUMERATION);
|
runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MatchingStrategy.ENUMERATION.getValue());
|
||||||
runner.setProperty(FilterAttribute.ATTRIBUTE_SET, attributeSet);
|
runner.setProperty(FilterAttribute.ATTRIBUTE_ENUMERATION, attributeSet);
|
||||||
|
|
||||||
final MockFlowFile input = runner.enqueue(exampleContent, attributes);
|
final MockFlowFile input = runner.enqueue(exampleContent, attributes);
|
||||||
final Map<String, String> inputAttributes = input.getAttributes();
|
final Map<String, String> inputAttributes = input.getAttributes();
|
||||||
|
@ -267,7 +267,7 @@ class TestFilterAttribute {
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
void setUp() {
|
||||||
runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FILTER_MODE_VALUE_REMOVE);
|
runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FilterMode.REMOVE.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -320,8 +320,8 @@ class TestFilterAttribute {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runTestWith(Map<String, String> attributes, String regexPattern, Set<String> expectedAttributes) {
|
private void runTestWith(Map<String, String> attributes, String regexPattern, Set<String> expectedAttributes) {
|
||||||
runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_REGEX);
|
runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MatchingStrategy.PATTERN.getValue());
|
||||||
runner.setProperty(FilterAttribute.ATTRIBUTE_REGEX, regexPattern);
|
runner.setProperty(FilterAttribute.ATTRIBUTE_PATTERN, regexPattern);
|
||||||
|
|
||||||
final MockFlowFile input = runner.enqueue(exampleContent, attributes);
|
final MockFlowFile input = runner.enqueue(exampleContent, attributes);
|
||||||
final Map<String, String> inputAttributes = input.getAttributes();
|
final Map<String, String> inputAttributes = input.getAttributes();
|
||||||
|
@ -356,7 +356,7 @@ class TestFilterAttribute {
|
||||||
"bar", "" + i
|
"bar", "" + i
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
runner.setProperty(FilterAttribute.ATTRIBUTE_SET, "foo");
|
runner.setProperty(FilterAttribute.ATTRIBUTE_ENUMERATION, "foo");
|
||||||
|
|
||||||
runner.run(flowFileCount);
|
runner.run(flowFileCount);
|
||||||
runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, flowFileCount);
|
runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, flowFileCount);
|
||||||
|
|
Loading…
Reference in New Issue