NIFI-399 addressed items in the ticket

This commit is contained in:
joewitt 2015-03-19 01:21:32 -04:00
parent ad18853b58
commit a6740a6e2c
4 changed files with 144 additions and 77 deletions

View File

@ -287,6 +287,28 @@ public class StandardValidators {
return createAttributeExpressionLanguageValidator(expectedResultType, true); return createAttributeExpressionLanguageValidator(expectedResultType, true);
} }
public static Validator createDataSizeBoundsValidator(final long minBytesInclusive, final long maxBytesInclusive) {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final ValidationResult vr = DATA_SIZE_VALIDATOR.validate(subject, input, context);
if(!vr.isValid()){
return vr;
}
final long dataSizeBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue();
if(dataSizeBytes < minBytesInclusive){
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be smaller than " + minBytesInclusive + " bytes").build();
}
if(dataSizeBytes > maxBytesInclusive){
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be larger than " + maxBytesInclusive + " bytes").build();
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
};
}
public static Validator createRegexMatchingValidator(final Pattern pattern) { public static Validator createRegexMatchingValidator(final Pattern pattern) {
return new Validator() { return new Validator() {
@Override @Override

View File

@ -51,4 +51,35 @@ public class TestStandardValidators {
vr = val.validate("TimePeriodTest", "1 sec", null); vr = val.validate("TimePeriodTest", "1 sec", null);
assertTrue(vr.isValid()); assertTrue(vr.isValid());
} }
@Test
public void testDataSizeBoundsValidator() {
Validator val = StandardValidators.createDataSizeBoundsValidator(100, 1000);
ValidationResult vr;
vr = val.validate("DataSizeBounds", "5 GB", null);
assertFalse(vr.isValid());
vr = val.validate("DataSizeBounds", "0 B", null);
assertFalse(vr.isValid());
vr = val.validate("DataSizeBounds", "99 B", null);
assertFalse(vr.isValid());
vr = val.validate("DataSizeBounds", "100 B", null);
assertTrue(vr.isValid());
vr = val.validate("DataSizeBounds", "999 B", null);
assertTrue(vr.isValid());
vr = val.validate("DataSizeBounds", "1000 B", null);
assertTrue(vr.isValid());
vr = val.validate("DataSizeBounds", "1001 B", null);
assertFalse(vr.isValid());
vr = val.validate("DataSizeBounds", "water", null);
assertFalse(vr.isValid());
}
} }

View File

@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -47,7 +48,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled;
@EventDriven @EventDriven
@SideEffectFree @SideEffectFree
@ -58,10 +59,14 @@ import org.apache.commons.lang3.StringUtils;
+ "The results of those Regular Expressions are assigned to FlowFile Attributes. " + "The results of those Regular Expressions are assigned to FlowFile Attributes. "
+ "Regular Expressions are entered by adding user-defined properties; " + "Regular Expressions are entered by adding user-defined properties; "
+ "the name of the property maps to the Attribute Name into which the result will be placed. " + "the name of the property maps to the Attribute Name into which the result will be placed. "
+ "The value of the property must be a valid Regular Expressions with exactly one capturing group. " + "The first capture group, if any found, will be placed into that attribute name."
+ "But all catpure groups, including the matching string sequence itself will also be "
+ "provided at that attribute name with an index value provided."
+ "The value of the property must be a valid Regular Expressions with one or more capturing groups. "
+ "If the Regular Expression matches more than once, only the first match will be used. " + "If the Regular Expression matches more than once, only the first match will be used. "
+ "If any provided Regular Expression matches, the FlowFile(s) will be routed to 'matched'. " + "If any provided Regular Expression matches, the FlowFile(s) will be routed to 'matched'. "
+ "If no provided Regular Expression matches, the FlowFile will be routed to 'unmatched' and no attributes will be applied to the FlowFile.") + "If no provided Regular Expression matches, the FlowFile will be routed to 'unmatched' "
+ "and no attributes will be applied to the FlowFile.")
public class ExtractText extends AbstractProcessor { public class ExtractText extends AbstractProcessor {
@ -78,9 +83,18 @@ public class ExtractText extends AbstractProcessor {
.description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. Files larger than the specified maximum will not be fully evaluated.") .description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. Files larger than the specified maximum will not be fully evaluated.")
.required(true) .required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE))
.defaultValue("1 MB") .defaultValue("1 MB")
.build(); .build();
public static final PropertyDescriptor MAX_CAPTURE_GROUP_LENGTH = new PropertyDescriptor.Builder()
.name("Maximum Capture Group Length")
.description("Specifies the maximum number of characters a given capture group value can have. Any characters beyond the max will be truncated.")
.required(false)
.defaultValue("1024")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor CANON_EQ = new PropertyDescriptor.Builder() public static final PropertyDescriptor CANON_EQ = new PropertyDescriptor.Builder()
.name("Enable Canonical Equivalence") .name("Enable Canonical Equivalence")
.description("Indicates that two characters match only when their full canonical decompositions match.") .description("Indicates that two characters match only when their full canonical decompositions match.")
@ -168,27 +182,29 @@ public class ExtractText extends AbstractProcessor {
private Set<Relationship> relationships; private Set<Relationship> relationships;
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private final AtomicReference<Map<String, Pattern>> compiledPattersMapRef = new AtomicReference<>();
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> rels = new HashSet<>();
relationships.add(REL_MATCH); rels.add(REL_MATCH);
relationships.add(REL_NO_MATCH); rels.add(REL_NO_MATCH);
this.relationships = Collections.unmodifiableSet(relationships); this.relationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> props = new ArrayList<>();
properties.add(CHARACTER_SET); props.add(CHARACTER_SET);
properties.add(MAX_BUFFER_SIZE); props.add(MAX_BUFFER_SIZE);
properties.add(CANON_EQ); props.add(MAX_CAPTURE_GROUP_LENGTH);
properties.add(CASE_INSENSITIVE); props.add(CANON_EQ);
properties.add(COMMENTS); props.add(CASE_INSENSITIVE);
properties.add(DOTALL); props.add(COMMENTS);
properties.add(LITERAL); props.add(DOTALL);
properties.add(MULTILINE); props.add(LITERAL);
properties.add(UNICODE_CASE); props.add(MULTILINE);
properties.add(UNICODE_CHARACTER_CLASS); props.add(UNICODE_CASE);
properties.add(UNIX_LINES); props.add(UNICODE_CHARACTER_CLASS);
this.properties = Collections.unmodifiableList(properties); props.add(UNIX_LINES);
this.properties = Collections.unmodifiableList(props);
} }
@Override @Override
@ -206,77 +222,80 @@ public class ExtractText extends AbstractProcessor {
return new PropertyDescriptor.Builder() return new PropertyDescriptor.Builder()
.name(propertyDescriptorName) .name(propertyDescriptorName)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.addValidator(StandardValidators.createRegexValidator(1, 1, true)) .addValidator(StandardValidators.createRegexValidator(1, 40, true))
.required(false) .required(false)
.dynamic(true) .dynamic(true)
.build(); .build();
} }
@Override @OnScheduled
public void onTrigger(final ProcessContext context, final ProcessSession session) { public final void onScheduled(final ProcessContext context) throws IOException {
final List<FlowFile> flowFileBatch = session.get(50); final Map<String, Pattern> compiledPatternsMap = new HashMap<>();
if (flowFileBatch.isEmpty()) {
return;
}
final ProcessorLog logger = getLogger();
// Compile the Regular Expressions
Map<String, Matcher> regexMap = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (!entry.getKey().isDynamic()) { if (!entry.getKey().isDynamic()) {
continue; continue;
} }
final int flags = getCompileFlags(context); final int flags = getCompileFlags(context);
final Matcher matcher = Pattern.compile(entry.getValue(), flags).matcher(""); final Pattern pattern = Pattern.compile(entry.getValue(), flags);
regexMap.put(entry.getKey().getName(), matcher); compiledPatternsMap.put(entry.getKey().getName(), pattern);
} }
compiledPattersMapRef.set(compiledPatternsMap);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ProcessorLog logger = getLogger();
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final int maxCaptureGroupLength = context.getProperty(MAX_CAPTURE_GROUP_LENGTH).asInteger();
final long maxBufferSizeL = Math.min(flowFile.getSize(), context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue());
final byte[] buffer = new byte[(int) maxBufferSizeL];
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
}
});
for (FlowFile flowFile : flowFileBatch) { final String contentString = new String(buffer, 0, (int) maxBufferSizeL, charset);
final Map<String, String> regexResults = new HashMap<>();
final Map<String, String> regexResults = new HashMap<>(); final Map<String, Pattern> patternMap = compiledPattersMapRef.get();
for (final Map.Entry<String, Pattern> entry : patternMap.entrySet()) {
final byte[] buffer = new byte[maxBufferSize]; final Matcher matcher = entry.getValue().matcher(contentString);
session.read(flowFile, new InputStreamCallback() { if (matcher.find()) {
@Override final String baseKey = entry.getKey();
public void process(InputStream in) throws IOException { for (int i = 0; i <= matcher.groupCount(); i++) {
StreamUtils.fillBuffer(in, buffer, false); final String key = new StringBuilder(baseKey).append(".").append(i).toString();
} String value = matcher.group(i);
}); if (value.length() > maxCaptureGroupLength) {
value = value.substring(0, maxCaptureGroupLength);
final int flowFileSize = Math.min((int) flowFile.getSize(), maxBufferSize); }
regexResults.put(key, value);
final String contentString = new String(buffer, 0, flowFileSize, charset); if (i == 1) {
regexResults.put(baseKey, value);
for (final Map.Entry<String, Matcher> entry : regexMap.entrySet()) {
final Matcher matcher = entry.getValue();
matcher.reset(contentString);
if (matcher.find()) {
final String group = matcher.group(1);
if (!StringUtils.isBlank(group)) {
regexResults.put(entry.getKey(), group);
} }
} }
} }
}
if (!regexResults.isEmpty()) { if (!regexResults.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, regexResults); flowFile = session.putAllAttributes(flowFile, regexResults);
session.getProvenanceReporter().modifyAttributes(flowFile); session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_MATCH); session.transfer(flowFile, REL_MATCH);
logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile}); logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile});
} else { } else {
session.transfer(flowFile, REL_NO_MATCH); session.transfer(flowFile, REL_NO_MATCH);
logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile}); logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile});
} }
} // end flowFileLoop
} }
int getCompileFlags(ProcessContext context) { int getCompileFlags(ProcessContext context) {

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.nifi.processors.standard.ExtractText;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -43,7 +42,7 @@ public class TestExtractText {
testRunner.setProperty("regex.result1", "(?s)(.*)"); testRunner.setProperty("regex.result1", "(?s)(.*)");
testRunner.setProperty("regex.result2", "(?s).*(bar1).*"); testRunner.setProperty("regex.result2", "(?s).*(bar1).*");
testRunner.setProperty("regex.result3", "(?s).*?(bar\\d).*"); // reluctant gets first testRunner.setProperty("regex.result3", "(?s).*?(bar\\d).*"); // reluctant gets first
testRunner.setProperty("regex.result4", "(?s).*?(?:bar\\d).*?(bar\\d).*"); // reluctant w/ repeated pattern gets second testRunner.setProperty("regex.result4", "(?s).*?(?:bar\\d).*?(bar\\d).*?(bar3).*"); // reluctant w/ repeated pattern gets second
testRunner.setProperty("regex.result5", "(?s).*(bar\\d).*"); // greedy gets last testRunner.setProperty("regex.result5", "(?s).*(bar\\d).*"); // greedy gets last
testRunner.setProperty("regex.result6", "(?s)^(.*)$"); testRunner.setProperty("regex.result6", "(?s)^(.*)$");
testRunner.setProperty("regex.result7", "(?s)(XXX)"); testRunner.setProperty("regex.result7", "(?s)(XXX)");
@ -57,6 +56,10 @@ public class TestExtractText {
out.assertAttributeEquals("regex.result2", "bar1"); out.assertAttributeEquals("regex.result2", "bar1");
out.assertAttributeEquals("regex.result3", "bar1"); out.assertAttributeEquals("regex.result3", "bar1");
out.assertAttributeEquals("regex.result4", "bar2"); out.assertAttributeEquals("regex.result4", "bar2");
out.assertAttributeEquals("regex.result4.0", SAMPLE_STRING);
out.assertAttributeEquals("regex.result4.1", "bar2");
out.assertAttributeEquals("regex.result4.2", "bar3");
out.assertAttributeNotExists("regex.result4.3");
out.assertAttributeEquals("regex.result5", "bar3"); out.assertAttributeEquals("regex.result5", "bar3");
out.assertAttributeEquals("regex.result6", SAMPLE_STRING); out.assertAttributeEquals("regex.result6", SAMPLE_STRING);
out.assertAttributeEquals("regex.result7", null); out.assertAttributeEquals("regex.result7", null);
@ -209,14 +212,6 @@ public class TestExtractText {
} }
@Test(expected = java.lang.AssertionError.class)
public void testTooManyCaptureGroups() throws UnsupportedEncodingException {
final TestRunner testRunner = TestRunners.newTestRunner(new ExtractText());
testRunner.setProperty("regex.result1", "(.)(.)");
testRunner.enqueue(SAMPLE_STRING.getBytes("UTF-8"));
testRunner.run();
}
@Test @Test
public void testMatchOutsideBuffer() throws Exception { public void testMatchOutsideBuffer() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new ExtractText()); final TestRunner testRunner = TestRunners.newTestRunner(new ExtractText());