NIFI-1256: Added a 'Satisfies Expression' option for Matching Strategy of RouteText

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Mark Payne 2016-02-23 12:50:31 -05:00 committed by jpercivall
parent c7e24c7569
commit 28c2a3e5a6
4 changed files with 93 additions and 17 deletions

View File

@ -157,7 +157,7 @@ public class GetDelimitedFieldEvaluator extends StringEvaluator {
lastCharIsEscape = (c == escapeChar) && !lastCharIsEscape; lastCharIsEscape = (c == escapeChar) && !lastCharIsEscape;
} }
if (curFieldIndex == desiredFieldIndex - 1) { if (curFieldIndex == desiredFieldIndex) {
// we have run out of characters and we are on the desired field. Return the characters from this field. // we have run out of characters and we are on the desired field. Return the characters from this field.
return new StringQueryResult(fieldBuilder.toString()); return new StringQueryResult(fieldBuilder.toString());
} }

View File

@ -1163,11 +1163,21 @@ public class TestQuery {
attributes.put("line", "Name, Age, Title"); attributes.put("line", "Name, Age, Title");
// Test "simple" case - comma separated with no quoted or escaped text // Test "simple" case - comma separated with no quoted or escaped text
verifyEquals("${line:getDelimitedField(1)}", attributes, "Name");
verifyEquals("${line:getDelimitedField(1, ',')}", attributes, "Name");
verifyEquals("${line:getDelimitedField(1, ',', '\"')}", attributes, "Name");
verifyEquals("${line:getDelimitedField(1, ',', '\"', '\\\\')}", attributes, "Name");
verifyEquals("${line:getDelimitedField(2)}", attributes, " Age"); verifyEquals("${line:getDelimitedField(2)}", attributes, " Age");
verifyEquals("${line:getDelimitedField(2, ',')}", attributes, " Age"); verifyEquals("${line:getDelimitedField(2, ',')}", attributes, " Age");
verifyEquals("${line:getDelimitedField(2, ',', '\"')}", attributes, " Age"); verifyEquals("${line:getDelimitedField(2, ',', '\"')}", attributes, " Age");
verifyEquals("${line:getDelimitedField(2, ',', '\"', '\\\\')}", attributes, " Age"); verifyEquals("${line:getDelimitedField(2, ',', '\"', '\\\\')}", attributes, " Age");
verifyEquals("${line:getDelimitedField(3)}", attributes, " Title");
verifyEquals("${line:getDelimitedField(3, ',')}", attributes, " Title");
verifyEquals("${line:getDelimitedField(3, ',', '\"')}", attributes, " Title");
verifyEquals("${line:getDelimitedField(3, ',', '\"', '\\\\')}", attributes, " Title");
// test with a space in column // test with a space in column
attributes.put("line", "First Name, Age, Title"); attributes.put("line", "First Name, Age, Title");
verifyEquals("${line:getDelimitedField(1)}", attributes, "First Name"); verifyEquals("${line:getDelimitedField(1)}", attributes, "First Name");

View File

@ -52,6 +52,7 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
@ -69,7 +70,7 @@ import org.apache.nifi.processors.standard.util.NLKBufferedReader;
@SideEffectFree @SideEffectFree
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"attributes", "routing", "text", "regexp", "regex", "Regular Expression", "Expression Language"}) @Tags({"attributes", "routing", "text", "regexp", "regex", "Regular Expression", "Expression Language", "csv", "filter", "logs", "delimited"})
@CapabilityDescription("Routes textual data based on a set of user-defined rules. Each line in an incoming FlowFile is compared against the values specified by user-defined Properties. " @CapabilityDescription("Routes textual data based on a set of user-defined rules. Each line in an incoming FlowFile is compared against the values specified by user-defined Properties. "
+ "The mechanism by which the text is compared to these user-defined properties is defined by the 'Matching Strategy'. The data is then routed according to these rules, routing " + "The mechanism by which the text is compared to these user-defined properties is defined by the 'Matching Strategy'. The data is then routed according to these rules, routing "
+ "each line of the text individually.") + "each line of the text individually.")
@ -91,6 +92,7 @@ public class RouteText extends AbstractProcessor {
private static final String equalsValue = "Equals"; private static final String equalsValue = "Equals";
private static final String matchesRegularExpressionValue = "Matches Regular Expression"; private static final String matchesRegularExpressionValue = "Matches Regular Expression";
private static final String containsRegularExpressionValue = "Contains Regular Expression"; private static final String containsRegularExpressionValue = "Contains Regular Expression";
private static final String satisfiesExpression = "Satisfies Expression";
public static final AllowableValue ROUTE_TO_MATCHING_PROPERTY_NAME = new AllowableValue(routePropertyNameValue, routePropertyNameValue, public static final AllowableValue ROUTE_TO_MATCHING_PROPERTY_NAME = new AllowableValue(routePropertyNameValue, routePropertyNameValue,
@ -112,7 +114,10 @@ public class RouteText extends AbstractProcessor {
"Match lines based on whether the line exactly matches the Regular Expression that is provided as the Property value"); "Match lines based on whether the line exactly matches the Regular Expression that is provided as the Property value");
public static final AllowableValue CONTAINS_REGULAR_EXPRESSION = new AllowableValue(containsRegularExpressionValue, containsRegularExpressionValue, public static final AllowableValue CONTAINS_REGULAR_EXPRESSION = new AllowableValue(containsRegularExpressionValue, containsRegularExpressionValue,
"Match lines based on whether the line contains some text that matches the Regular Expression that is provided as the Property value"); "Match lines based on whether the line contains some text that matches the Regular Expression that is provided as the Property value");
public static final AllowableValue SATISFIES_EXPRESSION = new AllowableValue(satisfiesExpression, satisfiesExpression,
"Match lines based on whether or not the the text satisfies the given Expression Language expression. I.e., the line will match if the property value, evaluated as "
+ "an Expression, returns true. The expression is able to reference FlowFile Attributes, as well as the variables 'line' (which is the text of the line to evaluate) and "
+ "'lineNo' (which is the line number being evaluated. This will be 1 for the first line, 2 for the second and so on).");
public static final PropertyDescriptor ROUTE_STRATEGY = new PropertyDescriptor.Builder() public static final PropertyDescriptor ROUTE_STRATEGY = new PropertyDescriptor.Builder()
.name("Routing Strategy") .name("Routing Strategy")
@ -127,7 +132,7 @@ public class RouteText extends AbstractProcessor {
.name("Matching Strategy") .name("Matching Strategy")
.description("Specifies how to evaluate each line of incoming text against the user-defined properties.") .description("Specifies how to evaluate each line of incoming text against the user-defined properties.")
.required(true) .required(true)
.allowableValues(STARTS_WITH, ENDS_WITH, CONTAINS, EQUALS, MATCHES_REGULAR_EXPRESSION, CONTAINS_REGULAR_EXPRESSION) .allowableValues(SATISFIES_EXPRESSION, STARTS_WITH, ENDS_WITH, CONTAINS, EQUALS, MATCHES_REGULAR_EXPRESSION, CONTAINS_REGULAR_EXPRESSION)
.dynamic(false) .dynamic(false)
.build(); .build();
@ -142,7 +147,8 @@ public class RouteText extends AbstractProcessor {
static final PropertyDescriptor IGNORE_CASE = new PropertyDescriptor.Builder() static final PropertyDescriptor IGNORE_CASE = new PropertyDescriptor.Builder()
.name("Ignore Case") .name("Ignore Case")
.description("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result.") .description("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. "
+ "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.")
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("false") .defaultValue("false")
@ -298,6 +304,8 @@ public class RouteText extends AbstractProcessor {
final String matchStrategy = validationContext.getProperty(MATCH_STRATEGY).getValue(); final String matchStrategy = validationContext.getProperty(MATCH_STRATEGY).getValue();
final boolean compileRegex = matchStrategy.equals(matchesRegularExpressionValue) || matchStrategy.equals(containsRegularExpressionValue); final boolean compileRegex = matchStrategy.equals(matchesRegularExpressionValue) || matchStrategy.equals(containsRegularExpressionValue);
final boolean requiresExpression = matchStrategy.equalsIgnoreCase(satisfiesExpression);
Validator validator = null; Validator validator = null;
if (compileRegex) { if (compileRegex) {
validator = StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true); validator = StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true);
@ -308,11 +316,24 @@ public class RouteText extends AbstractProcessor {
if (descriptor.isDynamic()) { if (descriptor.isDynamic()) {
dynamicProperty = true; dynamicProperty = true;
final String propValue = validationContext.getProperty(descriptor).getValue();
if (compileRegex) { if (compileRegex) {
ValidationResult validationResult = validator.validate(descriptor.getName(), validationContext.getProperty(descriptor).getValue(), validationContext); ValidationResult validationResult = validator.validate(descriptor.getName(), propValue, validationContext);
if (validationResult != null) { if (validationResult != null) {
results.add(validationResult); results.add(validationResult);
} }
} else if (requiresExpression) {
try {
final ResultType resultType = validationContext.newExpressionLanguageCompiler().compile(propValue).getResultType();
if (resultType != ResultType.BOOLEAN) {
results.add(new ValidationResult.Builder().valid(false).input(propValue).subject(descriptor.getName())
.explanation("expression returns type of " + resultType.name() + " but is required to return a Boolean value").build());
}
} catch (final IllegalArgumentException iae) {
results.add(new ValidationResult.Builder().valid(false).input(propValue).subject(descriptor.getName())
.explanation("input is not a valid Expression Language expression").build());
}
} }
} }
} }
@ -326,6 +347,7 @@ public class RouteText extends AbstractProcessor {
} }
@Override @Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void onTrigger(final ProcessContext context, final ProcessSession session) { public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile originalFlowFile = session.get(); final FlowFile originalFlowFile = session.get();
if (originalFlowFile == null) { if (originalFlowFile == null) {
@ -339,19 +361,28 @@ public class RouteText extends AbstractProcessor {
final String matchStrategy = context.getProperty(MATCH_STRATEGY).getValue(); final String matchStrategy = context.getProperty(MATCH_STRATEGY).getValue();
final boolean ignoreCase = context.getProperty(IGNORE_CASE).asBoolean(); final boolean ignoreCase = context.getProperty(IGNORE_CASE).asBoolean();
final Map<Relationship, PropertyValue> propMap = this.propertyMap;
final Map<Relationship, Object> propValueMap = new HashMap<>(propMap.size());
final boolean compileRegex = matchStrategy.equals(matchesRegularExpressionValue) || matchStrategy.equals(containsRegularExpressionValue); final boolean compileRegex = matchStrategy.equals(matchesRegularExpressionValue) || matchStrategy.equals(containsRegularExpressionValue);
final boolean usePropValue = matchStrategy.equals(satisfiesExpression);
for (final Map.Entry<Relationship, PropertyValue> entry : propMap.entrySet()) { // Build up a Map of Relationship to object, where the object is the
final String value = entry.getValue().evaluateAttributeExpressions(originalFlowFile).getValue(); // thing that each line is compared against
final Map<Relationship, Object> propValueMap;
final Map<Relationship, PropertyValue> propMap = this.propertyMap;
if (usePropValue) {
// If we are using an Expression Language we want a Map where the value is the
// PropertyValue, so we can just use the 'propMap' - no need to copy it.
propValueMap = (Map) propMap;
} else {
propValueMap = new HashMap<>(propMap.size());
for (final Map.Entry<Relationship, PropertyValue> entry : propMap.entrySet()) {
final String value = entry.getValue().evaluateAttributeExpressions(originalFlowFile).getValue();
Pattern compiledRegex = null; Pattern compiledRegex = null;
if (compileRegex) { if (compileRegex) {
compiledRegex = ignoreCase ? Pattern.compile(value, Pattern.CASE_INSENSITIVE) : Pattern.compile(value); compiledRegex = ignoreCase ? Pattern.compile(value, Pattern.CASE_INSENSITIVE) : Pattern.compile(value);
}
propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : value);
} }
propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : value);
} }
final Map<Relationship, Map<Group, FlowFile>> flowFileMap = new HashMap<>(); final Map<Relationship, Map<Group, FlowFile>> flowFileMap = new HashMap<>();
@ -363,6 +394,9 @@ public class RouteText extends AbstractProcessor {
try (final Reader inReader = new InputStreamReader(in, charset); try (final Reader inReader = new InputStreamReader(in, charset);
final NLKBufferedReader reader = new NLKBufferedReader(inReader)) { final NLKBufferedReader reader = new NLKBufferedReader(inReader)) {
final Map<String, String> variables = new HashMap<>(2);
int lineCount = 0;
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
@ -389,9 +423,12 @@ public class RouteText extends AbstractProcessor {
matchLine = lineWithoutEndings; matchLine = lineWithoutEndings;
} }
variables.put("line", line);
variables.put("lineNo", String.valueOf(++lineCount));
int propertiesThatMatchedLine = 0; int propertiesThatMatchedLine = 0;
for (final Map.Entry<Relationship, Object> entry : propValueMap.entrySet()) { for (final Map.Entry<Relationship, Object> entry : propValueMap.entrySet()) {
boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase); boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase, originalFlowFile, variables);
if (lineMatchesProperty) { if (lineMatchesProperty) {
propertiesThatMatchedLine++; propertiesThatMatchedLine++;
} }
@ -507,7 +544,8 @@ public class RouteText extends AbstractProcessor {
} }
protected static boolean lineMatches(final String line, final Object comparison, final String matchingStrategy, final boolean ignoreCase) { protected static boolean lineMatches(final String line, final Object comparison, final String matchingStrategy, final boolean ignoreCase,
final FlowFile flowFile, final Map<String, String> variables) {
switch (matchingStrategy) { switch (matchingStrategy) {
case startsWithValue: case startsWithValue:
if (ignoreCase) { if (ignoreCase) {
@ -537,6 +575,10 @@ public class RouteText extends AbstractProcessor {
return ((Pattern) comparison).matcher(line).matches(); return ((Pattern) comparison).matcher(line).matches();
case containsRegularExpressionValue: case containsRegularExpressionValue:
return ((Pattern) comparison).matcher(line).find(); return ((Pattern) comparison).matcher(line).find();
case satisfiesExpression: {
final PropertyValue booleanProperty = (PropertyValue) comparison;
return booleanProperty.evaluateAttributeExpressions(flowFile, variables).asBoolean();
}
} }
return false; return false;

View File

@ -669,6 +669,30 @@ public class TestRouteText {
outUnmatched.assertContentEquals("not match".getBytes("UTF-8")); outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
} }
@Test
public void testSatisfiesExpression() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.SATISFIES_EXPRESSION);
runner.setProperty("empty", "${incomplete expression");
runner.assertNotValid();
runner.setProperty("empty", "${line:isEmpty()}");
runner.setProperty("third-line", "${lineNo:equals(3)}");
runner.setProperty("second-field-you", "${line:getDelimitedField(2):trim():equals('you')}");
runner.enqueue("hello\n\ngood-bye, you\n \t\t\n");
runner.run();
runner.assertTransferCount("empty", 1);
runner.assertTransferCount("third-line", 1);
runner.assertTransferCount("second-field-you", 1);
runner.assertTransferCount("unmatched", 1);
runner.assertTransferCount("original", 1);
runner.getFlowFilesForRelationship("empty").get(0).assertContentEquals("\n \t\t\n");
runner.getFlowFilesForRelationship("third-line").get(0).assertContentEquals("good-bye, you\n");
runner.getFlowFilesForRelationship("second-field-you").get(0).assertContentEquals("good-bye, you\n");
}
@Test @Test
public void testJson() throws IOException { public void testJson() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new RouteText()); final TestRunner runner = TestRunners.newTestRunner(new RouteText());