NIFI-12386 Adds processor FilterAttribute

attribute uuid is not removed by removeAttribute(s) in MockProcessSession

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #8049
This commit is contained in:
EndzeitBegins 2023-11-18 00:28:07 +01:00 committed by Mike Moser
parent 0430974783
commit edca4cd347
5 changed files with 675 additions and 0 deletions

View File

@ -189,6 +189,11 @@ public class MockFlowFile implements FlowFileRecord {
public void removeAttributes(final Set<String> attrNames) {
for (final String attrName : attrNames) {
if (CoreAttributes.UUID.key().equals(attrName)) {
// the core attribute "uuid" of a FlowFile cannot be altered / removed
continue;
}
attributes.remove(attrName);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.util;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -34,6 +35,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -182,6 +184,28 @@ public class TestMockProcessSession {
output.get(0).assertAttributeEquals("key1", "val1");
}
@Test
void testAttributeUUIDNotRemovable() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
FlowFile ff1 = session.createFlowFile("removeAttribute(attrName)".getBytes());
FlowFile ff2 = session.createFlowFile("removeAllAttributes(attrNames)".getBytes());
FlowFile ff3 = session.createFlowFile("removeAllAttributes(keyPattern)".getBytes());
String attrName = CoreAttributes.UUID.key();
session.removeAttribute(ff1, attrName);
session.removeAllAttributes(ff2, Set.of(attrName));
session.removeAllAttributes(ff3, Pattern.compile(Pattern.quote(attrName)));
session.transfer(List.of(ff1, ff2, ff3), PoorlyBehavedProcessor.REL_FAILURE);
session.commitAsync();
List<MockFlowFile> output = session.getFlowFilesForRelationship(PoorlyBehavedProcessor.REL_FAILURE);
assertEquals(3, output.size());
output.get(0).assertAttributeEquals(attrName, ff1.getAttribute(attrName));
output.get(1).assertAttributeEquals(attrName, ff2.getAttribute(attrName));
output.get(2).assertAttributeEquals(attrName, ff3.getAttribute(attrName));
}
protected static class PoorlyBehavedProcessor extends AbstractProcessor {
private static final Relationship REL_FAILURE = new Relationship.Builder()

View File

@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@SideEffectFree
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"attributes", "modification", "filter", "retain", "remove", "delete", "regex", "regular expression", "Attribute Expression Language"})
@CapabilityDescription("Filters the attributes of a FlowFile by retaining specified attributes and removing the rest or by removing specified attributes and retaining the rest.")
@UseCase(
description = "Retain all FlowFile attributes matching a regular expression",
configuration = """
Set "Filter mode" to "Retain".
Set "Attribute matching strategy" to "Use regular expression".
Specify the "Regular expression to filter attributes", e.g. "my-property|a-prefix[.].*".
"""
)
@UseCase(
description = "Remove only a specified set of FlowFile attributes",
configuration = """
Set "Filter mode" to "Remove".
Set "Attribute matching strategy" to "Enumerate attributes".
Specify the set of "Set of attributes to filter" using the delimiter comma ',', e.g. "my-property,other,filename".
"""
)
public class FilterAttribute extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("All successful FlowFiles are routed to this relationship").name("success").build();
private final static Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);
public static final AllowableValue FILTER_MODE_VALUE_RETAIN = new AllowableValue(
"RETAIN",
"Retain",
"Retains only the attributes matching the filter, all other attributes are removed."
);
public static final AllowableValue FILTER_MODE_VALUE_REMOVE = new AllowableValue(
"REMOVE",
"Remove",
"Removes the attributes matching the filter, all other attributes are retained."
);
public static final PropertyDescriptor FILTER_MODE = new PropertyDescriptor.Builder()
.name("FILTER_MODE")
.displayName("Filter mode")
.description("Specifies the strategy to apply on filtered attributes. Either 'Remove' or 'Retain' only the matching attributes.")
.required(true)
.allowableValues(FILTER_MODE_VALUE_RETAIN, FILTER_MODE_VALUE_REMOVE)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue(FILTER_MODE_VALUE_RETAIN.getValue())
.build();
public static final AllowableValue MATCHING_STRATEGY_VALUE_ENUMERATION = new AllowableValue(
"ENUMERATION",
"Enumerate attributes",
"Provides a set of attribute keys to filter for, separated by a comma delimiter ','."
);
public static final AllowableValue MATCHING_STRATEGY_VALUE_REGEX = new AllowableValue(
"REGEX",
"Use regular expression",
"Provides a regular expression to match keys of attributes to filter for."
);
public static final PropertyDescriptor MATCHING_STRATEGY = new PropertyDescriptor.Builder()
.name("MATCHING_STRATEGY")
.displayName("Attribute matching strategy")
.description("Specifies the strategy to filter attributes by.")
.required(true)
.allowableValues(MATCHING_STRATEGY_VALUE_ENUMERATION, MATCHING_STRATEGY_VALUE_REGEX)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue(MATCHING_STRATEGY_VALUE_ENUMERATION.getValue())
.build();
public static final PropertyDescriptor ATTRIBUTE_SET = new PropertyDescriptor.Builder()
.name("ATTRIBUTE_SET")
.displayName("Set of attributes to filter")
.description("A set of attribute names to filter from FlowFiles. Each attribute name is separated by the comma delimiter ','.")
.required(true)
.dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_ENUMERATION)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor ATTRIBUTE_REGEX = new PropertyDescriptor.Builder()
.name("ATTRIBUTE_REGEX")
.displayName("Regular expression to filter attributes")
.description("A regular expression to match names of attributes to filter from FlowFiles.")
.required(true)
.dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_REGEX)
.addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
private final static String DELIMITER_VALUE = ",";
private final static List<PropertyDescriptor> properties =
List.of(FILTER_MODE, MATCHING_STRATEGY, ATTRIBUTE_SET, ATTRIBUTE_REGEX);
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
private volatile Predicate<String> cachedMatchingPredicate;
@OnScheduled
public void onScheduled(final ProcessContext context) {
final MatchingStrategy matchingStrategy = getMatchingStrategy(context);
cachedMatchingPredicate = null;
if (matchingStrategy == MatchingStrategy.ENUMERATION
&& !context.getProperty(ATTRIBUTE_SET).isExpressionLanguagePresent()) {
cachedMatchingPredicate = determineMatchingPredicateBasedOnEnumeration(context, null);
}
if (matchingStrategy == MatchingStrategy.REGEX
&& !context.getProperty(ATTRIBUTE_REGEX).isExpressionLanguagePresent()) {
cachedMatchingPredicate = determineMatchingPredicateBasedOnRegex(context, null);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final Predicate<String> matchingPredicate = determineMatchingPredicate(context, flowFile);
final FilterMode filterMode = getFilterMode(context);
final Predicate<String> isMatched = switch (filterMode) {
case RETAIN -> matchingPredicate;
case REMOVE -> matchingPredicate.negate();
};
final Set<String> attributesToRemove = new HashSet<>(flowFile.getAttributes().keySet());
attributesToRemove.removeIf(isMatched);
final FlowFile updatedFlowFile = session.removeAllAttributes(flowFile, attributesToRemove);
session.transfer(updatedFlowFile, REL_SUCCESS);
}
private Predicate<String> determineMatchingPredicate(ProcessContext context, FlowFile flowFile) {
if (cachedMatchingPredicate != null) {
return cachedMatchingPredicate;
}
final MatchingStrategy matchingStrategy = getMatchingStrategy(context);
return switch (matchingStrategy) {
case ENUMERATION -> determineMatchingPredicateBasedOnEnumeration(context, flowFile);
case REGEX -> determineMatchingPredicateBasedOnRegex(context, flowFile);
};
}
private static Predicate<String> determineMatchingPredicateBasedOnEnumeration(ProcessContext context, FlowFile flowFile) {
final String attributeSetDeclaration = getAttributeSet(context, flowFile);
final String delimiter = getDelimiter();
Set<String> attributeSet = Arrays.stream(attributeSetDeclaration.split(Pattern.quote(delimiter)))
.map(String::trim)
.filter(attributeName -> !attributeName.isBlank())
.collect(Collectors.toUnmodifiableSet());
return attributeSet::contains;
}
private static Predicate<String> determineMatchingPredicateBasedOnRegex(ProcessContext context, FlowFile flowFile) {
Pattern attributeRegex = getAttributeRegex(context, flowFile);
return attributeRegex.asMatchPredicate();
}
/* properties */
private static FilterMode getFilterMode(ProcessContext context) {
final String rawFilterMode = context
.getProperty(FILTER_MODE)
.getValue();
if (FILTER_MODE_VALUE_REMOVE.getValue().equals(rawFilterMode)) {
return FilterMode.REMOVE;
}
return FilterMode.RETAIN;
}
private static MatchingStrategy getMatchingStrategy(ProcessContext context) {
final String rawMatchingStrategy = context
.getProperty(MATCHING_STRATEGY)
.getValue();
if (MATCHING_STRATEGY_VALUE_REGEX.getValue().equals(rawMatchingStrategy)) {
return MatchingStrategy.REGEX;
}
return MatchingStrategy.ENUMERATION;
}
private static String getAttributeSet(ProcessContext context, FlowFile flowFile) {
return context.getProperty(ATTRIBUTE_SET).evaluateAttributeExpressions(flowFile).getValue();
}
private static String getDelimiter() {
return DELIMITER_VALUE;
}
private static Pattern getAttributeRegex(ProcessContext context, FlowFile flowFile) {
return Pattern.compile(
context.getProperty(ATTRIBUTE_REGEX).evaluateAttributeExpressions(flowFile).getValue()
);
}
private enum FilterMode {
RETAIN,
REMOVE,
}
private enum MatchingStrategy {
ENUMERATION,
REGEX,
}
}

View File

@ -43,6 +43,7 @@ org.apache.nifi.processors.standard.FetchDistributedMapCache
org.apache.nifi.processors.standard.FetchFile
org.apache.nifi.processors.standard.FetchFTP
org.apache.nifi.processors.standard.FetchSFTP
org.apache.nifi.processors.standard.FilterAttribute
org.apache.nifi.processors.standard.FlattenJson
org.apache.nifi.processors.standard.ForkRecord
org.apache.nifi.processors.standard.ForkEnrichment

View File

@ -0,0 +1,373 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
class TestFilterAttribute {
private final TestRunner runner = TestRunners.newTestRunner(FilterAttribute.class);
private final String exampleContent = "lorem ipsum dolor sit amet";
private final Map<String, String> exampleAttributes = Map.of(
"foo", "fooValue",
"bar", "barValue",
"batz", "batzValue"
);
@Nested
class WithStrategyEnumeration {
@Nested
class InModeRetain {
@Test
void retainsAllAttributesWhenAllAreFiltered() {
final String attributeSet = "foo,bar,batz";
final Set<String> expectedAttributes = Set.of("foo", "bar", "batz", "uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void retainsUUIDAndFilteredAttributesWhenOnlySomeAreFiltered() {
final String attributeSet = "bar";
final Set<String> expectedAttributes = Set.of("bar", "uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void retainsUUIDOnlyWhenNoneOfTheAttributesAreFiltered() {
final String attributeSet = "other";
final Set<String> expectedAttributes = Set.of("uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void supportsAttributeNamesWithWhitespace() {
final Map<String, String> attributes = new HashMap<>(exampleAttributes);
attributes.put("fo\no", "some value");
final String attributeSet = "fo\no";
final Set<String> expectedAttributes = Set.of("fo\no", "uuid");
runTestWith(attributes, attributeSet, expectedAttributes);
}
}
@Nested
class InModeRemove {
@BeforeEach
void setUp() {
runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FILTER_MODE_VALUE_REMOVE);
}
@Test
void removesAllAttributesExceptUUIDWhenAllAreFiltered() {
final String attributeSet = "foo,bar,batz,uuid,path,filename";
final Set<String> expectedAttributes = Set.of("uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void removesFilteredAttributesExceptUUIDWhenOnlySomeAreFiltered() {
final String attributeSet = "bar,uuid,path,filename";
final Set<String> expectedAttributes = Set.of("foo", "batz", "uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void removesNoAttributeWhenNoneOfTheAttributesAreFiltered() {
final String attributeSet = "other";
final Set<String> expectedAttributes = Set.of("foo", "bar", "batz", "uuid", "path", "filename");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void supportsAttributeNamesWithWhitespace() {
final Map<String, String> attributes = new HashMap<>(exampleAttributes);
attributes.put("fo\no", "some value");
final String attributeSet = "fo\no";
final Set<String> expectedAttributes = Set.of("foo", "bar", "batz", "uuid", "path", "filename");
runTestWith(attributes, attributeSet, expectedAttributes);
}
}
@Nested
class RegardingAttributeSetParsing {
@Test
void ignoresLeadingDelimiters() {
final String attributeSet = ",foo,bar";
final Set<String> expectedAttributes = Set.of("foo", "bar", "uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void ignoresTrailingDelimiters() {
final String attributeSet = "foo,bar,";
final Set<String> expectedAttributes = Set.of("foo", "bar", "uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void doesNotYieldErrorWhenAttributeSetIsEffectivelyEmpty() {
final String attributeSet = " , ";
final Set<String> expectedAttributes = Set.of("uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void worksWithSingleAttributeInSet() {
final String attributeSet = "batz";
final Set<String> expectedAttributes = Set.of("batz", "uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void worksWithMultipleAttributesInSet() {
final String attributeSet = "foo,bar,batz";
final Set<String> expectedAttributes = Set.of("foo", "bar", "batz", "uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void ignoresLeadingWhitespaceInAttributeName() {
final String attributeSet = "foo, batz";
final Set<String> expectedAttributes = Set.of("foo", "batz", "uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
@Test
void ignoresTrailingWhitespaceInAttributeName() {
final String attributeSet = "foo ,bar";
final Set<String> expectedAttributes = Set.of("foo", "bar", "uuid");
runTestWith(exampleAttributes, attributeSet, expectedAttributes);
}
}
@Test
void supportsDefiningAttributeSetInFlowFileAttribute() {
final Map<String, String> attributes = new HashMap<>(exampleAttributes);
attributes.put("lookup", "bar,batz");
final String attributeSet = "${lookup}"; // NiFi EL with reference to FlowFile attribute
final Set<String> expectedAttributes = Set.of("bar", "batz", "uuid");
runTestWith(attributes, attributeSet, expectedAttributes);
}
private void runTestWith(Map<String, String> attributes, String attributeSet, Set<String> expectedAttributes) {
runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_ENUMERATION);
runner.setProperty(FilterAttribute.ATTRIBUTE_SET, attributeSet);
final MockFlowFile input = runner.enqueue(exampleContent, attributes);
final Map<String, String> inputAttributes = input.getAttributes();
final Set<String> notExpectedAttributes = new HashSet<>(inputAttributes.keySet());
notExpectedAttributes.removeAll(expectedAttributes);
runner.run();
runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, 1);
final MockFlowFile result = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS).getFirst();
result.assertContentEquals(exampleContent);
for (String expectedName : expectedAttributes) {
final String expectedValue = inputAttributes.get(expectedName);
result.assertAttributeEquals(expectedName, expectedValue);
}
for (String notExpectedName : notExpectedAttributes) {
result.assertAttributeNotExists(notExpectedName);
}
}
}
@Nested
class WithStrategyRegex {
@Nested
class InModeRetain {
@Test
void retainsAllAttributesWhenAllAreFiltered() {
final Pattern attributeRegex = Pattern.compile("foo|bar|batz");
final Set<String> expectedAttributes = Set.of("foo", "bar", "batz", "uuid");
runTestWith(exampleAttributes, attributeRegex, expectedAttributes);
}
@Test
void retainsUUIDAndFilteredAttributesWhenOnlySomeAreFiltered() {
final Pattern attributeRegex = Pattern.compile("bar");
final Set<String> expectedAttributes = Set.of("bar", "uuid");
runTestWith(exampleAttributes, attributeRegex, expectedAttributes);
}
@Test
void retainsUUIDOnlyWhenNoneOfTheAttributesAreFiltered() {
final Pattern attributeRegex = Pattern.compile("other");
final Set<String> expectedAttributes = Set.of("uuid");
runTestWith(exampleAttributes, attributeRegex, expectedAttributes);
}
@Test
void supportsAttributeNamesWithWhitespace() {
final Map<String, String> attributes = new HashMap<>(exampleAttributes);
attributes.put("fo\no", "some value");
final Pattern attributeRegex = Pattern.compile("fo\no");
final Set<String> expectedAttributes = Set.of("fo\no", "uuid");
runTestWith(attributes, attributeRegex, expectedAttributes);
}
}
@Nested
class InModeRemove {
@BeforeEach
void setUp() {
runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FILTER_MODE_VALUE_REMOVE);
}
@Test
void removesAllAttributesExceptUUIDWhenAllAreFiltered() {
final Pattern attributeRegex = Pattern.compile("foo|bar|batz|uuid|path|filename");
final Set<String> expectedAttributes = Set.of("uuid");
runTestWith(exampleAttributes, attributeRegex, expectedAttributes);
}
@Test
void removesFilteredAttributesExceptUUIDWhenOnlySomeAreFiltered() {
final Pattern attributeRegex = Pattern.compile("bar|uuid|path|filename");
final Set<String> expectedAttributes = Set.of("foo", "batz", "uuid");
runTestWith(exampleAttributes, attributeRegex, expectedAttributes);
}
@Test
void removesNoAttributeWhenNoneOfTheAttributesAreFiltered() {
final Pattern attributeRegex = Pattern.compile("other");
final Set<String> expectedAttributes = Set.of("foo", "bar", "batz", "uuid", "path", "filename");
runTestWith(exampleAttributes, attributeRegex, expectedAttributes);
}
@Test
void supportsAttributeNamesWithWhitespace() {
final Map<String, String> attributes = new HashMap<>(exampleAttributes);
attributes.put("fo\no", "some value");
final Pattern attributeRegex = Pattern.compile("fo\no");
final Set<String> expectedAttributes = Set.of("foo", "bar", "batz", "uuid", "path", "filename");
runTestWith(attributes, attributeRegex, expectedAttributes);
}
}
@Test
void supportsDefiningAttributeSetInFlowFileAttribute() {
final Map<String, String> attributes = new HashMap<>(exampleAttributes);
attributes.put("lookup", "bar|batz");
final String attributeRegex = "${lookup}"; // NiFi EL with reference to FlowFile attribute
final Set<String> expectedAttributes = Set.of("bar", "batz", "uuid");
runTestWith(attributes, attributeRegex, expectedAttributes);
}
private void runTestWith(Map<String, String> attributes, Pattern regex, Set<String> expectedAttributes) {
runTestWith(attributes, regex.pattern(), expectedAttributes);
}
private void runTestWith(Map<String, String> attributes, String regexPattern, Set<String> expectedAttributes) {
runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_REGEX);
runner.setProperty(FilterAttribute.ATTRIBUTE_REGEX, regexPattern);
final MockFlowFile input = runner.enqueue(exampleContent, attributes);
final Map<String, String> inputAttributes = input.getAttributes();
final Set<String> notExpectedAttributes = new HashSet<>(inputAttributes.keySet());
notExpectedAttributes.removeAll(expectedAttributes);
runner.run();
runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, 1);
final MockFlowFile result = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS).getFirst();
result.assertContentEquals(exampleContent);
for (String expectedName : expectedAttributes) {
final String expectedValue = inputAttributes.get(expectedName);
result.assertAttributeEquals(expectedName, expectedValue);
}
for (String notExpectedName : notExpectedAttributes) {
result.assertAttributeNotExists(notExpectedName);
}
}
}
@Test
void supportMultiThreadedExecution() {
runner.setThreadCount(5);
final int flowFileCount = 10_000;
for (int i = 0; i < flowFileCount; i++) {
runner.enqueue(exampleContent, Map.of(
"foo", "" + i,
"bar", "" + i
));
}
runner.setProperty(FilterAttribute.ATTRIBUTE_SET, "foo");
runner.run(flowFileCount);
runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, flowFileCount);
List<MockFlowFile> resultFlowFiles = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS);
for (final MockFlowFile resultFlowFile : resultFlowFiles) {
resultFlowFile.assertAttributeExists("foo");
resultFlowFile.assertAttributeNotExists("bar");
}
final Set<String> fooValues = resultFlowFiles.stream()
.map(flowFile -> flowFile.getAttribute("foo"))
.collect(Collectors.toUnmodifiableSet());
assertEquals(flowFileCount, fooValues.size());
}
}