diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetDelimitedFieldEvaluator.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetDelimitedFieldEvaluator.java index 230b9425f1..e23634aa6b 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetDelimitedFieldEvaluator.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetDelimitedFieldEvaluator.java @@ -157,7 +157,7 @@ public class GetDelimitedFieldEvaluator extends StringEvaluator { lastCharIsEscape = (c == escapeChar) && !lastCharIsEscape; } - if (curFieldIndex == desiredFieldIndex - 1) { + if (curFieldIndex == desiredFieldIndex) { // we have run out of characters and we are on the desired field. Return the characters from this field. return new StringQueryResult(fieldBuilder.toString()); } diff --git a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java index c42931ea96..751a0acce0 100644 --- a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java +++ b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java @@ -1163,11 +1163,21 @@ public class TestQuery { attributes.put("line", "Name, Age, Title"); // Test "simple" case - comma separated with no quoted or escaped text + verifyEquals("${line:getDelimitedField(1)}", attributes, "Name"); + verifyEquals("${line:getDelimitedField(1, ',')}", attributes, "Name"); + verifyEquals("${line:getDelimitedField(1, ',', '\"')}", attributes, "Name"); + verifyEquals("${line:getDelimitedField(1, ',', '\"', '\\\\')}", attributes, "Name"); + verifyEquals("${line:getDelimitedField(2)}", attributes, " Age"); verifyEquals("${line:getDelimitedField(2, ',')}", attributes, " Age"); verifyEquals("${line:getDelimitedField(2, ',', '\"')}", attributes, " Age"); verifyEquals("${line:getDelimitedField(2, ',', '\"', '\\\\')}", attributes, " Age"); + verifyEquals("${line:getDelimitedField(3)}", attributes, " Title"); + verifyEquals("${line:getDelimitedField(3, ',')}", attributes, " Title"); + verifyEquals("${line:getDelimitedField(3, ',', '\"')}", attributes, " Title"); + verifyEquals("${line:getDelimitedField(3, ',', '\"', '\\\\')}", attributes, " Title"); + // test with a space in column attributes.put("line", "First Name, Age, Title"); verifyEquals("${line:getDelimitedField(1)}", attributes, "First Name"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java index d00121f212..0ed43b4c6f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java @@ -52,6 +52,7 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; @@ -69,7 +70,7 @@ import org.apache.nifi.processors.standard.util.NLKBufferedReader; @SideEffectFree @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"attributes", "routing", "text", "regexp", "regex", "Regular Expression", "Expression Language"}) +@Tags({"attributes", "routing", "text", "regexp", "regex", "Regular Expression", "Expression Language", "csv", "filter", "logs", "delimited"}) @CapabilityDescription("Routes textual data based on a set of user-defined rules. Each line in an incoming FlowFile is compared against the values specified by user-defined Properties. " + "The mechanism by which the text is compared to these user-defined properties is defined by the 'Matching Strategy'. The data is then routed according to these rules, routing " + "each line of the text individually.") @@ -91,6 +92,7 @@ public class RouteText extends AbstractProcessor { private static final String equalsValue = "Equals"; private static final String matchesRegularExpressionValue = "Matches Regular Expression"; private static final String containsRegularExpressionValue = "Contains Regular Expression"; + private static final String satisfiesExpression = "Satisfies Expression"; public static final AllowableValue ROUTE_TO_MATCHING_PROPERTY_NAME = new AllowableValue(routePropertyNameValue, routePropertyNameValue, @@ -112,7 +114,10 @@ public class RouteText extends AbstractProcessor { "Match lines based on whether the line exactly matches the Regular Expression that is provided as the Property value"); public static final AllowableValue CONTAINS_REGULAR_EXPRESSION = new AllowableValue(containsRegularExpressionValue, containsRegularExpressionValue, "Match lines based on whether the line contains some text that matches the Regular Expression that is provided as the Property value"); - + public static final AllowableValue SATISFIES_EXPRESSION = new AllowableValue(satisfiesExpression, satisfiesExpression, + "Match lines based on whether or not the the text satisfies the given Expression Language expression. I.e., the line will match if the property value, evaluated as " + + "an Expression, returns true. The expression is able to reference FlowFile Attributes, as well as the variables 'line' (which is the text of the line to evaluate) and " + + "'lineNo' (which is the line number being evaluated. This will be 1 for the first line, 2 for the second and so on)."); public static final PropertyDescriptor ROUTE_STRATEGY = new PropertyDescriptor.Builder() .name("Routing Strategy") @@ -127,7 +132,7 @@ public class RouteText extends AbstractProcessor { .name("Matching Strategy") .description("Specifies how to evaluate each line of incoming text against the user-defined properties.") .required(true) - .allowableValues(STARTS_WITH, ENDS_WITH, CONTAINS, EQUALS, MATCHES_REGULAR_EXPRESSION, CONTAINS_REGULAR_EXPRESSION) + .allowableValues(SATISFIES_EXPRESSION, STARTS_WITH, ENDS_WITH, CONTAINS, EQUALS, MATCHES_REGULAR_EXPRESSION, CONTAINS_REGULAR_EXPRESSION) .dynamic(false) .build(); @@ -142,7 +147,8 @@ public class RouteText extends AbstractProcessor { static final PropertyDescriptor IGNORE_CASE = new PropertyDescriptor.Builder() .name("Ignore Case") - .description("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result.") + .description("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. " + + "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.") .expressionLanguageSupported(false) .allowableValues("true", "false") .defaultValue("false") @@ -298,6 +304,8 @@ public class RouteText extends AbstractProcessor { final String matchStrategy = validationContext.getProperty(MATCH_STRATEGY).getValue(); final boolean compileRegex = matchStrategy.equals(matchesRegularExpressionValue) || matchStrategy.equals(containsRegularExpressionValue); + final boolean requiresExpression = matchStrategy.equalsIgnoreCase(satisfiesExpression); + Validator validator = null; if (compileRegex) { validator = StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true); @@ -308,11 +316,24 @@ public class RouteText extends AbstractProcessor { if (descriptor.isDynamic()) { dynamicProperty = true; + final String propValue = validationContext.getProperty(descriptor).getValue(); + if (compileRegex) { - ValidationResult validationResult = validator.validate(descriptor.getName(), validationContext.getProperty(descriptor).getValue(), validationContext); + ValidationResult validationResult = validator.validate(descriptor.getName(), propValue, validationContext); if (validationResult != null) { results.add(validationResult); } + } else if (requiresExpression) { + try { + final ResultType resultType = validationContext.newExpressionLanguageCompiler().compile(propValue).getResultType(); + if (resultType != ResultType.BOOLEAN) { + results.add(new ValidationResult.Builder().valid(false).input(propValue).subject(descriptor.getName()) + .explanation("expression returns type of " + resultType.name() + " but is required to return a Boolean value").build()); + } + } catch (final IllegalArgumentException iae) { + results.add(new ValidationResult.Builder().valid(false).input(propValue).subject(descriptor.getName()) + .explanation("input is not a valid Expression Language expression").build()); + } } } } @@ -326,6 +347,7 @@ public class RouteText extends AbstractProcessor { } @Override + @SuppressWarnings({"unchecked", "rawtypes"}) public void onTrigger(final ProcessContext context, final ProcessSession session) { final FlowFile originalFlowFile = session.get(); if (originalFlowFile == null) { @@ -339,19 +361,28 @@ public class RouteText extends AbstractProcessor { final String matchStrategy = context.getProperty(MATCH_STRATEGY).getValue(); final boolean ignoreCase = context.getProperty(IGNORE_CASE).asBoolean(); - final Map propMap = this.propertyMap; - final Map propValueMap = new HashMap<>(propMap.size()); - final boolean compileRegex = matchStrategy.equals(matchesRegularExpressionValue) || matchStrategy.equals(containsRegularExpressionValue); + final boolean usePropValue = matchStrategy.equals(satisfiesExpression); - for (final Map.Entry entry : propMap.entrySet()) { - final String value = entry.getValue().evaluateAttributeExpressions(originalFlowFile).getValue(); + // Build up a Map of Relationship to object, where the object is the + // thing that each line is compared against + final Map propValueMap; + final Map propMap = this.propertyMap; + if (usePropValue) { + // If we are using an Expression Language we want a Map where the value is the + // PropertyValue, so we can just use the 'propMap' - no need to copy it. + propValueMap = (Map) propMap; + } else { + propValueMap = new HashMap<>(propMap.size()); + for (final Map.Entry entry : propMap.entrySet()) { + final String value = entry.getValue().evaluateAttributeExpressions(originalFlowFile).getValue(); - Pattern compiledRegex = null; - if (compileRegex) { - compiledRegex = ignoreCase ? Pattern.compile(value, Pattern.CASE_INSENSITIVE) : Pattern.compile(value); + Pattern compiledRegex = null; + if (compileRegex) { + compiledRegex = ignoreCase ? Pattern.compile(value, Pattern.CASE_INSENSITIVE) : Pattern.compile(value); + } + propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : value); } - propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : value); } final Map> flowFileMap = new HashMap<>(); @@ -363,6 +394,9 @@ public class RouteText extends AbstractProcessor { try (final Reader inReader = new InputStreamReader(in, charset); final NLKBufferedReader reader = new NLKBufferedReader(inReader)) { + final Map variables = new HashMap<>(2); + + int lineCount = 0; String line; while ((line = reader.readLine()) != null) { @@ -389,9 +423,12 @@ public class RouteText extends AbstractProcessor { matchLine = lineWithoutEndings; } + variables.put("line", line); + variables.put("lineNo", String.valueOf(++lineCount)); + int propertiesThatMatchedLine = 0; for (final Map.Entry entry : propValueMap.entrySet()) { - boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase); + boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase, originalFlowFile, variables); if (lineMatchesProperty) { propertiesThatMatchedLine++; } @@ -507,7 +544,8 @@ public class RouteText extends AbstractProcessor { } - protected static boolean lineMatches(final String line, final Object comparison, final String matchingStrategy, final boolean ignoreCase) { + protected static boolean lineMatches(final String line, final Object comparison, final String matchingStrategy, final boolean ignoreCase, + final FlowFile flowFile, final Map variables) { switch (matchingStrategy) { case startsWithValue: if (ignoreCase) { @@ -537,6 +575,10 @@ public class RouteText extends AbstractProcessor { return ((Pattern) comparison).matcher(line).matches(); case containsRegularExpressionValue: return ((Pattern) comparison).matcher(line).find(); + case satisfiesExpression: { + final PropertyValue booleanProperty = (PropertyValue) comparison; + return booleanProperty.evaluateAttributeExpressions(flowFile, variables).asBoolean(); + } } return false; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java index 7c40445666..71bd83b371 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java @@ -669,6 +669,30 @@ public class TestRouteText { outUnmatched.assertContentEquals("not match".getBytes("UTF-8")); } + @Test + public void testSatisfiesExpression() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new RouteText()); + runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.SATISFIES_EXPRESSION); + runner.setProperty("empty", "${incomplete expression"); + runner.assertNotValid(); + + runner.setProperty("empty", "${line:isEmpty()}"); + runner.setProperty("third-line", "${lineNo:equals(3)}"); + runner.setProperty("second-field-you", "${line:getDelimitedField(2):trim():equals('you')}"); + runner.enqueue("hello\n\ngood-bye, you\n \t\t\n"); + runner.run(); + + runner.assertTransferCount("empty", 1); + runner.assertTransferCount("third-line", 1); + runner.assertTransferCount("second-field-you", 1); + runner.assertTransferCount("unmatched", 1); + runner.assertTransferCount("original", 1); + + runner.getFlowFilesForRelationship("empty").get(0).assertContentEquals("\n \t\t\n"); + runner.getFlowFilesForRelationship("third-line").get(0).assertContentEquals("good-bye, you\n"); + runner.getFlowFilesForRelationship("second-field-you").get(0).assertContentEquals("good-bye, you\n"); + } + @Test public void testJson() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new RouteText());