mirror of https://github.com/apache/nifi.git
NIFI-399 addressed items in the ticket
This commit is contained in:
parent
ad18853b58
commit
a6740a6e2c
|
@ -287,6 +287,28 @@ public class StandardValidators {
|
|||
return createAttributeExpressionLanguageValidator(expectedResultType, true);
|
||||
}
|
||||
|
||||
public static Validator createDataSizeBoundsValidator(final long minBytesInclusive, final long maxBytesInclusive) {
|
||||
return new Validator() {
|
||||
|
||||
@Override
|
||||
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
||||
final ValidationResult vr = DATA_SIZE_VALIDATOR.validate(subject, input, context);
|
||||
if(!vr.isValid()){
|
||||
return vr;
|
||||
}
|
||||
final long dataSizeBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue();
|
||||
if(dataSizeBytes < minBytesInclusive){
|
||||
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be smaller than " + minBytesInclusive + " bytes").build();
|
||||
}
|
||||
if(dataSizeBytes > maxBytesInclusive){
|
||||
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be larger than " + maxBytesInclusive + " bytes").build();
|
||||
}
|
||||
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
public static Validator createRegexMatchingValidator(final Pattern pattern) {
|
||||
return new Validator() {
|
||||
@Override
|
||||
|
|
|
@ -51,4 +51,35 @@ public class TestStandardValidators {
|
|||
vr = val.validate("TimePeriodTest", "1 sec", null);
|
||||
assertTrue(vr.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDataSizeBoundsValidator() {
|
||||
Validator val = StandardValidators.createDataSizeBoundsValidator(100, 1000);
|
||||
ValidationResult vr;
|
||||
|
||||
vr = val.validate("DataSizeBounds", "5 GB", null);
|
||||
assertFalse(vr.isValid());
|
||||
|
||||
vr = val.validate("DataSizeBounds", "0 B", null);
|
||||
assertFalse(vr.isValid());
|
||||
|
||||
vr = val.validate("DataSizeBounds", "99 B", null);
|
||||
assertFalse(vr.isValid());
|
||||
|
||||
vr = val.validate("DataSizeBounds", "100 B", null);
|
||||
assertTrue(vr.isValid());
|
||||
|
||||
vr = val.validate("DataSizeBounds", "999 B", null);
|
||||
assertTrue(vr.isValid());
|
||||
|
||||
vr = val.validate("DataSizeBounds", "1000 B", null);
|
||||
assertTrue(vr.isValid());
|
||||
|
||||
vr = val.validate("DataSizeBounds", "1001 B", null);
|
||||
assertFalse(vr.isValid());
|
||||
|
||||
vr = val.validate("DataSizeBounds", "water", null);
|
||||
assertFalse(vr.isValid());
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
@ -47,7 +48,7 @@ import org.apache.nifi.annotation.documentation.Tags;
|
|||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
|
@ -58,10 +59,14 @@ import org.apache.commons.lang3.StringUtils;
|
|||
+ "The results of those Regular Expressions are assigned to FlowFile Attributes. "
|
||||
+ "Regular Expressions are entered by adding user-defined properties; "
|
||||
+ "the name of the property maps to the Attribute Name into which the result will be placed. "
|
||||
+ "The value of the property must be a valid Regular Expressions with exactly one capturing group. "
|
||||
+ "The first capture group, if any found, will be placed into that attribute name."
|
||||
+ "But all catpure groups, including the matching string sequence itself will also be "
|
||||
+ "provided at that attribute name with an index value provided."
|
||||
+ "The value of the property must be a valid Regular Expressions with one or more capturing groups. "
|
||||
+ "If the Regular Expression matches more than once, only the first match will be used. "
|
||||
+ "If any provided Regular Expression matches, the FlowFile(s) will be routed to 'matched'. "
|
||||
+ "If no provided Regular Expression matches, the FlowFile will be routed to 'unmatched' and no attributes will be applied to the FlowFile.")
|
||||
+ "If no provided Regular Expression matches, the FlowFile will be routed to 'unmatched' "
|
||||
+ "and no attributes will be applied to the FlowFile.")
|
||||
|
||||
public class ExtractText extends AbstractProcessor {
|
||||
|
||||
|
@ -78,9 +83,18 @@ public class ExtractText extends AbstractProcessor {
|
|||
.description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. Files larger than the specified maximum will not be fully evaluated.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE))
|
||||
.defaultValue("1 MB")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_CAPTURE_GROUP_LENGTH = new PropertyDescriptor.Builder()
|
||||
.name("Maximum Capture Group Length")
|
||||
.description("Specifies the maximum number of characters a given capture group value can have. Any characters beyond the max will be truncated.")
|
||||
.required(false)
|
||||
.defaultValue("1024")
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CANON_EQ = new PropertyDescriptor.Builder()
|
||||
.name("Enable Canonical Equivalence")
|
||||
.description("Indicates that two characters match only when their full canonical decompositions match.")
|
||||
|
@ -168,27 +182,29 @@ public class ExtractText extends AbstractProcessor {
|
|||
|
||||
private Set<Relationship> relationships;
|
||||
private List<PropertyDescriptor> properties;
|
||||
private final AtomicReference<Map<String, Pattern>> compiledPattersMapRef = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_MATCH);
|
||||
relationships.add(REL_NO_MATCH);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
final Set<Relationship> rels = new HashSet<>();
|
||||
rels.add(REL_MATCH);
|
||||
rels.add(REL_NO_MATCH);
|
||||
this.relationships = Collections.unmodifiableSet(rels);
|
||||
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(CHARACTER_SET);
|
||||
properties.add(MAX_BUFFER_SIZE);
|
||||
properties.add(CANON_EQ);
|
||||
properties.add(CASE_INSENSITIVE);
|
||||
properties.add(COMMENTS);
|
||||
properties.add(DOTALL);
|
||||
properties.add(LITERAL);
|
||||
properties.add(MULTILINE);
|
||||
properties.add(UNICODE_CASE);
|
||||
properties.add(UNICODE_CHARACTER_CLASS);
|
||||
properties.add(UNIX_LINES);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(CHARACTER_SET);
|
||||
props.add(MAX_BUFFER_SIZE);
|
||||
props.add(MAX_CAPTURE_GROUP_LENGTH);
|
||||
props.add(CANON_EQ);
|
||||
props.add(CASE_INSENSITIVE);
|
||||
props.add(COMMENTS);
|
||||
props.add(DOTALL);
|
||||
props.add(LITERAL);
|
||||
props.add(MULTILINE);
|
||||
props.add(UNICODE_CASE);
|
||||
props.add(UNICODE_CHARACTER_CLASS);
|
||||
props.add(UNIX_LINES);
|
||||
this.properties = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -206,77 +222,80 @@ public class ExtractText extends AbstractProcessor {
|
|||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.expressionLanguageSupported(false)
|
||||
.addValidator(StandardValidators.createRegexValidator(1, 1, true))
|
||||
.addValidator(StandardValidators.createRegexValidator(1, 40, true))
|
||||
.required(false)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
final List<FlowFile> flowFileBatch = session.get(50);
|
||||
if (flowFileBatch.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
final ProcessorLog logger = getLogger();
|
||||
@OnScheduled
|
||||
public final void onScheduled(final ProcessContext context) throws IOException {
|
||||
final Map<String, Pattern> compiledPatternsMap = new HashMap<>();
|
||||
|
||||
// Compile the Regular Expressions
|
||||
Map<String, Matcher> regexMap = new HashMap<>();
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
||||
if (!entry.getKey().isDynamic()) {
|
||||
continue;
|
||||
}
|
||||
final int flags = getCompileFlags(context);
|
||||
final Matcher matcher = Pattern.compile(entry.getValue(), flags).matcher("");
|
||||
regexMap.put(entry.getKey().getName(), matcher);
|
||||
final Pattern pattern = Pattern.compile(entry.getValue(), flags);
|
||||
compiledPatternsMap.put(entry.getKey().getName(), pattern);
|
||||
}
|
||||
compiledPattersMapRef.set(compiledPatternsMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
}
|
||||
final ProcessorLog logger = getLogger();
|
||||
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
|
||||
final int maxCaptureGroupLength = context.getProperty(MAX_CAPTURE_GROUP_LENGTH).asInteger();
|
||||
final long maxBufferSizeL = Math.min(flowFile.getSize(), context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue());
|
||||
final byte[] buffer = new byte[(int) maxBufferSizeL];
|
||||
|
||||
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
session.read(flowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(InputStream in) throws IOException {
|
||||
StreamUtils.fillBuffer(in, buffer, false);
|
||||
}
|
||||
});
|
||||
|
||||
for (FlowFile flowFile : flowFileBatch) {
|
||||
final String contentString = new String(buffer, 0, (int) maxBufferSizeL, charset);
|
||||
final Map<String, String> regexResults = new HashMap<>();
|
||||
|
||||
final Map<String, String> regexResults = new HashMap<>();
|
||||
final Map<String, Pattern> patternMap = compiledPattersMapRef.get();
|
||||
for (final Map.Entry<String, Pattern> entry : patternMap.entrySet()) {
|
||||
|
||||
final byte[] buffer = new byte[maxBufferSize];
|
||||
final Matcher matcher = entry.getValue().matcher(contentString);
|
||||
|
||||
session.read(flowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(InputStream in) throws IOException {
|
||||
StreamUtils.fillBuffer(in, buffer, false);
|
||||
}
|
||||
});
|
||||
|
||||
final int flowFileSize = Math.min((int) flowFile.getSize(), maxBufferSize);
|
||||
|
||||
final String contentString = new String(buffer, 0, flowFileSize, charset);
|
||||
|
||||
for (final Map.Entry<String, Matcher> entry : regexMap.entrySet()) {
|
||||
|
||||
final Matcher matcher = entry.getValue();
|
||||
|
||||
matcher.reset(contentString);
|
||||
|
||||
if (matcher.find()) {
|
||||
final String group = matcher.group(1);
|
||||
if (!StringUtils.isBlank(group)) {
|
||||
regexResults.put(entry.getKey(), group);
|
||||
if (matcher.find()) {
|
||||
final String baseKey = entry.getKey();
|
||||
for (int i = 0; i <= matcher.groupCount(); i++) {
|
||||
final String key = new StringBuilder(baseKey).append(".").append(i).toString();
|
||||
String value = matcher.group(i);
|
||||
if (value.length() > maxCaptureGroupLength) {
|
||||
value = value.substring(0, maxCaptureGroupLength);
|
||||
}
|
||||
regexResults.put(key, value);
|
||||
if (i == 1) {
|
||||
regexResults.put(baseKey, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!regexResults.isEmpty()) {
|
||||
flowFile = session.putAllAttributes(flowFile, regexResults);
|
||||
session.getProvenanceReporter().modifyAttributes(flowFile);
|
||||
session.transfer(flowFile, REL_MATCH);
|
||||
logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile});
|
||||
} else {
|
||||
session.transfer(flowFile, REL_NO_MATCH);
|
||||
logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile});
|
||||
}
|
||||
if (!regexResults.isEmpty()) {
|
||||
flowFile = session.putAllAttributes(flowFile, regexResults);
|
||||
session.getProvenanceReporter().modifyAttributes(flowFile);
|
||||
session.transfer(flowFile, REL_MATCH);
|
||||
logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile});
|
||||
} else {
|
||||
session.transfer(flowFile, REL_NO_MATCH);
|
||||
logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile});
|
||||
}
|
||||
|
||||
} // end flowFileLoop
|
||||
}
|
||||
|
||||
int getCompileFlags(ProcessContext context) {
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.processors.standard.ExtractText;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -43,7 +42,7 @@ public class TestExtractText {
|
|||
testRunner.setProperty("regex.result1", "(?s)(.*)");
|
||||
testRunner.setProperty("regex.result2", "(?s).*(bar1).*");
|
||||
testRunner.setProperty("regex.result3", "(?s).*?(bar\\d).*"); // reluctant gets first
|
||||
testRunner.setProperty("regex.result4", "(?s).*?(?:bar\\d).*?(bar\\d).*"); // reluctant w/ repeated pattern gets second
|
||||
testRunner.setProperty("regex.result4", "(?s).*?(?:bar\\d).*?(bar\\d).*?(bar3).*"); // reluctant w/ repeated pattern gets second
|
||||
testRunner.setProperty("regex.result5", "(?s).*(bar\\d).*"); // greedy gets last
|
||||
testRunner.setProperty("regex.result6", "(?s)^(.*)$");
|
||||
testRunner.setProperty("regex.result7", "(?s)(XXX)");
|
||||
|
@ -57,6 +56,10 @@ public class TestExtractText {
|
|||
out.assertAttributeEquals("regex.result2", "bar1");
|
||||
out.assertAttributeEquals("regex.result3", "bar1");
|
||||
out.assertAttributeEquals("regex.result4", "bar2");
|
||||
out.assertAttributeEquals("regex.result4.0", SAMPLE_STRING);
|
||||
out.assertAttributeEquals("regex.result4.1", "bar2");
|
||||
out.assertAttributeEquals("regex.result4.2", "bar3");
|
||||
out.assertAttributeNotExists("regex.result4.3");
|
||||
out.assertAttributeEquals("regex.result5", "bar3");
|
||||
out.assertAttributeEquals("regex.result6", SAMPLE_STRING);
|
||||
out.assertAttributeEquals("regex.result7", null);
|
||||
|
@ -209,14 +212,6 @@ public class TestExtractText {
|
|||
|
||||
}
|
||||
|
||||
@Test(expected = java.lang.AssertionError.class)
|
||||
public void testTooManyCaptureGroups() throws UnsupportedEncodingException {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new ExtractText());
|
||||
testRunner.setProperty("regex.result1", "(.)(.)");
|
||||
testRunner.enqueue(SAMPLE_STRING.getBytes("UTF-8"));
|
||||
testRunner.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMatchOutsideBuffer() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new ExtractText());
|
||||
|
|
Loading…
Reference in New Issue