diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index f5702daed1..ec86fe7656 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -189,6 +189,11 @@ public class MockFlowFile implements FlowFileRecord { public void removeAttributes(final Set attrNames) { for (final String attrName : attrNames) { + if (CoreAttributes.UUID.key().equals(attrName)) { + // the core attribute "uuid" of a FlowFile cannot be altered / removed + continue; + } + attributes.remove(attrName); } } diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java index 700d0d29b2..c1364a75e3 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -17,6 +17,7 @@ package org.apache.nifi.util; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -34,6 +35,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -182,6 +184,28 @@ public class TestMockProcessSession { output.get(0).assertAttributeEquals("key1", "val1"); } + @Test + void testAttributeUUIDNotRemovable() { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + FlowFile ff1 = session.createFlowFile("removeAttribute(attrName)".getBytes()); + FlowFile ff2 = session.createFlowFile("removeAllAttributes(attrNames)".getBytes()); + FlowFile ff3 = session.createFlowFile("removeAllAttributes(keyPattern)".getBytes()); + + String attrName = CoreAttributes.UUID.key(); + session.removeAttribute(ff1, attrName); + session.removeAllAttributes(ff2, Set.of(attrName)); + session.removeAllAttributes(ff3, Pattern.compile(Pattern.quote(attrName))); + + session.transfer(List.of(ff1, ff2, ff3), PoorlyBehavedProcessor.REL_FAILURE); + session.commitAsync(); + List output = session.getFlowFilesForRelationship(PoorlyBehavedProcessor.REL_FAILURE); + assertEquals(3, output.size()); + output.get(0).assertAttributeEquals(attrName, ff1.getAttribute(attrName)); + output.get(1).assertAttributeEquals(attrName, ff2.getAttribute(attrName)); + output.get(2).assertAttributeEquals(attrName, ff3.getAttribute(attrName)); + } + protected static class PoorlyBehavedProcessor extends AbstractProcessor { private static final Relationship REL_FAILURE = new Relationship.Builder() diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterAttribute.java new file mode 100644 index 0000000000..de612196fd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterAttribute.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.DefaultRunDuration; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +@SideEffectFree +@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"attributes", "modification", "filter", "retain", "remove", "delete", "regex", "regular expression", "Attribute Expression Language"}) +@CapabilityDescription("Filters the attributes of a FlowFile by retaining specified attributes and removing the rest or by removing specified attributes and retaining the rest.") +@UseCase( + description = "Retain all FlowFile attributes matching a regular expression", + configuration = """ + Set "Filter mode" to "Retain". + Set "Attribute matching strategy" to "Use regular expression". + Specify the "Regular expression to filter attributes", e.g. "my-property|a-prefix[.].*". + """ +) +@UseCase( + description = "Remove only a specified set of FlowFile attributes", + configuration = """ + Set "Filter mode" to "Remove". + Set "Attribute matching strategy" to "Enumerate attributes". + Specify the set of "Set of attributes to filter" using the delimiter comma ',', e.g. "my-property,other,filename". + """ +) +public class FilterAttribute extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .description("All successful FlowFiles are routed to this relationship").name("success").build(); + + private final static Set relationships = Collections.singleton(REL_SUCCESS); + + + public static final AllowableValue FILTER_MODE_VALUE_RETAIN = new AllowableValue( + "RETAIN", + "Retain", + "Retains only the attributes matching the filter, all other attributes are removed." + ); + + public static final AllowableValue FILTER_MODE_VALUE_REMOVE = new AllowableValue( + "REMOVE", + "Remove", + "Removes the attributes matching the filter, all other attributes are retained." + ); + + public static final PropertyDescriptor FILTER_MODE = new PropertyDescriptor.Builder() + .name("FILTER_MODE") + .displayName("Filter mode") + .description("Specifies the strategy to apply on filtered attributes. Either 'Remove' or 'Retain' only the matching attributes.") + .required(true) + .allowableValues(FILTER_MODE_VALUE_RETAIN, FILTER_MODE_VALUE_REMOVE) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .defaultValue(FILTER_MODE_VALUE_RETAIN.getValue()) + .build(); + + public static final AllowableValue MATCHING_STRATEGY_VALUE_ENUMERATION = new AllowableValue( + "ENUMERATION", + "Enumerate attributes", + "Provides a set of attribute keys to filter for, separated by a comma delimiter ','." + ); + + public static final AllowableValue MATCHING_STRATEGY_VALUE_REGEX = new AllowableValue( + "REGEX", + "Use regular expression", + "Provides a regular expression to match keys of attributes to filter for." + ); + + public static final PropertyDescriptor MATCHING_STRATEGY = new PropertyDescriptor.Builder() + .name("MATCHING_STRATEGY") + .displayName("Attribute matching strategy") + .description("Specifies the strategy to filter attributes by.") + .required(true) + .allowableValues(MATCHING_STRATEGY_VALUE_ENUMERATION, MATCHING_STRATEGY_VALUE_REGEX) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .defaultValue(MATCHING_STRATEGY_VALUE_ENUMERATION.getValue()) + .build(); + + public static final PropertyDescriptor ATTRIBUTE_SET = new PropertyDescriptor.Builder() + .name("ATTRIBUTE_SET") + .displayName("Set of attributes to filter") + .description("A set of attribute names to filter from FlowFiles. Each attribute name is separated by the comma delimiter ','.") + .required(true) + .dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_ENUMERATION) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor ATTRIBUTE_REGEX = new PropertyDescriptor.Builder() + .name("ATTRIBUTE_REGEX") + .displayName("Regular expression to filter attributes") + .description("A regular expression to match names of attributes to filter from FlowFiles.") + .required(true) + .dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_REGEX) + .addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + private final static String DELIMITER_VALUE = ","; + + private final static List properties = + List.of(FILTER_MODE, MATCHING_STRATEGY, ATTRIBUTE_SET, ATTRIBUTE_REGEX); + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + private volatile Predicate cachedMatchingPredicate; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + final MatchingStrategy matchingStrategy = getMatchingStrategy(context); + + cachedMatchingPredicate = null; + + if (matchingStrategy == MatchingStrategy.ENUMERATION + && !context.getProperty(ATTRIBUTE_SET).isExpressionLanguagePresent()) { + cachedMatchingPredicate = determineMatchingPredicateBasedOnEnumeration(context, null); + } + if (matchingStrategy == MatchingStrategy.REGEX + && !context.getProperty(ATTRIBUTE_REGEX).isExpressionLanguagePresent()) { + cachedMatchingPredicate = determineMatchingPredicateBasedOnRegex(context, null); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Predicate matchingPredicate = determineMatchingPredicate(context, flowFile); + + final FilterMode filterMode = getFilterMode(context); + final Predicate isMatched = switch (filterMode) { + case RETAIN -> matchingPredicate; + case REMOVE -> matchingPredicate.negate(); + }; + + final Set attributesToRemove = new HashSet<>(flowFile.getAttributes().keySet()); + attributesToRemove.removeIf(isMatched); + + final FlowFile updatedFlowFile = session.removeAllAttributes(flowFile, attributesToRemove); + session.transfer(updatedFlowFile, REL_SUCCESS); + } + + private Predicate determineMatchingPredicate(ProcessContext context, FlowFile flowFile) { + if (cachedMatchingPredicate != null) { + return cachedMatchingPredicate; + } + + final MatchingStrategy matchingStrategy = getMatchingStrategy(context); + return switch (matchingStrategy) { + case ENUMERATION -> determineMatchingPredicateBasedOnEnumeration(context, flowFile); + case REGEX -> determineMatchingPredicateBasedOnRegex(context, flowFile); + }; + } + + private static Predicate determineMatchingPredicateBasedOnEnumeration(ProcessContext context, FlowFile flowFile) { + final String attributeSetDeclaration = getAttributeSet(context, flowFile); + final String delimiter = getDelimiter(); + + Set attributeSet = Arrays.stream(attributeSetDeclaration.split(Pattern.quote(delimiter))) + .map(String::trim) + .filter(attributeName -> !attributeName.isBlank()) + .collect(Collectors.toUnmodifiableSet()); + + return attributeSet::contains; + } + + private static Predicate determineMatchingPredicateBasedOnRegex(ProcessContext context, FlowFile flowFile) { + Pattern attributeRegex = getAttributeRegex(context, flowFile); + + return attributeRegex.asMatchPredicate(); + } + + /* properties */ + + private static FilterMode getFilterMode(ProcessContext context) { + final String rawFilterMode = context + .getProperty(FILTER_MODE) + .getValue(); + + if (FILTER_MODE_VALUE_REMOVE.getValue().equals(rawFilterMode)) { + return FilterMode.REMOVE; + } + return FilterMode.RETAIN; + } + + private static MatchingStrategy getMatchingStrategy(ProcessContext context) { + final String rawMatchingStrategy = context + .getProperty(MATCHING_STRATEGY) + .getValue(); + + if (MATCHING_STRATEGY_VALUE_REGEX.getValue().equals(rawMatchingStrategy)) { + return MatchingStrategy.REGEX; + } + return MatchingStrategy.ENUMERATION; + } + + private static String getAttributeSet(ProcessContext context, FlowFile flowFile) { + return context.getProperty(ATTRIBUTE_SET).evaluateAttributeExpressions(flowFile).getValue(); + } + + private static String getDelimiter() { + return DELIMITER_VALUE; + } + + private static Pattern getAttributeRegex(ProcessContext context, FlowFile flowFile) { + return Pattern.compile( + context.getProperty(ATTRIBUTE_REGEX).evaluateAttributeExpressions(flowFile).getValue() + ); + } + + private enum FilterMode { + RETAIN, + REMOVE, + } + + private enum MatchingStrategy { + ENUMERATION, + REGEX, + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index e303ba8c89..a070ad732c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -43,6 +43,7 @@ org.apache.nifi.processors.standard.FetchDistributedMapCache org.apache.nifi.processors.standard.FetchFile org.apache.nifi.processors.standard.FetchFTP org.apache.nifi.processors.standard.FetchSFTP +org.apache.nifi.processors.standard.FilterAttribute org.apache.nifi.processors.standard.FlattenJson org.apache.nifi.processors.standard.ForkRecord org.apache.nifi.processors.standard.ForkEnrichment diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterAttribute.java new file mode 100644 index 0000000000..d6d29d97ba --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterAttribute.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class TestFilterAttribute { + + private final TestRunner runner = TestRunners.newTestRunner(FilterAttribute.class); + + private final String exampleContent = "lorem ipsum dolor sit amet"; + + private final Map exampleAttributes = Map.of( + "foo", "fooValue", + "bar", "barValue", + "batz", "batzValue" + ); + + @Nested + class WithStrategyEnumeration { + @Nested + class InModeRetain { + @Test + void retainsAllAttributesWhenAllAreFiltered() { + final String attributeSet = "foo,bar,batz"; + final Set expectedAttributes = Set.of("foo", "bar", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void retainsUUIDAndFilteredAttributesWhenOnlySomeAreFiltered() { + final String attributeSet = "bar"; + final Set expectedAttributes = Set.of("bar", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void retainsUUIDOnlyWhenNoneOfTheAttributesAreFiltered() { + final String attributeSet = "other"; + final Set expectedAttributes = Set.of("uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void supportsAttributeNamesWithWhitespace() { + final Map attributes = new HashMap<>(exampleAttributes); + attributes.put("fo\no", "some value"); + final String attributeSet = "fo\no"; + final Set expectedAttributes = Set.of("fo\no", "uuid"); + + runTestWith(attributes, attributeSet, expectedAttributes); + } + } + + @Nested + class InModeRemove { + + @BeforeEach + void setUp() { + runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FILTER_MODE_VALUE_REMOVE); + } + + @Test + void removesAllAttributesExceptUUIDWhenAllAreFiltered() { + final String attributeSet = "foo,bar,batz,uuid,path,filename"; + final Set expectedAttributes = Set.of("uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void removesFilteredAttributesExceptUUIDWhenOnlySomeAreFiltered() { + final String attributeSet = "bar,uuid,path,filename"; + final Set expectedAttributes = Set.of("foo", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void removesNoAttributeWhenNoneOfTheAttributesAreFiltered() { + final String attributeSet = "other"; + final Set expectedAttributes = Set.of("foo", "bar", "batz", "uuid", "path", "filename"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void supportsAttributeNamesWithWhitespace() { + final Map attributes = new HashMap<>(exampleAttributes); + attributes.put("fo\no", "some value"); + final String attributeSet = "fo\no"; + final Set expectedAttributes = Set.of("foo", "bar", "batz", "uuid", "path", "filename"); + + runTestWith(attributes, attributeSet, expectedAttributes); + } + } + + @Nested + class RegardingAttributeSetParsing { + + @Test + void ignoresLeadingDelimiters() { + final String attributeSet = ",foo,bar"; + final Set expectedAttributes = Set.of("foo", "bar", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void ignoresTrailingDelimiters() { + final String attributeSet = "foo,bar,"; + final Set expectedAttributes = Set.of("foo", "bar", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void doesNotYieldErrorWhenAttributeSetIsEffectivelyEmpty() { + final String attributeSet = " , "; + final Set expectedAttributes = Set.of("uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void worksWithSingleAttributeInSet() { + final String attributeSet = "batz"; + final Set expectedAttributes = Set.of("batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void worksWithMultipleAttributesInSet() { + final String attributeSet = "foo,bar,batz"; + final Set expectedAttributes = Set.of("foo", "bar", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void ignoresLeadingWhitespaceInAttributeName() { + final String attributeSet = "foo, batz"; + final Set expectedAttributes = Set.of("foo", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void ignoresTrailingWhitespaceInAttributeName() { + final String attributeSet = "foo ,bar"; + final Set expectedAttributes = Set.of("foo", "bar", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + } + + @Test + void supportsDefiningAttributeSetInFlowFileAttribute() { + final Map attributes = new HashMap<>(exampleAttributes); + attributes.put("lookup", "bar,batz"); + final String attributeSet = "${lookup}"; // NiFi EL with reference to FlowFile attribute + final Set expectedAttributes = Set.of("bar", "batz", "uuid"); + + runTestWith(attributes, attributeSet, expectedAttributes); + } + + private void runTestWith(Map attributes, String attributeSet, Set expectedAttributes) { + runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_ENUMERATION); + runner.setProperty(FilterAttribute.ATTRIBUTE_SET, attributeSet); + + final MockFlowFile input = runner.enqueue(exampleContent, attributes); + final Map inputAttributes = input.getAttributes(); + final Set notExpectedAttributes = new HashSet<>(inputAttributes.keySet()); + notExpectedAttributes.removeAll(expectedAttributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, 1); + final MockFlowFile result = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS).getFirst(); + result.assertContentEquals(exampleContent); + for (String expectedName : expectedAttributes) { + final String expectedValue = inputAttributes.get(expectedName); + + result.assertAttributeEquals(expectedName, expectedValue); + } + for (String notExpectedName : notExpectedAttributes) { + result.assertAttributeNotExists(notExpectedName); + } + } + } + + @Nested + class WithStrategyRegex { + + @Nested + class InModeRetain { + @Test + void retainsAllAttributesWhenAllAreFiltered() { + final Pattern attributeRegex = Pattern.compile("foo|bar|batz"); + final Set expectedAttributes = Set.of("foo", "bar", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void retainsUUIDAndFilteredAttributesWhenOnlySomeAreFiltered() { + final Pattern attributeRegex = Pattern.compile("bar"); + final Set expectedAttributes = Set.of("bar", "uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void retainsUUIDOnlyWhenNoneOfTheAttributesAreFiltered() { + final Pattern attributeRegex = Pattern.compile("other"); + final Set expectedAttributes = Set.of("uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void supportsAttributeNamesWithWhitespace() { + final Map attributes = new HashMap<>(exampleAttributes); + attributes.put("fo\no", "some value"); + final Pattern attributeRegex = Pattern.compile("fo\no"); + final Set expectedAttributes = Set.of("fo\no", "uuid"); + + runTestWith(attributes, attributeRegex, expectedAttributes); + } + } + + @Nested + class InModeRemove { + + @BeforeEach + void setUp() { + runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FILTER_MODE_VALUE_REMOVE); + } + + @Test + void removesAllAttributesExceptUUIDWhenAllAreFiltered() { + final Pattern attributeRegex = Pattern.compile("foo|bar|batz|uuid|path|filename"); + final Set expectedAttributes = Set.of("uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void removesFilteredAttributesExceptUUIDWhenOnlySomeAreFiltered() { + final Pattern attributeRegex = Pattern.compile("bar|uuid|path|filename"); + final Set expectedAttributes = Set.of("foo", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void removesNoAttributeWhenNoneOfTheAttributesAreFiltered() { + final Pattern attributeRegex = Pattern.compile("other"); + final Set expectedAttributes = Set.of("foo", "bar", "batz", "uuid", "path", "filename"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void supportsAttributeNamesWithWhitespace() { + final Map attributes = new HashMap<>(exampleAttributes); + attributes.put("fo\no", "some value"); + final Pattern attributeRegex = Pattern.compile("fo\no"); + final Set expectedAttributes = Set.of("foo", "bar", "batz", "uuid", "path", "filename"); + + runTestWith(attributes, attributeRegex, expectedAttributes); + } + } + + @Test + void supportsDefiningAttributeSetInFlowFileAttribute() { + final Map attributes = new HashMap<>(exampleAttributes); + attributes.put("lookup", "bar|batz"); + final String attributeRegex = "${lookup}"; // NiFi EL with reference to FlowFile attribute + final Set expectedAttributes = Set.of("bar", "batz", "uuid"); + + runTestWith(attributes, attributeRegex, expectedAttributes); + } + + private void runTestWith(Map attributes, Pattern regex, Set expectedAttributes) { + runTestWith(attributes, regex.pattern(), expectedAttributes); + } + + private void runTestWith(Map attributes, String regexPattern, Set expectedAttributes) { + runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_REGEX); + runner.setProperty(FilterAttribute.ATTRIBUTE_REGEX, regexPattern); + + final MockFlowFile input = runner.enqueue(exampleContent, attributes); + final Map inputAttributes = input.getAttributes(); + final Set notExpectedAttributes = new HashSet<>(inputAttributes.keySet()); + notExpectedAttributes.removeAll(expectedAttributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, 1); + final MockFlowFile result = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS).getFirst(); + result.assertContentEquals(exampleContent); + for (String expectedName : expectedAttributes) { + final String expectedValue = inputAttributes.get(expectedName); + + result.assertAttributeEquals(expectedName, expectedValue); + } + for (String notExpectedName : notExpectedAttributes) { + result.assertAttributeNotExists(notExpectedName); + } + } + } + + + @Test + void supportMultiThreadedExecution() { + runner.setThreadCount(5); + + final int flowFileCount = 10_000; + for (int i = 0; i < flowFileCount; i++) { + runner.enqueue(exampleContent, Map.of( + "foo", "" + i, + "bar", "" + i + )); + } + runner.setProperty(FilterAttribute.ATTRIBUTE_SET, "foo"); + + runner.run(flowFileCount); + runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, flowFileCount); + List resultFlowFiles = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS); + for (final MockFlowFile resultFlowFile : resultFlowFiles) { + resultFlowFile.assertAttributeExists("foo"); + resultFlowFile.assertAttributeNotExists("bar"); + } + final Set fooValues = resultFlowFiles.stream() + .map(flowFile -> flowFile.getAttribute("foo")) + .collect(Collectors.toUnmodifiableSet()); + assertEquals(flowFileCount, fooValues.size()); + } +} \ No newline at end of file