NIFI-399 addressed items in the ticket

This commit is contained in:
joewitt 2015-03-19 01:21:32 -04:00
parent ad18853b58
commit a6740a6e2c
4 changed files with 144 additions and 77 deletions

View File

@ -287,6 +287,28 @@ public class StandardValidators {
return createAttributeExpressionLanguageValidator(expectedResultType, true);
}
public static Validator createDataSizeBoundsValidator(final long minBytesInclusive, final long maxBytesInclusive) {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final ValidationResult vr = DATA_SIZE_VALIDATOR.validate(subject, input, context);
if(!vr.isValid()){
return vr;
}
final long dataSizeBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue();
if(dataSizeBytes < minBytesInclusive){
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be smaller than " + minBytesInclusive + " bytes").build();
}
if(dataSizeBytes > maxBytesInclusive){
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be larger than " + maxBytesInclusive + " bytes").build();
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
};
}
public static Validator createRegexMatchingValidator(final Pattern pattern) {
return new Validator() {
@Override

View File

@ -51,4 +51,35 @@ public class TestStandardValidators {
vr = val.validate("TimePeriodTest", "1 sec", null);
assertTrue(vr.isValid());
}
@Test
public void testDataSizeBoundsValidator() {
Validator val = StandardValidators.createDataSizeBoundsValidator(100, 1000);
ValidationResult vr;
vr = val.validate("DataSizeBounds", "5 GB", null);
assertFalse(vr.isValid());
vr = val.validate("DataSizeBounds", "0 B", null);
assertFalse(vr.isValid());
vr = val.validate("DataSizeBounds", "99 B", null);
assertFalse(vr.isValid());
vr = val.validate("DataSizeBounds", "100 B", null);
assertTrue(vr.isValid());
vr = val.validate("DataSizeBounds", "999 B", null);
assertTrue(vr.isValid());
vr = val.validate("DataSizeBounds", "1000 B", null);
assertTrue(vr.isValid());
vr = val.validate("DataSizeBounds", "1001 B", null);
assertFalse(vr.isValid());
vr = val.validate("DataSizeBounds", "water", null);
assertFalse(vr.isValid());
}
}

View File

@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -47,7 +48,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@EventDriven
@SideEffectFree
@ -58,10 +59,14 @@ import org.apache.commons.lang3.StringUtils;
+ "The results of those Regular Expressions are assigned to FlowFile Attributes. "
+ "Regular Expressions are entered by adding user-defined properties; "
+ "the name of the property maps to the Attribute Name into which the result will be placed. "
+ "The value of the property must be a valid Regular Expressions with exactly one capturing group. "
+ "The first capture group, if any found, will be placed into that attribute name."
+ "But all catpure groups, including the matching string sequence itself will also be "
+ "provided at that attribute name with an index value provided."
+ "The value of the property must be a valid Regular Expressions with one or more capturing groups. "
+ "If the Regular Expression matches more than once, only the first match will be used. "
+ "If any provided Regular Expression matches, the FlowFile(s) will be routed to 'matched'. "
+ "If no provided Regular Expression matches, the FlowFile will be routed to 'unmatched' and no attributes will be applied to the FlowFile.")
+ "If no provided Regular Expression matches, the FlowFile will be routed to 'unmatched' "
+ "and no attributes will be applied to the FlowFile.")
public class ExtractText extends AbstractProcessor {
@ -78,9 +83,18 @@ public class ExtractText extends AbstractProcessor {
.description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. Files larger than the specified maximum will not be fully evaluated.")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE))
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor MAX_CAPTURE_GROUP_LENGTH = new PropertyDescriptor.Builder()
.name("Maximum Capture Group Length")
.description("Specifies the maximum number of characters a given capture group value can have. Any characters beyond the max will be truncated.")
.required(false)
.defaultValue("1024")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor CANON_EQ = new PropertyDescriptor.Builder()
.name("Enable Canonical Equivalence")
.description("Indicates that two characters match only when their full canonical decompositions match.")
@ -168,27 +182,29 @@ public class ExtractText extends AbstractProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
private final AtomicReference<Map<String, Pattern>> compiledPattersMapRef = new AtomicReference<>();
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_MATCH);
relationships.add(REL_NO_MATCH);
this.relationships = Collections.unmodifiableSet(relationships);
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_MATCH);
rels.add(REL_NO_MATCH);
this.relationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CHARACTER_SET);
properties.add(MAX_BUFFER_SIZE);
properties.add(CANON_EQ);
properties.add(CASE_INSENSITIVE);
properties.add(COMMENTS);
properties.add(DOTALL);
properties.add(LITERAL);
properties.add(MULTILINE);
properties.add(UNICODE_CASE);
properties.add(UNICODE_CHARACTER_CLASS);
properties.add(UNIX_LINES);
this.properties = Collections.unmodifiableList(properties);
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CHARACTER_SET);
props.add(MAX_BUFFER_SIZE);
props.add(MAX_CAPTURE_GROUP_LENGTH);
props.add(CANON_EQ);
props.add(CASE_INSENSITIVE);
props.add(COMMENTS);
props.add(DOTALL);
props.add(LITERAL);
props.add(MULTILINE);
props.add(UNICODE_CASE);
props.add(UNICODE_CHARACTER_CLASS);
props.add(UNIX_LINES);
this.properties = Collections.unmodifiableList(props);
}
@Override
@ -206,77 +222,80 @@ public class ExtractText extends AbstractProcessor {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.createRegexValidator(1, 1, true))
.addValidator(StandardValidators.createRegexValidator(1, 40, true))
.required(false)
.dynamic(true)
.build();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final List<FlowFile> flowFileBatch = session.get(50);
if (flowFileBatch.isEmpty()) {
return;
}
final ProcessorLog logger = getLogger();
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
final Map<String, Pattern> compiledPatternsMap = new HashMap<>();
// Compile the Regular Expressions
Map<String, Matcher> regexMap = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (!entry.getKey().isDynamic()) {
continue;
}
final int flags = getCompileFlags(context);
final Matcher matcher = Pattern.compile(entry.getValue(), flags).matcher("");
regexMap.put(entry.getKey().getName(), matcher);
final Pattern pattern = Pattern.compile(entry.getValue(), flags);
compiledPatternsMap.put(entry.getKey().getName(), pattern);
}
compiledPattersMapRef.set(compiledPatternsMap);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ProcessorLog logger = getLogger();
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final int maxCaptureGroupLength = context.getProperty(MAX_CAPTURE_GROUP_LENGTH).asInteger();
final long maxBufferSizeL = Math.min(flowFile.getSize(), context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue());
final byte[] buffer = new byte[(int) maxBufferSizeL];
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
}
});
for (FlowFile flowFile : flowFileBatch) {
final String contentString = new String(buffer, 0, (int) maxBufferSizeL, charset);
final Map<String, String> regexResults = new HashMap<>();
final Map<String, String> regexResults = new HashMap<>();
final Map<String, Pattern> patternMap = compiledPattersMapRef.get();
for (final Map.Entry<String, Pattern> entry : patternMap.entrySet()) {
final byte[] buffer = new byte[maxBufferSize];
final Matcher matcher = entry.getValue().matcher(contentString);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
}
});
final int flowFileSize = Math.min((int) flowFile.getSize(), maxBufferSize);
final String contentString = new String(buffer, 0, flowFileSize, charset);
for (final Map.Entry<String, Matcher> entry : regexMap.entrySet()) {
final Matcher matcher = entry.getValue();
matcher.reset(contentString);
if (matcher.find()) {
final String group = matcher.group(1);
if (!StringUtils.isBlank(group)) {
regexResults.put(entry.getKey(), group);
if (matcher.find()) {
final String baseKey = entry.getKey();
for (int i = 0; i <= matcher.groupCount(); i++) {
final String key = new StringBuilder(baseKey).append(".").append(i).toString();
String value = matcher.group(i);
if (value.length() > maxCaptureGroupLength) {
value = value.substring(0, maxCaptureGroupLength);
}
regexResults.put(key, value);
if (i == 1) {
regexResults.put(baseKey, value);
}
}
}
}
if (!regexResults.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, regexResults);
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_MATCH);
logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile});
} else {
session.transfer(flowFile, REL_NO_MATCH);
logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile});
}
if (!regexResults.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, regexResults);
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_MATCH);
logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile});
} else {
session.transfer(flowFile, REL_NO_MATCH);
logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile});
}
} // end flowFileLoop
}
int getCompileFlags(ProcessContext context) {

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processors.standard.ExtractText;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -43,7 +42,7 @@ public class TestExtractText {
testRunner.setProperty("regex.result1", "(?s)(.*)");
testRunner.setProperty("regex.result2", "(?s).*(bar1).*");
testRunner.setProperty("regex.result3", "(?s).*?(bar\\d).*"); // reluctant gets first
testRunner.setProperty("regex.result4", "(?s).*?(?:bar\\d).*?(bar\\d).*"); // reluctant w/ repeated pattern gets second
testRunner.setProperty("regex.result4", "(?s).*?(?:bar\\d).*?(bar\\d).*?(bar3).*"); // reluctant w/ repeated pattern gets second
testRunner.setProperty("regex.result5", "(?s).*(bar\\d).*"); // greedy gets last
testRunner.setProperty("regex.result6", "(?s)^(.*)$");
testRunner.setProperty("regex.result7", "(?s)(XXX)");
@ -57,6 +56,10 @@ public class TestExtractText {
out.assertAttributeEquals("regex.result2", "bar1");
out.assertAttributeEquals("regex.result3", "bar1");
out.assertAttributeEquals("regex.result4", "bar2");
out.assertAttributeEquals("regex.result4.0", SAMPLE_STRING);
out.assertAttributeEquals("regex.result4.1", "bar2");
out.assertAttributeEquals("regex.result4.2", "bar3");
out.assertAttributeNotExists("regex.result4.3");
out.assertAttributeEquals("regex.result5", "bar3");
out.assertAttributeEquals("regex.result6", SAMPLE_STRING);
out.assertAttributeEquals("regex.result7", null);
@ -209,14 +212,6 @@ public class TestExtractText {
}
@Test(expected = java.lang.AssertionError.class)
public void testTooManyCaptureGroups() throws UnsupportedEncodingException {
final TestRunner testRunner = TestRunners.newTestRunner(new ExtractText());
testRunner.setProperty("regex.result1", "(.)(.)");
testRunner.enqueue(SAMPLE_STRING.getBytes("UTF-8"));
testRunner.run();
}
@Test
public void testMatchOutsideBuffer() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new ExtractText());